fc8b48bbf12f23c22e09a6508116349f1e260a85
[charm.git] / src / ck-core / ckmessagelogging.C
1 #include "charm.h"
2 #include "ck.h"
3 #include "ckmessagelogging.h"
4 #include "queueing.h"
5 #include <sys/types.h>
6 #include <signal.h>
7 #include "CentralLB.h"
8
9 #ifdef _FAULT_MLOG_
10
11 //#define DEBUG(x)  if(_restartFlag) {x;}
12 #define DEBUG(x)  //x
13 #define DEBUGRESTART(x)  //x
14 #define DEBUGLB(x) // x
15
16 #define BUFFERED_LOCAL
17 #define BUFFERED_REMOTE
18
19 extern const char *idx2str(const CkArrayIndex &ind);
20 extern const char *idx2str(const ArrayElement *el);
21 const char *idx2str(const CkArrayIndexMax &ind){
22         return idx2str((const CkArrayIndex &)ind);
23 };
24
25 void getGlobalStep(CkGroupID gID);
26
27 bool fault_aware(CkObjID &recver);
28
29 int _restartFlag=0;
30 int restarted=0;
31
32 //TODO: remove for perf runs
33 int countHashRefs=0; //count the number of gets
34 int countHashCollisions=0;
35
36
37 //#define CHECKPOINT_DISK
38 char *checkpointDirectory=".";
39 int unAckedCheckpoint=0;
40
41 int countLocal=0,countBuffered=0;
42 int countPiggy=0;
43 int countClearBufferedLocalCalls=0;
44
45 int countUpdateHomeAcks=0;
46
47 extern int chkptPeriod;
48 extern bool parallelRestart;
49
50 char *killFile;
51 int killFlag=0;
52 int restartingMlogFlag=0;
53 void readKillFile();
54 double killTime=0.0;
55 int checkpointCount=0;
56
57
58 CpvDeclare(Chare *,_currentObj);
59 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
60 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
61 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
62 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
63 CpvDeclare(Queue, _outOfOrderMessageQueue);
64 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
65 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
66 CpvDeclare(char **,_bufferedTicketRequests);
67 CpvDeclare(int *,_numBufferedTicketRequests);
68 CpvDeclare(char *,_bufferTicketReply);
69
70
71
72 static double adjustChkptPeriod=0.0; //in ms
73 static double nextCheckpointTime=0.0;//in seconds
74
75 double lastBufferedLocalMessageCopyTime;
76
77 int _maxBufferedMessages;
78 int _maxBufferedTicketRequests;
79 int BUFFER_TIME=2; // in ms
80
81
82 int _ticketRequestHandlerIdx;
83 int _ticketHandlerIdx;
84 int _localMessageCopyHandlerIdx;
85 int _localMessageAckHandlerIdx;
86 int _pingHandlerIdx;
87 int _bufferedLocalMessageCopyHandlerIdx;
88 int _bufferedLocalMessageAckHandlerIdx;
89 int _bufferedTicketRequestHandlerIdx;
90 int _bufferedTicketHandlerIdx;
91
92
93 char objString[100];
94 int _checkpointRequestHandlerIdx;
95 int _storeCheckpointHandlerIdx;
96 int _checkpointAckHandlerIdx;
97 int _getCheckpointHandlerIdx;
98 int _recvCheckpointHandlerIdx;
99 int _removeProcessedLogHandlerIdx;
100
101 int _verifyAckRequestHandlerIdx;
102 int _verifyAckHandlerIdx;
103 int _dummyMigrationHandlerIdx;
104
105
106 int     _getGlobalStepHandlerIdx;
107 int     _recvGlobalStepHandlerIdx;
108
109 int _updateHomeRequestHandlerIdx;
110 int _updateHomeAckHandlerIdx;
111 int _resendMessagesHandlerIdx;
112 int _resendReplyHandlerIdx;
113 int _receivedTNDataHandlerIdx;
114 int _distributedLocationHandlerIdx;
115
116
117 int verifyAckTotal;
118 int verifyAckCount;
119
120 int verifyAckedRequests=0;
121
122 RestartRequest *storedRequest;
123
124
125
126 int _falseRestart =0; /**
127                                                                                                         For testing on clusters we might carry out restarts on 
128                                                                                                         a processor without actually restarting it
129                                                                                                         1 -> false restart
130                                                                                                         0 -> restart after an actual crash
131                                                                                                 */                                                                                                                              
132
133 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
134 int _lockNewTicket=0;
135
136
137 //Load balancing globals
138 int onGoingLoadBalancing=0;
139 void *centralLb;
140 void (*resumeLbFnPtr)(void *);
141 int _receiveMlogLocationHandlerIdx;
142 int _receiveMigrationNoticeHandlerIdx;
143 int _receiveMigrationNoticeAckHandlerIdx;
144 int _checkpointBarrierHandlerIdx;
145 int _checkpointBarrierAckHandlerIdx;
146
147 CkVec<MigrationRecord> migratedNoticeList;
148 CkVec<RetainedMigratedObject *> retainedObjectList;
149 int donotCountMigration=0;
150 int countLBMigratedAway=0;
151 int countLBToMigrate=0;
152 int migrationDoneCalled=0;
153 int checkpointBarrierCount=0;
154 int globalResumeCount=0;
155 CkGroupID globalLBID;
156 int restartDecisionNumber=-1;
157
158
159 double lastCompletedAlarm=0;
160 double lastRestart=0;
161
162
163 //update location globals
164 int _receiveLocationHandlerIdx;
165
166
167
168 // initialize message logging datastructures and register handlers
169 void _messageLoggingInit(){
170         //current object
171         CpvInitialize(Chare *,_currentObj);
172         
173         //registering handlers for message logging
174         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
175         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
176         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
177         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
178         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
179         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
180         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
181   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
182         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
183
184                 
185         //handlers for checkpointing
186         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
187         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
188         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
189         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
190
191
192         //handlers for restart
193         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
194         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
195         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
196         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
197         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
198         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
199         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
200         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
201         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
202         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
203         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
204
205         
206         //handlers for load balancing
207         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
208         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
209         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
210         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
211         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
212         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
213         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
214         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
215         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
216
217         
218         //handlers for updating locations
219         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
220         
221         //Cpv variables for message logging
222         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
223         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
224         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
225         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
226         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
227         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
228         CpvInitialize(Queue, _outOfOrderMessageQueue);
229         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
230         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
231         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
232         
233         CpvInitialize(char **,_bufferedTicketRequests);
234         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
235         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
236         for(int i=0;i<CkNumPes();i++){
237                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
238                 CpvAccess(_numBufferedTicketRequests)[i]=0;
239         }
240   CpvInitialize(char *,_bufferTicketReply);
241         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
242         
243 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
244         CcdCallFnAfter(retryTicketRequest,NULL,100);    
245         
246         
247         //Cpv variables for checkpoint
248         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
249         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
250         
251 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
252 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
253 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
254         if(CkMyPe() == 0){
255 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
256 #ifdef  BUFFERED_LOCAL
257                 if(CmiMyPe() == 0){
258                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
259                 }
260 #endif
261         }
262 #ifdef  BUFFERED_REMOTE
263         if(CmiMyPe() == 0){
264                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
265         }
266 #endif
267
268         traceRegisterUserEvent("Remove Logs", 20);
269         traceRegisterUserEvent("Ticket Request Handler", 21);
270         traceRegisterUserEvent("Ticket Handler", 22);
271         traceRegisterUserEvent("Local Message Copy Handler", 23);
272         traceRegisterUserEvent("Local Message Ack Handler", 24);        
273         traceRegisterUserEvent("Preprocess current message",25);
274         traceRegisterUserEvent("Preprocess past message",26);
275         traceRegisterUserEvent("Preprocess future message",27);
276         traceRegisterUserEvent("Checkpoint",28);
277         traceRegisterUserEvent("Checkpoint Store",29);
278         traceRegisterUserEvent("Checkpoint Ack",30);
279         
280         traceRegisterUserEvent("Send Ticket Request",31);
281         traceRegisterUserEvent("Generalticketrequest1",32);
282         traceRegisterUserEvent("TicketLogLocal",33);
283         traceRegisterUserEvent("next_ticket and SN",34);
284         traceRegisterUserEvent("Timeout for buffered remote messages",35);
285         traceRegisterUserEvent("Timeout for buffered local messages",36);
286         traceRegisterUserEvent("Inform Location Home",37);
287         traceRegisterUserEvent("Receive Location Handler",38);
288         
289         lastCompletedAlarm=CmiWallTimer();
290         lastRestart = CmiWallTimer();
291 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
292         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
293 //      printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
294 //      if(killFlag){
295 //              readKillFile();
296 //      }
297 }
298
299 void killLocal(void *_dummy,double curWallTime);        
300
301 void readKillFile(){
302         FILE *fp=fopen(killFile,"r");
303         if(!fp){
304                 return;
305         }
306         int proc;
307         double sec;
308         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
309                 if(proc == CkMyPe()){
310                         killTime = CmiWallTimer()+sec;
311                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
312                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
313                 }
314         }
315         fclose(fp);
316 }
317
318 void killLocal(void *_dummy,double curWallTime){
319         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
320         if(CmiWallTimer()<killTime-1){
321                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
322         }else{  
323                 kill(getpid(),SIGKILL);
324         }
325 }
326
327
328
329 /************************ Message logging methods ****************/
330
331 // send a ticket request to a group on a processor
332 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
333         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
334                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
335                 void *origMsg = EnvToUsr(env);
336                 for(int i=0;i<CmiNumPes();i++){
337                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
338                                 void *copyMsg = CkCopyMsg(&origMsg);
339                                 envelope *copyEnv = UsrToEnv(copyMsg);
340                                 copyEnv->SN=0;
341                                 copyEnv->TN=0;
342                                 copyEnv->sender.type = TypeInvalid;
343                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
344                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
345                         }
346                 }
347                 return;
348         }
349         CkObjID recver;
350         recver.type = TypeGroup;
351         recver.data.group.id = env->getGroupNum();
352         recver.data.group.onPE = destPE;
353 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
354                 CmiPrintStackTrace(0);
355         }*/
356         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
357 }
358
359 //send a ticket request to a nodegroup
360 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
361         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
362                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
363                 void *origMsg = EnvToUsr(env);
364                 for(int i=0;i<CmiNumNodes();i++){
365                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
366                                 void *copyMsg = CkCopyMsg(&origMsg);
367                                 envelope *copyEnv = UsrToEnv(copyMsg);
368                                 copyEnv->SN=0;
369                                 copyEnv->TN=0;
370                                 copyEnv->sender.type = TypeInvalid;
371                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
372                         }
373                 }
374                 return;
375         }
376         CkObjID recver;
377         recver.type = TypeNodeGroup;
378         recver.data.group.id = env->getGroupNum();
379         recver.data.group.onPE = destNode;
380         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
381 }
382
383 //send a ticket request to an array element
384 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
385         CkObjID recver;
386         recver.type = TypeArray;
387         recver.data.array.id = env->getsetArrayMgr();
388         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
389
390         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
391                 char recverString[100],senderString[100];
392                 
393                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
394         }
395
396         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
397 };
398
399 //a method to generate the actual ticket requests for groups,nodegroups or arrays
400 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
401         envelope *env = _env;
402         int resend=0; //is it a resend
403         char recverName[100];
404         double _startTime=CkWallTimer();
405         
406         if(CpvAccess(_currentObj) == NULL){
407 //              CkAssert(0);
408                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
409                 generalCldEnqueue(destPE,env,_infoIdx);
410                 return;
411         }
412         
413         if(env->sender.type == TypeInvalid){
414                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
415                 //Set message logging data in the envelope
416         }else{
417                 envelope *copyEnv = copyEnvelope(env);
418                 env = copyEnv;
419                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
420                 env->SN = 0;
421         }
422         
423         
424         CkObjID &sender = env->sender;
425         env->recver = recver;
426
427         Chare *obj = (Chare *)env->sender.getObject();
428           
429         if(env->SN == 0){
430                 env->SN = obj->mlogData->nextSN(recver);
431         }else{
432                 resend = 1;
433         }
434         
435         
436         char senderString[100];
437 //      if(env->SN != 1){
438                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
439         //      CmiPrintStackTrace(0);
440 /*      }else{
441                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
442         }*/
443                 
444         MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
445 //      CkPackMessage(&(mEntry->env));
446 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
447         
448         
449
450         _startTime = CkWallTimer();
451         if(destPE == CkMyPe()){
452                 ticketLogLocalMessage(mEntry);
453         }else{
454                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
455 //              traceUserBracketEvent(31,_startTime,CkWallTimer());                     
456         }
457 }
458
459 //method that does the actual send by creating a ticketrequest filling it up and sending it
460 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
461         char recverString[100],senderString[100];
462         envelope *env = entry->env;
463         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
464 /*      envelope *env = entry->env;
465         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
466
467         Chare *obj = (Chare *)entry->env->sender.getObject();
468         if(!resend){
469                 obj->mlogData->addLogEntry(entry);
470         }
471         
472
473 #ifdef BUFFERED_REMOTE
474         //buffer the ticket request 
475         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
476                 //first message to this processor, buffer needs to be created
477                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
478                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
479                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
480         }
481         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
482         //Buffer the ticketrequests
483         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
484         ticketRequest->sender = sender;
485         ticketRequest->recver = recver;
486         ticketRequest->logEntry = entry;
487         ticketRequest->SN = SN;
488         ticketRequest->senderPE = CkMyPe();
489
490         CpvAccess(_numBufferedTicketRequests)[destPE]++;
491         
492         
493         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
494                 sendBufferedTicketRequests(destPE);
495         }else{
496                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
497                         int *checkPE = new int;
498                         *checkPE = destPE;
499                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
500                 }
501         }
502 #else
503
504         TicketRequest ticketRequest;
505         ticketRequest.sender = sender;
506         ticketRequest.recver = recver;
507         ticketRequest.logEntry = entry;
508         ticketRequest.SN = SN;
509         ticketRequest.senderPE = CkMyPe();
510         
511         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
512 //      CmiBecomeImmediate(&ticketRequest);
513         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
514 #endif
515         DEBUG(CmiMemoryCheck());
516 };
517
518 /**
519  * Send the ticket requests buffered for processor PE
520  **/
521 void sendBufferedTicketRequests(int destPE){
522         DEBUG(CmiMemoryCheck());
523         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
524         if(numberRequests == 0){
525                 return;
526         }
527         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
528         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
529         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
530         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
531         header->numberLogs = numberRequests;
532         
533         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
534         CmiSyncSend(destPE,totalSize,(char *)buf);
535         
536         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
537         DEBUG(CmiMemoryCheck());
538 };
539
540 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
541         int destPE = *(int *)_destPE;
542   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
543                 sendBufferedTicketRequests(destPE);
544 //              traceUserEvent(35);
545         }
546         delete (int *)_destPE;
547         DEBUG(CmiMemoryCheck());
548 };
549
550
551
552
553 //get a ticket for a local message and then send a copy to the buddy
554 void ticketLogLocalMessage(MlogEntry *entry){
555         //this method is always in the main thread(not interrupt).. so it should 
556         //never find itself locked out of a newTicket
557 //      CkAssert(_lockNewTicket==0);
558         double _startTime=CkWallTimer();
559         
560 //      _lockNewTicket=1;
561         
562                 DEBUG(CmiMemoryCheck());
563         
564
565         Chare *recverObj = (Chare *)entry->env->recver.getObject();
566         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
567         if(recverObj){
568                 //Consider the case, after a restart when this message has already been allotted a ticket number
569                 // and should get the same one as the old one.
570                 Ticket ticket;
571                 if(recverObj->mlogData->mapTable.numObjects() > 0){
572                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
573                 }else{
574                         ticket.TN = 0;
575                 }
576                 
577                 char senderString[100], recverString[100] ;
578                 
579                 if(ticket.TN == 0){
580                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
581         
582                         if(ticket.TN == 0){
583                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
584                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
585                                 
586         //              _lockNewTicket = 0;
587 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
588                                 return;
589                         }
590                 }       
591                 //TODO: check for the case when an invalid ticket is returned
592                 //TODO: check for OLD or RECEIVED TICKETS
593                 entry->env->TN = ticket.TN;
594                 CkAssert(entry->env->TN > 0);
595                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
596                 
597
598                 sendLocalMessageCopy(entry);
599                 
600                 
601                 DEBUG(CmiMemoryCheck());
602                 entry->unackedLocal = 1;
603                 DEBUG(CmiMemoryCheck());
604                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
605                 DEBUG(CmiMemoryCheck());
606         }else{
607                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
608         }
609         _lockNewTicket=0;
610 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
611 };
612
613
614 void sendLocalMessageCopy(MlogEntry *entry){
615         LocalMessageLog msgLog;
616         msgLog.sender = entry->env->sender;
617         msgLog.recver = entry->env->recver;
618         msgLog.SN = entry->env->SN;
619         msgLog.TN = entry->env->TN;
620         msgLog.entry = entry;
621         msgLog.PE = CkMyPe();
622         
623         char recvString[100];
624         char senderString[100];
625         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
626
627 #ifdef BUFFERED_LOCAL
628         countLocal++;
629         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
630         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
631                 sendBufferedLocalMessageCopy();
632         }else{
633                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
634                         lastBufferedLocalMessageCopyTime = CkWallTimer();
635                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
636                         countClearBufferedLocalCalls++;
637                 }       
638         }
639 #else   
640         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
641         
642         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
643 #endif
644         DEBUG(CmiMemoryCheck());
645 };
646
647
648 void sendBufferedLocalMessageCopy(){
649         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
650         if(numberLogs == 0){
651                 return;
652         }
653         countBuffered++;
654         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
655         void *buf=CmiAlloc(totalSize);
656         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
657         header->numberLogs=numberLogs;
658
659         DEBUG(CmiMemoryCheck());
660         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
661         
662         char *ptr = (char *)buf;
663         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
664         
665         for(int i=0;i<numberLogs;i++){
666                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
667                 memcpy(ptr,&log,sizeof(LocalMessageLog));
668                 ptr = &ptr[sizeof(LocalMessageLog)];
669         }
670
671         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
672
673         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
674         DEBUG(CmiMemoryCheck());
675 };
676
677 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
678         countClearBufferedLocalCalls--;
679         if(countClearBufferedLocalCalls > 10){
680                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
681         }
682         DEBUG(CmiMemoryCheck());
683         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
684         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
685                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
686                         sendBufferedLocalMessageCopy();
687 //                      traceUserEvent(36);
688                 }
689         }
690         DEBUG(CmiMemoryCheck());
691 }
692
693 /****
694         The handler functions
695 *****/
696
697
698 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
699 /**
700  *  If there are any delayed requests, process them first before 
701  *  processing this request
702  * */
703 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
704         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
705         double  _startTime = CkWallTimer();
706         if(CpvAccess(_delayedTicketRequests)->length() > 0){
707                 retryTicketRequest(NULL,_startTime);
708         }
709         _processTicketRequest(ticketRequest);
710         CmiFree(ticketRequest);
711 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
712 }
713 /** Handler used for dealing with a bunch of ticket requests
714  * from one processor. The replies are also bunched together
715  * Does not use _ticketRequestHandler
716  * */
717 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
718         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
719         DEBUG(CmiMemoryCheck());
720         double _startTime = CkWallTimer();
721         if(CpvAccess(_delayedTicketRequests)->length() > 0){
722                 retryTicketRequest(NULL,_startTime);
723         }
724         DEBUG(CmiMemoryCheck());
725   int numRequests = recvdHeader->numberLogs;
726         char *msg = (char *)recvdHeader;
727         msg = &msg[sizeof(BufferedTicketRequestHeader)];
728         int senderPE=((TicketRequest *)msg)->senderPE;
729
730         
731         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
732         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
733         
734         char *ptr = (char *)buf;
735         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
736         header->numberLogs = 0;
737
738         DEBUG(CmiMemoryCheck());
739         
740         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
741         
742         for(int i=0;i<numRequests;i++){
743                 TicketRequest *request = (TicketRequest *)msg;
744                 msg = &msg[sizeof(TicketRequest)];
745                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
746
747                 if(replied){
748                         //the ticket request has been processed and 
749                         //the reply will be stored in the ptr
750                         header->numberLogs++;
751                         ptr = &ptr[sizeof(TicketReply)];
752                 }
753         }
754 /*      if(header->numberLogs == 0){
755                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
756         }*/
757
758         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
759         CmiSyncSend(senderPE,totalSize,(char *)buf);
760         CmiFree(recvdHeader);
761 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
762         DEBUG(CmiMemoryCheck());
763 };
764
765 /**Process the ticket request. 
766  * If it is processed and a reply is being sent 
767  * by this processor return true
768  * else return false.
769  * If a reply buffer is specified put the reply into that
770  * else send the reply
771  * */
772 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
773
774 /*      if(_lockNewTicket){
775                 printf("ddeded %d\n",CkMyPe());
776                 if(CmiIsImmediate(ticketRequest)){
777                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
778                 }
779                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
780                 
781         }else{
782                 _lockNewTicket = 1;
783         }*/
784
785         DEBUG(CmiMemoryCheck());
786         CkObjID sender = ticketRequest->sender;
787         CkObjID recver = ticketRequest->recver;
788         MCount SN = ticketRequest->SN;
789         
790         
791         Chare *recverObj = (Chare *)recver.getObject();
792         
793         
794         char recverName[100];
795         DEBUG(recver.toString(recverName);)
796         if(recverObj == NULL){
797                 int estPE = recver.guessPE();
798                 if(estPE == CkMyPe() || estPE == -1){           
799                         //try to fulfill the request after some time
800                         char senderString[100];
801                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
802                         if(estPE == CkMyPe() && recver.type == TypeArray){
803                                 CkArrayID aid(recver.data.array.id);            
804                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
805                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
806
807                         }
808                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
809                         *delayed = *ticketRequest;
810                         CpvAccess(_delayedTicketRequests)->enq(delayed);
811                         
812                 }else{
813                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
814                         TicketRequest forward = *ticketRequest;
815                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
816                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
817                         
818                         
819                 }
820         DEBUG(CmiMemoryCheck());
821                 return false; // if the receverObj does not exist the ticket request cannot have been 
822                               // processed successfully
823         }else{
824                 char senderString[100];
825                 Ticket ticket;
826                 //check if a ticket for this has been already handed out to an object that used to be local but 
827                 // is no longer so.. need for parallel restart
828                 
829                 if(recverObj->mlogData->mapTable.numObjects() > 0){
830                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
831                 }
832                 
833                 if(ticket.TN == 0){
834                         ticket = recverObj->mlogData->next_ticket(sender,SN);
835                 }
836                 if(ticket.TN > recverObj->mlogData->tProcessed){
837                         ticket.state = NEW_TICKET;
838                 }else{
839                         ticket.state = OLD_TICKET;
840                 }
841                 //TODO: check for the case when an invalid ticket is returned
842                 if(ticket.TN == 0){
843                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
844                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
845                         *delayed = *ticketRequest;
846                         CpvAccess(_delayedTicketRequests)->enq(delayed);
847                         return false;
848                 }
849 /*              if(ticket.TN < SN){ //error state this really should not happen
850                         recver.toString(recverName);
851                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
852                 }*/
853 //              CkAssert(ticket.TN >= SN);
854                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
855
856 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
857     if(reply == NULL){ 
858                         //There is no reply buffer and the ticketreply is going to be 
859                         //sent immediately
860                         TicketReply ticketReply;
861                         ticketReply.request = *ticketRequest;
862                         ticketReply.ticket = ticket;
863                         ticketReply.recverPE = CkMyPe();
864                 
865                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
866 //              CmiBecomeImmediate(&ticketReply);
867                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
868          }else{ // Store ticket reply in the buffer provided
869                  reply->request = *ticketRequest;
870                  reply->ticket = ticket;
871                  reply->recverPE = CkMyPe();
872                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
873                                                          // in case the ticket needs to be forwarded or something
874
875          }
876                 DEBUG(CmiMemoryCheck());
877                 return true;
878         }
879 //      _lockNewTicket=0;
880 };
881
882 inline void _ticketHandler(TicketReply *ticketReply){
883
884         double _startTime = CkWallTimer();
885         DEBUG(CmiMemoryCheck());
886         
887         
888         char senderString[100];
889         CkObjID sender = ticketReply->request.sender;
890         CkObjID recver = ticketReply->request.recver;
891         
892         if(sender.guessPE() != CkMyPe()){               
893                 DEBUG(CkAssert(sender.guessPE()>= 0));
894                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
895         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
896                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
897                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
898 #ifdef BUFFERED_REMOTE
899                 //will be freed by the buffered ticket handler most of the time
900                 //this might lead to a leak just after migration
901                 //when the ticketHandler is directly used without going through the buffered handler
902                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
903 #else
904                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
905 #endif  
906         }else{
907                 char recverName[100];
908                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
909                 MlogEntry *logEntry=NULL;
910                 if(ticketReply->ticket.state & FORWARDED_TICKET){
911                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
912                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
913                         Chare *senderObj = (Chare *)sender.getObject();
914                         if(senderObj){
915                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
916                                 for(int i=0;i<mlog->length();i++){
917                                         MlogEntry *tempEntry = (*mlog)[i];
918                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
919                                                 logEntry = tempEntry;
920                                                 break;
921                                         }
922                                 }
923                                 if(logEntry == NULL){
924 #ifdef BUFFERED_REMOTE
925 #else
926                                         CmiFree(ticketReply);
927 #endif                                  
928                                         return;
929                                 }
930                         }else{
931                                 CmiAbort("This processor thinks it should have the sender\n");
932                         }
933                         ticketReply->ticket.state ^= FORWARDED_TICKET;
934                 }else{
935                         logEntry = ticketReply->request.logEntry;
936                 }
937                 if(logEntry->env->TN <= 0){
938                         //This logEntry has not received a TN earlier
939                         char recverString[100];
940                         logEntry->env->TN = ticketReply->ticket.TN;
941                         logEntry->env->setSrcPe(CkMyPe());
942                         if(ticketReply->ticket.state == NEW_TICKET){
943                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
944                                 if(ticketReply->recverPE != CkMyPe()){
945                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
946                                 }else{
947                                         //It is now a local message use the local message protocol
948                                         sendLocalMessageCopy(logEntry);
949                                 }               
950                         }
951                 }else{
952                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
953                 }
954                 recver.updatePosition(ticketReply->recverPE);
955 #ifdef BUFFERED_REMOTE
956 #else
957                 CmiFree(ticketReply);
958 #endif
959         }
960         CmiMemoryCheck();
961 #ifdef BUFFERED_REMOTE
962 #else
963 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
964 #endif
965         
966 };
967
968 /**
969  * Message to handle the bunch of tickets 
970  * that we get from one processor. We send 
971  * the tickets to be handled one at a time
972  * */
973
974 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
975         double _startTime=CmiWallTimer();
976         int numTickets = recvdHeader->numberLogs;
977         char *msg = (char *)recvdHeader;
978         msg = &msg[sizeof(BufferedTicketRequestHeader)];
979         DEBUG(CmiMemoryCheck());
980         
981         TicketReply *_reply = (TicketReply *)msg;
982
983         
984         for(int i=0;i<numTickets;i++){
985                 TicketReply *reply = (TicketReply *)msg;
986                 _ticketHandler(reply);
987                 
988                 msg = &msg[sizeof(TicketReply)];
989         }
990         
991         CmiFree(recvdHeader);
992 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
993         DEBUG(CmiMemoryCheck());
994 };
995
996
997 void _localMessageCopyHandler(LocalMessageLog *msgLog){
998         double _startTime = CkWallTimer();
999         
1000         char senderString[100],recverString[100];
1001         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1002 /*      if(!fault_aware(msgLog->recver)){
1003                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1004         }*/
1005         CpvAccess(_localMessageLog)->enq(*msgLog);
1006         
1007         LocalMessageLogAck ack;
1008         ack.entry = msgLog->entry;
1009         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1010         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1011         CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
1012         
1013 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1014 };
1015
1016 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1017         double _startTime = CkWallTimer();
1018         DEBUG(CmiMemoryCheck());
1019         
1020         int numLogs = recvdHeader->numberLogs;
1021         char *msg = (char *)recvdHeader;
1022
1023         //piggy back the logs already stored on this processor
1024         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1025         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1026 /*      if(numPiggyLogs > 0){
1027                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1028                         CmiAssert(0);
1029                 }
1030         }*/
1031         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1032         
1033         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1034         void *buf = CmiAlloc(totalSize);
1035         char *ptr = (char *)buf;
1036         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1037         
1038         msg = &msg[sizeof(BufferedLocalLogHeader)];
1039         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1040
1041         DEBUG(CmiMemoryCheck());
1042         int PE;
1043         for(int i=0;i<numLogs;i++){
1044                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1045                 CpvAccess(_localMessageLog)->enq(*msgLog);
1046                 PE = msgLog->PE;
1047                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1048
1049                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1050                 ack->entry = msgLog->entry;
1051                 
1052                 msg = &msg[sizeof(LocalMessageLog)];
1053                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1054         }
1055         DEBUG(CmiMemoryCheck());
1056
1057         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1058         piggyHeader->numberLogs = numPiggyLogs;
1059         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1060         if(numPiggyLogs > 0){
1061                 countPiggy++;
1062         }
1063
1064         for(int i=0;i<numPiggyLogs;i++){
1065                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1066                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1067                 ptr = &ptr[sizeof(LocalMessageLog)];
1068         }
1069         DEBUG(CmiMemoryCheck());
1070         
1071         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1072         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1073                 
1074 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1075                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1076                         if(!fault_aware(localLogEntry.recver)){
1077                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1078                         }
1079         }*/
1080         if(freeHeader){
1081                 CmiFree(recvdHeader);
1082         }
1083         DEBUG(CmiMemoryCheck());
1084 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1085 }
1086
1087
1088 void _localMessageAckHandler(LocalMessageLogAck *ack){
1089         
1090         double _startTime = CkWallTimer();
1091         
1092         MlogEntry *entry = ack->entry;
1093         if(entry == NULL){
1094                 CkExit();
1095         }
1096         entry->unackedLocal = 0;
1097         envelope *env = entry->env;
1098         char recverName[100];
1099         char senderString[100];
1100         DEBUG(CmiMemoryCheck());
1101         
1102         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1103         if(env == NULL)
1104                 return;
1105         CkAssert(env->SN > 0);
1106         CkAssert(env->TN > 0);
1107         env->sender.toString(senderString);
1108         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1109         env->recver.toString(recverName);
1110
1111         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1112         
1113 /*      
1114         void *origMsg = EnvToUsr(env);
1115         void *copyMsg = CkCopyMsg(&origMsg);
1116         envelope *copyEnv = UsrToEnv(copyMsg);
1117         entry->env = UsrToEnv(origMsg);*/
1118
1119 //      envelope *copyEnv = copyEnvelope(env);
1120
1121         envelope *copyEnv = env;
1122         copyEnv->localMlogEntry = entry;
1123
1124         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1125         if(CmiMyPe() != entry->destPE){
1126                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1127         }
1128 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1129         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1130         
1131         
1132 #ifdef BUFFERED_LOCAL
1133 #else
1134         CmiFree(ack);
1135 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1136 #endif
1137         
1138         DEBUG(CmiMemoryCheck());
1139         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1140 }
1141
1142
1143 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1144
1145         double _startTime=CkWallTimer();
1146         DEBUG(CmiMemoryCheck());
1147
1148         int numLogs = recvdHeader->numberLogs;
1149         char *msg = (char *)recvdHeader;
1150         msg = &msg[sizeof(BufferedLocalLogHeader)];
1151
1152         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1153         
1154         for(int i=0;i<numLogs;i++){
1155                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1156                 _localMessageAckHandler(ack);
1157                 
1158                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1159         }
1160
1161         //deal with piggy backed local logs
1162         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1163         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1164         if(piggyHeader->numberLogs > 0){
1165                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1166         }
1167         
1168         CmiFree(recvdHeader);
1169         DEBUG(CmiMemoryCheck());
1170 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1171 }
1172
1173
1174
1175
1176
1177
1178 bool fault_aware(CkObjID &recver){
1179         switch(recver.type){
1180                 case TypeChare:
1181                         return false;
1182                 case TypeGroup:
1183                 case TypeNodeGroup:
1184                 case TypeArray:
1185                         return true;
1186                 default:
1187                         return false;
1188         }
1189 };
1190
1191 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1192         char recverString[100];
1193         char senderString[100];
1194         
1195         DEBUG(CmiMemoryCheck());
1196         CkObjID recver = env->recver;
1197         if(!fault_aware(recver))
1198                 return 1;
1199
1200
1201         Chare *obj = (Chare *)recver.getObject();
1202         *objPointer = obj;
1203         if(obj == NULL){
1204                 int possiblePE = recver.guessPE();
1205                 if(possiblePE != CkMyPe()){
1206                         int totalSize = env->getTotalsize();                    
1207                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1208                 }
1209                 return 0;
1210         }
1211
1212
1213         double _startTime = CkWallTimer();
1214 //env->sender.updatePosition(env->getSrcPe());
1215         if(env->TN == obj->mlogData->tProcessed+1){
1216                 //the message that needs to be processed now
1217                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1218                 // once we find a message that we can process we put back all the messages in the out of order queue
1219                 // back into the main scheduler queue. 
1220                 if(env->sender.guessPE() == CkMyPe()){
1221                         *logEntryPointer = env->localMlogEntry;
1222                 }
1223         DEBUG(CmiMemoryCheck());
1224                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1225                         void *qMsgPtr;
1226                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1227                         envelope *qEnv = (envelope *)qMsgPtr;
1228                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1229         DEBUG(CmiMemoryCheck());
1230                 }
1231 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1232                 //TODO: this might be a problem.. change made for leanMD
1233 //              CpvAccess(_currentObj) = obj;
1234         DEBUG(CmiMemoryCheck());
1235                 return 1;
1236         }
1237         if(env->TN <= obj->mlogData->tProcessed){
1238                 //message already processed
1239                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1240 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1241         DEBUG(CmiMemoryCheck());
1242                 return 0;
1243         }
1244         //message that needs to be processed in the future
1245
1246 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1247         //the message cant be processed now put it back in the out of order message Q, 
1248         //It will be transferred to the main queue later
1249         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1250 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1251         DEBUG(CmiMemoryCheck());
1252         
1253         return 0;
1254 }
1255
1256 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1257         char senderString[100];
1258         if(obj){
1259                 if(sender.guessPE() == CkMyPe()){
1260                         if(entry != NULL){
1261                                 entry->env = NULL;
1262                         }
1263                 }
1264                 obj->mlogData->tProcessed++;
1265 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1266                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1267 //              CpvAccess(_currentObj)= NULL;
1268         }
1269         DEBUG(CmiMemoryCheck());
1270 }
1271
1272 /***
1273         Helpers for the handlers and message logging methods
1274 ***/
1275
1276 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1277 //      double _startTime = CkWallTimer();
1278         if(env->recver.type != TypeNodeGroup){
1279         //This repeats a step performed in skipCldEnq for messages sent to
1280         //other processors. I do this here so that messages to local processors
1281         //undergo the same transformation.. It lets the resend be uniform for 
1282         //all messages
1283 //              CmiSetXHandler(env,CmiGetHandler(env));
1284                 _skipCldEnqueue(destPE,env,_infoIdx);
1285         }else{
1286                 _noCldNodeEnqueue(destPE,env);
1287         }
1288 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1289 }
1290 //extern "C" int CmiGetNonLocalLength();
1291 /** This method is used to retry the ticket requests
1292  * that had been queued up earlier
1293  * */
1294
1295 int calledRetryTicketRequest=0;
1296
1297 void retryTicketRequestTimer(void *_dummy,double _time){
1298                 calledRetryTicketRequest=0;
1299                 retryTicketRequest(_dummy,_time);
1300 }
1301
1302 void retryTicketRequest(void *_dummy,double curWallTime){       
1303         double start = CkWallTimer();
1304         DEBUG(CmiMemoryCheck());
1305         int length = CpvAccess(_delayedTicketRequests)->length();
1306         for(int i=0;i<length;i++){
1307                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1308                 if(ticketRequest){
1309                         char senderString[100],recverString[100];
1310                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1311                         DEBUG(CmiMemoryCheck());
1312                         _processTicketRequest(ticketRequest);
1313                   CmiFree(ticketRequest);
1314                         DEBUG(CmiMemoryCheck());
1315                 }       
1316         }       
1317         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1318                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1319                 ticketLogLocalMessage(entry);
1320         }
1321         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1322 //      int converse_qLength = CmiGetNonLocalLength();
1323         
1324 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1325
1326 /*      PingMsg pingMsg;
1327         pingMsg.PE = CkMyPe();
1328         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1329         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1330                 for(int i=0;i<CkNumPes();i++){
1331                         if(i != CkMyPe()){
1332                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1333                         }
1334                 }
1335         }*/     
1336         //TODO: change this back to 100
1337         if(calledRetryTicketRequest == 0){
1338                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1339                 calledRetryTicketRequest =1;
1340         }
1341         DEBUG(CmiMemoryCheck());
1342 }
1343
1344 void _pingHandler(CkPingMsg *msg){
1345         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1346         CmiFree(msg);
1347 }
1348
1349
/*****************************************************************************
	Checkpointing methods.
		Pack all the data on a processor and send it to the buddy periodically.
		Also used to throw away message logs.
*****************************************************************************/
// One (recver, tProcessed) record per chare on this processor. The list is
// rebuilt by startMlogCheckpoint() and shipped by sendRemoveLogRequests().
CkVec<TProcessedLog> processedTicketLog;
// forAllCharesDo callback: appends a TProcessedLog for the given chare to the
// CkVec<TProcessedLog> passed through `data`.
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
// Updates migratedNoticeList and retainedObjectList entries whose toPE is PE.
void clearUpMigratedRetainedLists(int PE);
1358
1359 void checkpointAlarm(void *_dummy,double curWallTime){
1360         double diff = curWallTime-lastCompletedAlarm;
1361         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1362 /*      if(CkWallTimer()-lastRestart < 50){
1363                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1364                 return;
1365         }*/
1366         if(diff < ((chkptPeriod) - 2)){
1367                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1368                 return;
1369         }
1370         CheckpointRequest request;
1371         request.PE = CkMyPe();
1372         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1373         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1374 };
1375
1376 void _checkpointRequestHandler(CheckpointRequest *request){
1377         startMlogCheckpoint(NULL,CmiWallTimer());       
1378 }
1379
1380 void startMlogCheckpoint(void *_dummy,double curWallTime){
1381         double _startTime = CkWallTimer();
1382         checkpointCount++;
1383 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1384                 kill(getpid(),SIGKILL);
1385         }*/
1386         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1387                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1388         }
1389         PUP::sizer psizer;
1390         DEBUG(CmiMemoryCheck());
1391
1392         psizer | checkpointCount;
1393         
1394         CkPupROData(psizer);
1395         DEBUG(CmiMemoryCheck());
1396         CkPupGroupData(psizer);
1397         DEBUG(CmiMemoryCheck());
1398         CkPupNodeGroupData(psizer);
1399         DEBUG(CmiMemoryCheck());
1400         pupArrayElementsSkip(psizer,NULL);
1401         DEBUG(CmiMemoryCheck());
1402
1403         int dataSize = psizer.size();
1404         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1405         char *msg = (char *)CmiAlloc(totalSize);
1406         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1407         chkMsg->PE = CkMyPe();
1408         chkMsg->dataSize = dataSize;
1409
1410         
1411         char *buf = &msg[sizeof(CheckPointDataMsg)];
1412         PUP::toMem pBuf(buf);   
1413
1414         pBuf | checkpointCount;
1415         
1416         CkPupROData(pBuf);
1417         CkPupGroupData(pBuf);
1418         CkPupNodeGroupData(pBuf);
1419         pupArrayElementsSkip(pBuf,NULL);
1420
1421         unAckedCheckpoint=1;
1422         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1423         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1424         
1425         /*
1426                 Store the highest Ticket number processed for each chare on this processor
1427         */
1428         processedTicketLog.removeAll();
1429         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1430         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1431                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1432         }
1433
1434         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1435                 lastCompletedAlarm = curWallTime;
1436                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1437         }
1438         traceUserBracketEvent(28,_startTime,CkWallTimer());
1439 };
1440
1441 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1442         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1443         TProcessedLog logEntry;
1444         logEntry.recver = mlogData->objID;
1445         logEntry.tProcessed = mlogData->tProcessed;
1446         log->push_back(logEntry);
1447         char objString[100];
1448         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1449 }
1450
1451 class ElementPacker : public CkLocIterator {
1452 private:
1453         CkLocMgr *locMgr;
1454         PUP::er &p;
1455 public:
1456                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1457                 void addLocation(CkLocation &loc) {
1458                         CkArrayIndexMax idx=loc.getIndex();
1459                         CkGroupID gID = locMgr->ckGetGroupID();
1460                         p|gID;      // store loc mgr's GID as well for easier restore
1461                         p|idx;
1462                         p|loc;
1463     }
1464 };
1465
1466
1467 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1468         int numElements,i;
1469   int numGroups = CkpvAccess(_groupIDTable)->size();    
1470         if(!p.isUnpacking()){
1471                 numElements = CkCountArrayElements();
1472         }       
1473         p | numElements;
1474         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1475         if(!p.isUnpacking()){
1476                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1477         }else{
1478                 //Flush all recs of all LocMgrs before putting in new elements
1479 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1480                 for(int j=0;j<listsize;j++){
1481                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1482                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1483                                 listToSkip[j].idx.print();
1484                         }
1485                 }
1486                 
1487  printf("numElements = %d\n",numElements);
1488         
1489           for (int i=0; i<numElements; i++) {
1490                         CkGroupID gID;
1491                         CkArrayIndexMax idx;
1492                         p|gID;
1493             p|idx;
1494                         int flag=0;
1495                         int matchedIdx=0;
1496                         for(int j=0;j<listsize;j++){
1497                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1498                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1499                                                 matchedIdx = j;
1500                                                 flag = 1;
1501                                                 break;
1502                                         }
1503                                 }
1504                         }
1505                         if(flag == 1){
1506                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1507                         }else{
1508                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1509                         }
1510                                 
1511                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1512                         mgr->resume(idx,p,flag);
1513                         if(flag == 1){
1514                                 int homePE = mgr->homePe(idx);
1515                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1516                         }
1517           }
1518         }
1519 };
1520
1521
1522 void writeCheckpointToDisk(int size,char *chkpt){
1523         char fNameTemp[100];
1524         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1525         int fd = creat(fNameTemp,S_IRWXU);
1526         int ret = write(fd,chkpt,size);
1527         CkAssert(ret == size);
1528         close(fd);
1529         
1530         char fName[100];
1531         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1532         unlink(fName);
1533
1534         rename(fNameTemp,fName);
1535         
1536 }
1537
1538 //handler that receives the checkpoint from a processor
1539 //it stores it and acks it
1540 void _storeCheckpointHandler(char *msg){
1541         
1542         double _startTime=CkWallTimer();
1543                 
1544         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1545         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1546         
1547         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1548         
1549         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1550         if(oldChkpt != NULL){
1551                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1552                 CmiFree(oldmsg);
1553         }
1554         //turning off storing checkpoints
1555         
1556         int sendingPE = chkMsg->PE;
1557         
1558         CpvAccess(_storedCheckpointData)->buf = chkpt;
1559         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1560         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1561
1562 #ifdef CHECKPOINT_DISK
1563         //store the checkpoint on disk
1564         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1565         CpvAccess(_storedCheckpointData)->buf = NULL;
1566         CmiFree(msg);
1567 #endif
1568
1569         int count=0;
1570         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1571                 if(migratedNoticeList[j].fromPE == sendingPE){
1572                         migratedNoticeList[j].ackFrom = 1;
1573                 }else{
1574                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1575                 }
1576                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1577                         migratedNoticeList.remove(j);
1578                         count++;
1579                 }
1580                 
1581         }
1582         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1583         
1584         CheckPointAck ackMsg;
1585         ackMsg.PE = CkMyPe();
1586         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1587         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1588         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1589         
1590         
1591         
1592         traceUserBracketEvent(29,_startTime,CkWallTimer());
1593 };
1594
1595
1596 void sendRemoveLogRequests(){
1597         double _startTime = CkWallTimer();      
1598         //send out the messages asking senders to throw away message logs below a certain ticket number
1599         /*
1600                 The remove log request message looks like
1601                 |RemoveLogRequest||List of TProcessedLog|
1602         */
1603         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1604         char *requestMsg = (char *)CmiAlloc(totalSize);
1605         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1606         request->PE = CkMyPe();
1607         request->numberObjects = processedTicketLog.size();
1608         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1609         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1610         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1611         
1612         DEBUG(CmiMemoryCheck());
1613         for(int i=0;i<CkNumPes();i++){
1614                 CmiSyncSend(i,totalSize,requestMsg);
1615         }
1616         CmiFree(requestMsg);
1617
1618         clearUpMigratedRetainedLists(CmiMyPe());
1619         //TODO: clear ticketTable
1620         
1621         traceUserBracketEvent(30,_startTime,CkWallTimer());
1622         DEBUG(CmiMemoryCheck());
1623 }
1624
1625
1626 void _checkpointAckHandler(CheckPointAck *ackMsg){
1627         DEBUG(CmiMemoryCheck());
1628         unAckedCheckpoint=0;
1629         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1630         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1631         if(onGoingLoadBalancing){
1632                 onGoingLoadBalancing = 0;
1633                 finishedCheckpointLoadBalancing();
1634         }else{
1635                 sendRemoveLogRequests();
1636         }
1637         CmiFree(ackMsg);
1638         
1639 };
1640
1641 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1642         DEBUG(CmiMemoryCheck());
1643         CmiMemoryCheck();
1644         char *data = (char *)_data;
1645         RemoveLogRequest *request = (RemoveLogRequest *)data;
1646         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1647         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1648
1649         int count=0;
1650         for(int i=0;i<mlog->length();i++){
1651                 MlogEntry *logEntry = mlog->deq();
1652                 int match=0;
1653                 for(int j=0;j<request->numberObjects;j++){
1654                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1655                                 //this log Entry should be removed
1656                                 match = 1;
1657                                 break;
1658                         }
1659                 }
1660                 char senderString[100],recverString[100];
1661 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1662                 if(match){
1663                         count++;
1664                         delete logEntry;
1665                 }else{
1666                         mlog->enq(logEntry);
1667                 }
1668         }
1669         if(count > 0){
1670                 char nameString[100];
1671                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1672         }
1673         DEBUG(CmiMemoryCheck());
1674         CmiMemoryCheck();
1675 };
1676
1677 void _removeProcessedLogHandler(char *requestMsg){
1678         double start = CkWallTimer();
1679         forAllCharesDo(removeProcessedLogs,requestMsg);
1680         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1681         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1682         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1683         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1684         if(request->PE == getCheckPointPE()){
1685                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1686                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1687                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1688                 int count=0;
1689 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1690                 DEBUG(char nameString[100];)
1691                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1692                 DEBUG(})*/
1693                 for(int i=0;i<localQ->length();i++){
1694                         LocalMessageLog localLogEntry = (*localQ)[i];
1695                         if(!fault_aware(localLogEntry.recver)){
1696                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1697                         }
1698                         bool keep = true;
1699                         for(int j=0;j<request->numberObjects;j++){                              
1700                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1701                                         keep = false;
1702                                         break;
1703                                 }
1704                         }       
1705                         if(keep){
1706                                 tempQ->enq(localLogEntry);
1707                         }else{
1708                                 count++;
1709                         }
1710                 }
1711                 delete localQ;
1712                 CpvAccess(_localMessageLog) = tempQ;
1713                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1714         }
1715
1716         /*
1717                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1718         */
1719         CmiMemoryCheck();
1720         clearUpMigratedRetainedLists(request->PE);
1721         
1722         traceUserBracketEvent(20,start,CkWallTimer());
1723         CmiFree(requestMsg);    
1724 };
1725
1726
1727 void clearUpMigratedRetainedLists(int PE){
1728         int count=0;
1729         CmiMemoryCheck();
1730         
1731         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1732                 if(migratedNoticeList[j].toPE == PE){
1733                         migratedNoticeList[j].ackTo = 1;
1734                 }
1735                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1736                         migratedNoticeList.remove(j);
1737                         count++;
1738                 }
1739         }
1740         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1741         
1742         for(int j=retainedObjectList.size()-1;j>=0;j--){
1743                 if(retainedObjectList[j]->migRecord.toPE == PE){
1744                         RetainedMigratedObject *obj = retainedObjectList[j];
1745                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1746                         retainedObjectList.remove(j);
1747                         if(obj->msg != NULL){
1748                                 CmiMemoryCheck();
1749                                 CmiFree(obj->msg);
1750                         }
1751                         delete obj;
1752                 }
1753         }
1754 }
1755
1756 /***************************************************************
1757         Restart Methods and handlers
1758 ***************************************************************/        
1759
1760 /**
1761  * Function for restarting an object with message logging
1762  */
1763 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1764         printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1765         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1766         _restartFlag = 1;
1767         RestartRequest msg;
1768         msg.PE = CkMyPe();
1769         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1770         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1771 };
1772
1773 void CkMlogRestartDouble(void *,double){
1774         CkMlogRestart(NULL,NULL);
1775 };
1776
1777
1778 void readCheckpointFromDisk(int size,char *buf){
1779         char fName[100];
1780         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1781
1782         int fd = open(fName,O_RDONLY);
1783         int count=0;
1784         while(count < size){
1785                 count += read(fd,&buf[count],size-count);
1786         }
1787         close(fd);
1788         
1789 };
1790
1791
1792 void sendCheckpointData();
1793
1794 void _getCheckpointHandler(RestartRequest *restartMsg){
1795         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1796         CkAssert(restartMsg->PE == storedChkpt->PE);
1797         storedRequest = restartMsg;
1798         
1799         verifyAckTotal = 0;
1800         for(int i=0;i<migratedNoticeList.size();i++){
1801                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1802 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1803                         if(migratedNoticeList[i].ackFrom == 0){
1804                                 //need to verify if the object actually exists .. it might not
1805                                 //have been acked but it might exist on it
1806                                 VerifyAckMsg msg;
1807                                 msg.migRecord = migratedNoticeList[i];
1808                                 msg.index = i;
1809                                 msg.fromPE = CmiMyPe();
1810                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1811                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1812                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1813                                 verifyAckTotal++;
1814                         }
1815                 }
1816         }
1817         
1818         if(verifyAckTotal == 0){
1819                 sendCheckpointData();
1820         }
1821         verifyAckCount = 0;
1822 }
1823
1824
1825 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1826         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1827         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1828         if(rec != NULL && rec->type() == CkLocRec::local){
1829                         //this location exists on this processor
1830                         //and needs to be removed       
1831                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1832                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1833                         
1834                         int localIdx = localRec->getLocalIndex();
1835                         LBDatabase *lbdb = localRec->getLBDB();
1836                         LDObjHandle ldHandle = localRec->getLdHandle();
1837                                 
1838                         locMgr->setDuringMigration(true);
1839                         
1840                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1841                         lbdb->UnregisterObj(ldHandle);
1842                         
1843                         locMgr->setDuringMigration(false);
1844                         
1845                         verifyAckedRequests++;
1846
1847         }
1848         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1849         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1850 };
1851
1852
1853 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1854         int index =     verifyReply->index;
1855         migratedNoticeList[index] = verifyReply->migRecord;
1856         verifyAckCount++;
1857         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1858         if(verifyAckCount == verifyAckTotal){
1859                 sendCheckpointData();
1860         }
1861 }
1862
1863
1864
1865 void sendCheckpointData(){      
1866         RestartRequest *restartMsg = storedRequest;
1867         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1868         int numMigratedAwayElements = migratedNoticeList.size();
1869         if(migratedNoticeList.size() != 0){
1870                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1871 //                      CkAssert(migratedNoticeList.size() == 0);
1872         }
1873         
1874         
1875         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1876         
1877         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1878         
1879         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
1880         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
1881         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1882         
1883         char *msg = (char *)CmiAlloc(totalSize);
1884         
1885         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1886         dataMsg->PE = CkMyPe();
1887         dataMsg->restartWallTime = CmiTimer();
1888         dataMsg->checkPointSize = storedChkpt->bufSize;
1889         
1890         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1891 //      dataMsg->numMigratedAwayElements = 0;
1892         
1893         dataMsg->numMigratedInElements = 0;
1894         dataMsg->migratedElementSize = 0;
1895         dataMsg->lbGroupID = globalLBID;
1896         /*msg layout 
1897                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1898                 Local MessageLog|
1899         */
1900         //store checkpoint data
1901         char *buf = &msg[sizeof(RestartProcessorData)];
1902
1903         if(dataMsg->numMigratedAwayElements != 0){
1904                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1905                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1906         }
1907         
1908
1909 #ifdef CHECKPOINT_DISK
1910         readCheckpointFromDisk(storedChkpt->bufSize,buf);
1911 #else   
1912         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1913 #endif
1914         buf = &buf[storedChkpt->bufSize];
1915
1916
1917         //store localmessage Log
1918         dataMsg->numLocalMessages = localMsgQ->length();
1919         for(int i=0;i<localMsgQ->length();i++){
1920                 if(!fault_aware(((*localMsgQ)[i]).recver )){
1921                         CmiAbort("Non fault aware localMsgQ");
1922                 }
1923                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
1924                 buf = &buf[sizeof(LocalMessageLog)];
1925         }
1926         
1927         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1928         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1929         CmiFree(restartMsg);
1930 };
1931
1932
1933 // this list is used to create a vector of the object ids of all
1934 //the chares on this processor currently and the highest TN processed by them 
1935 //the first argument is actually a CkVec<TProcessedLog> *
1936 void createObjIDList(void *data,ChareMlogData *mlogData){
1937         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1938         TProcessedLog entry;
1939         entry.recver = mlogData->objID;
1940         entry.tProcessed = mlogData->tProcessed;
1941         list->push_back(entry);
1942         char objString[100];
1943         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1944 }
1945
1946
1947
1948 void _recvCheckpointHandler(char *_restartData){
1949         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1950         MigrationRecord *migratedAwayElements;
1951
1952         globalLBID = restartData->lbGroupID;
1953         
1954         restartData->restartWallTime *= 1000;
1955         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1956         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1957         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1958
1959         
1960         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1961         char *buf = &_restartData[sizeof(RestartProcessorData)];
1962         
1963         if(restartData->numMigratedAwayElements != 0){
1964                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1965                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1966                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1967                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1968         }
1969         
1970         PUP::fromMem pBuf(buf);
1971
1972         pBuf | checkpointCount;
1973
1974         CkPupROData(pBuf);
1975         CkPupGroupData(pBuf);
1976         CkPupNodeGroupData(pBuf);
1977 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1978         pupArrayElementsSkip(pBuf,NULL);
1979         CkAssert(pBuf.size() == restartData->checkPointSize);
1980         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1981         
1982         forAllCharesDo(initializeRestart,NULL);
1983         
1984         //store the restored local message log in a vector
1985         buf = &buf[restartData->checkPointSize];        
1986         for(int i=0;i<restartData->numLocalMessages;i++){
1987                 LocalMessageLog logEntry;
1988                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
1989                 
1990                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
1991                 if(recverObj!=NULL){
1992                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
1993                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
1994                         char senderString[100];
1995                         char recverString[100];
1996                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
1997                 }else{
1998 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
1999                 }
2000                 buf = &buf[sizeof(LocalMessageLog)];
2001         }
2002
2003         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2004
2005         CmiFree(_restartData);
2006         
2007         
2008         _initDone();
2009
2010         getGlobalStep(globalLBID);
2011         
2012         countUpdateHomeAcks = 0;
2013         RestartRequest updateHomeRequest;
2014         updateHomeRequest.PE = CmiMyPe();
2015         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2016         for(int i=0;i<CmiNumPes();i++){
2017                 if(i != CmiMyPe()){
2018                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2019                 }
2020         }
2021
2022 }
2023
2024 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2025         countUpdateHomeAcks++;
2026         CmiFree(updateHomeAck);
2027         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2028         if(countUpdateHomeAcks != CmiNumPes()){
2029                 return;
2030         }
2031
2032         // Send out the request to resend logged messages to all other processors
2033         CkVec<TProcessedLog> objectVec;
2034         forAllCharesDo(createObjIDList, (void *)&objectVec);
2035         int numberObjects = objectVec.size();
2036         
2037         /*
2038                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2039         */
2040         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2041         char *resendMsg = (char *)CmiAlloc(totalSize);
2042         
2043
2044         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2045         resendReq->PE =CkMyPe(); 
2046         resendReq->numberObjects = numberObjects;
2047         char *objList = &resendMsg[sizeof(ResendRequest)];
2048         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2049         
2050
2051         
2052
2053         /* test for parallel restart migrate away object**/
2054         if(parallelRestart){
2055                 distributeRestartedObjects();
2056                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2057         }
2058         
2059         /*      To make restart work for load balancing.. should only
2060         be used when checkpoint happens along with load balancing
2061         **/
2062 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2063
2064         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2065         CpvAccess(_currentObj) = lb;
2066         lb->ReceiveDummyMigration(restartDecisionNumber);
2067
2068         sleep(10);
2069         
2070         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2071         for(int i=0;i<CkNumPes();i++){
2072                 if(i != CkMyPe()){
2073                         CmiSyncSend(i,totalSize,resendMsg);
2074                 }       
2075         }
2076         _resendMessagesHandler(resendMsg);
2077         CmiFree(resendMsg);
2078 };
2079
2080 void initializeRestart(void *data,ChareMlogData *mlogData){
2081         mlogData->resendReplyRecvd = 0;
2082         mlogData->receivedTNs = new CkVec<MCount>;
2083         mlogData->restartFlag = 1;
2084         mlogData->restoredLocalMsgLog.removeAll();
2085         mlogData->mapTable.empty();
2086 };
2087
2088
2089 void updateHomePE(void *data,ChareMlogData *mlogData){
2090         RestartRequest *updateRequest = (RestartRequest *)data;
2091         int PE = updateRequest->PE; //restarted PE
2092         //if this object is an array Element and its home is the restarted processor
2093         // the home processor needs to know its current location
2094         if(mlogData->objID.type == TypeArray){
2095                 //it is an array element
2096                 CkGroupID myGID = mlogData->objID.data.array.id;
2097                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2098                 CkArrayID aid(mlogData->objID.data.array.id);           
2099                 //check if the restarted processor is the home processor for this object
2100                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2101                 if(locMgr->homePe(myIdx) == PE){
2102                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2103                         DEBUGRESTART(myIdx.print());
2104                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2105                 }
2106         }
2107 };
2108
2109
2110 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2111         
2112         int sender = updateRequest->PE;
2113         
2114         forAllCharesDo(updateHomePE,updateRequest);
2115         
2116         updateRequest->PE = CmiMyPe();
2117         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2118         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2119         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2120                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2121                 checkpointCount--;
2122                 startMlogCheckpoint(NULL,0);
2123         }
2124         if(sender == getCheckPointPE()){
2125                 for(int i=0;i<retainedObjectList.size();i++){
2126                         if(retainedObjectList[i]->acked == 0){
2127                                 MigrationNotice migMsg;
2128                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2129                                 migMsg.record = retainedObjectList[i];
2130                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2131                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2132                         }
2133                 }
2134
2135         }
2136 }
2137
2138
2139 //the data argument is of type ResendData which contains the 
2140 //array of objects on  the restartedProcessor
2141 //this method resends the messages stored in this chare's message log 
2142 //to the restarted processor. It also accumulates the maximum TN
2143 //for all the objects on the restarted processor
2144 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2145         char nameString[100];
2146         ResendData *resendData = (ResendData *)data;
2147         int PE = resendData->PE; //restarted PE
2148         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2149         int count=0;
2150         int ticketRequests=0;
2151         CkQ<MlogEntry *> *log = mlogData->getMlog();
2152
2153         
2154         
2155         
2156         for(int i=0;i<log->length();i++){
2157                 MlogEntry *logEntry = (*log)[i];
2158                 
2159                 // if we sent out the logs of a local message to buddy and he crashed
2160                 //before acking
2161                 envelope *env = logEntry->env;
2162                 if(env == NULL){
2163                         continue;
2164                 }
2165                 if(logEntry->unackedLocal){
2166                         char recverString[100];
2167                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2168                         sendLocalMessageCopy(logEntry);
2169                 }
2170                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2171                 if(env->TN <= 0){
2172                         //ticket not yet replied send it out again
2173                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
2174                 }
2175                 
2176                 if(env->recver.type != TypeInvalid){
2177                         int flag = 0;//marks if any of the restarted objects matched this log entry
2178                         for(int j=0;j<resendData->numberObjects;j++){
2179                                 if(env->recver == (resendData->listObjects)[j].recver){
2180                                         flag = 1;
2181                                         //message has a valid TN
2182                                         if(env->TN > 0){
2183                                                 //store maxTicket
2184                                                 if(env->TN > resendData->maxTickets[j]){
2185                                                         resendData->maxTickets[j] = env->TN;
2186                                                 }
2187                                                 //if the TN for this entry is more than the TN processed, send the message out
2188                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2189                                                         //store the TNs that have been since the recver last checkpointed
2190                                                         resendData->ticketVecs[j].push_back(env->TN);
2191                                                         
2192                                                         if(PE != CkMyPe()){
2193                                                                 if(env->recver.type == TypeNodeGroup){
2194                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2195                                                                 }else{
2196                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2197                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2198                                                                 }
2199                                                         }else{
2200                                                                 envelope *copyEnv = copyEnvelope(env);
2201                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2202                                                         }
2203                                                         char senderString[100];
2204                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2205                                                         count++;
2206                                                 }       
2207                                         }else{
2208 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2209                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2210                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2211                                                 CkAssert(logEntry->destPE != CkMyPe());
2212                                                 
2213                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2214                                                 
2215                                                 ticketRequests++;*/
2216                                         }
2217                                 }
2218                         }//end of for loop of objects
2219                         
2220                 }       
2221         }
2222         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2223 }
2224
2225 void _resendMessagesHandler(char *msg){
2226         ResendRequest *resendReq = (ResendRequest *)msg;
2227         char *listObjects = &msg[sizeof(ResendRequest)];
2228         ResendData d;
2229         d.numberObjects = resendReq->numberObjects;
2230         d.PE = resendReq->PE;
2231         d.listObjects = (TProcessedLog *)listObjects;
2232         d.maxTickets = new MCount[d.numberObjects];
2233         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2234         for(int i=0;i<d.numberObjects;i++){
2235                 d.maxTickets[i] = 0;
2236         }
2237
2238         //Check if any of the retained objects need to be recreated
2239         //If they have not been recreated on the restarted processor
2240         //they need to be recreated on this processor
2241         int count=0;
2242         for(int i=0;i<retainedObjectList.size();i++){
2243                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2244                         count++;
2245                         int recreate=1;
2246                         for(int j=0;j<d.numberObjects;j++){
2247                                 if(d.listObjects[j].recver.type != TypeArray ){
2248                                         continue;
2249                                 }
2250                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2251                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2252                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2253                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2254                                                 recreate = 0;
2255                                                 break;
2256                                         }
2257                                 }
2258                         }
2259                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2260                         if(recreate){
2261                                 donotCountMigration=1;
2262                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2263                                 donotCountMigration=0;
2264                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2265                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2266                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2267
2268                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2269                                 
2270                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2271                                 CmiAssert(rec->type() == CkLocRec::local);
2272                                 CkVec<CkMigratable *> eltList;
2273                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2274                                 for(int j=0;j<eltList.size();j++){
2275                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2276                                                 CpvAccess(_currentObj) = eltList[j];
2277                                                 eltList[j]->ResumeFromSync();
2278                                         }
2279                                 }
2280
2281
2282                                 
2283                                 retainedObjectList[i]->msg=NULL;
2284                                 
2285                                         
2286                         }
2287                 }
2288         }
2289         
2290         if(count > 0){
2291 //              CmiAbort("retainedObjectList for restarted processor not empty");
2292         }
2293
2294
2295         
2296         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2297         forAllCharesDo(resendMessageForChare,&d);
2298
2299         //send back the maximum ticket number for a message sent to each object on the 
2300         //restarted processor
2301         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2302         
2303         int totalTNStored=0;
2304         for(int i=0;i<d.numberObjects;i++){
2305                 totalTNStored += d.ticketVecs[i].size();
2306         }
2307         
2308         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2309         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2310         
2311         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2312         resendReply->PE = CkMyPe();
2313         resendReply->numberObjects = d.numberObjects;
2314         
2315         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2316         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2317         for(int i=0;i<d.numberObjects;i++){
2318                 replyObjects[i] = d.listObjects[i].recver;
2319         }
2320         
2321         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2322         for(int i=0;i<d.numberObjects;i++){
2323                 int vecsize = d.ticketVecs[i].size();
2324                 memcpy(ticketList,&vecsize,sizeof(int));
2325                 ticketList = &ticketList[sizeof(int)];
2326                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2327                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2328         }       
2329
2330         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2331         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2332         
2333 /*      
2334         if(verifyAckRequestsUnacked){
2335                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2336                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2337                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2338                         LDObjHandle h;
2339                         lb->Migrated(h,1);
2340                 }
2341         }
2342         
2343         verifyAckRequestsUnacked=0;*/
2344
2345
2346         
2347         delete [] d.maxTickets;
2348         delete [] d.ticketVecs;
2349         if(resendReq->PE != CkMyPe()){
2350                 CmiFree(msg);
2351         }       
2352 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2353         lastRestart = CmiWallTimer();
2354 }
2355
2356 void sortVec(CkVec<MCount> *TNvec);
2357 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2358
2359 void _resendReplyHandler(char *msg){    
2360         /**
2361                 need to rewrite this method to deal with parallel restart
2362         */
2363         ResendRequest *resendReply = (ResendRequest *)msg;
2364         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2365
2366         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2367         
2368         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2369         for(int i =0; i< resendReply->numberObjects;i++){       
2370                 Chare *obj = (Chare *)listObjects[i].getObject();
2371                 
2372                 int vecsize;
2373                 memcpy(&vecsize,listTickets,sizeof(int));
2374                 listTickets = &listTickets[sizeof(int)];
2375                 MCount *listTNs = (MCount *)listTickets;
2376                 
2377                 
2378                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2379                 
2380                 if(obj != NULL){
2381                         //the object was restarted on the processor on which it existed
2382                         processReceivedTN(obj,vecsize,listTNs);
2383                 }else{
2384                 //pack up objID vecsize and listTNs and send it to the correct processor
2385                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2386                         char *TNMsg = (char *)CmiAlloc(totalSize);
2387                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2388                         receivedTNData->recver = listObjects[i];
2389                         receivedTNData->numTNs = vecsize;
2390                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2391                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2392
2393                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2394                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2395                 }
2396                 
2397         }
2398 };
2399
2400 void _receivedTNDataHandler(ReceivedTNData *msg){
2401         char objName[100];
2402         Chare *obj = (Chare *) msg->recver.getObject();
2403         if(obj){                
2404                 char *_msg = (char *)msg;
2405                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2406                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2407                 processReceivedTN(obj,msg->numTNs,listTNs);
2408         }else{
2409                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2410                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2411         }
2412 };
2413
2414
2415 void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
2416         obj->mlogData->resendReplyRecvd++;
2417
2418         
2419         for(int j=0;j<listSize;j++){
2420                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2421         }
2422         
2423         //if this object has received all the replies find the ticket numbers
2424         //that senders know about. Those less than the ticket number processed 
2425         //by the receiver can be thrown away. The rest need not be consecutive
2426         // ie there can be holes in the list of ticket numbers seen by senders
2427         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2428                 obj->mlogData->resendReplyRecvd = 0;
2429                 //sort the received TNS
2430                 sortVec(obj->mlogData->receivedTNs);
2431         
2432                 //after all the received tickets are in we need to sort them and then 
2433                 // calculate the holes
2434                 
2435                 if(obj->mlogData->receivedTNs->size() > 0){
2436                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2437                         int vecsize = obj->mlogData->receivedTNs->size();
2438                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2439                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2440                         if(numberHoles == 0){
2441                         }else{
2442                                 char objName[100];                                      
2443                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2444                                 obj->mlogData->numberHoles = numberHoles;
2445                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2446                                 int countHoles=0;
2447                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2448                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2449                                                 //the TNs are not consecutive at this point
2450                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2451                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2452                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2453                                                         countHoles++;
2454                                                 }       
2455                                         }
2456                                 }
2457                                 //Holes have been given new TN
2458                                 if(countHoles != numberHoles){
2459                                         char str[100];
2460                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2461                                 }
2462                                 CkAssert(countHoles == numberHoles);                                    
2463                                 obj->mlogData->currentHoles = numberHoles;
2464                         }
2465                 }       
2466                 
2467                 delete obj->mlogData->receivedTNs;
2468                 obj->mlogData->receivedTNs = NULL;
2469                 obj->mlogData->restartFlag = 0;
2470                 char objString[100];
2471                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2472         }
2473
2474 }
2475
2476
2477 void sortVec(CkVec<MCount> *TNvec){
2478         //sort it ->its bloddy bubble sort
2479         //TODO: use quicksort
2480         for(int i=0;i<TNvec->size();i++){
2481                 for(int j=i+1;j<TNvec->size();j++){
2482                         if((*TNvec)[j] < (*TNvec)[i]){
2483                                 MCount temp;
2484                                 temp = (*TNvec)[i];
2485                                 (*TNvec)[i] = (*TNvec)[j];
2486                                 (*TNvec)[j] = temp;
2487                         }
2488                 }
2489         }
2490         //make it unique .. since its sorted all equal units will be consecutive
2491         MCount *tempArray = new MCount[TNvec->size()];
2492         int     uniqueCount=-1;
2493         for(int i=0;i<TNvec->size();i++){
2494                 tempArray[i] = 0;
2495                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2496                         uniqueCount++;
2497                         tempArray[uniqueCount] = (*TNvec)[i];
2498                 }
2499         }
2500         uniqueCount++;
2501         TNvec->removeAll();
2502         for(int i=0;i<uniqueCount;i++){
2503                 TNvec->push_back(tempArray[i]);
2504         }
2505         delete [] tempArray;
2506 }       
2507
2508 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2509         if(TNVec->size() == 0){
2510                 return -1; //not found in an empty vec
2511         }
2512         //binary search to find 
2513         int left=0;
2514         int right = TNVec->size();
2515         int mid = (left +right)/2;
2516         while(searchTN != (*TNVec)[mid] && left < right){
2517                 if((*TNVec)[mid] > searchTN){
2518                         right = mid-1;
2519                 }else{
2520                         left = mid+1;
2521                 }
2522                 mid = (left + right)/2;
2523         }
2524         if(left < right){
2525                 //mid is the element to be returned
2526                 return mid;
2527         }else{
2528                 if(mid < TNVec->size() && mid >=0){
2529                         if((*TNVec)[mid] == searchTN){
2530                                 return mid;
2531                         }else{
2532                                 return -1;
2533                         }
2534                 }else{
2535                         return -1;
2536                 }
2537         }
2538 };
2539
2540
2541 /*
2542         Method to do parallel restart. Distribute some of the array elements to other processors.
2543         The problem is that we cant use to charm entry methods to do migration as it will get
2544         stuck in the protocol that is going to restart
2545 */
2546
2547 class ElementDistributor: public CkLocIterator{
2548         CkLocMgr *locMgr;
2549         int *targetPE;
2550         void pupLocation(CkLocation &loc,PUP::er &p){
2551                 CkArrayIndexMax idx=loc.getIndex();
2552                 CkGroupID gID = locMgr->ckGetGroupID();
2553                 p|gID;      // store loc mgr's GID as well for easier restore
2554                 p|idx;
2555                 p|loc;
2556         };
2557         public:
2558                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2559                 void addLocation(CkLocation &loc){
2560                         if(*targetPE == CkMyPe()){
2561                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2562                                 return;
2563                         }
2564                         
2565                         CkArrayIndexMax idx=loc.getIndex();
2566                         CkLocRec_local *rec = loc.getLocalRecord();
2567                         
2568                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2569                         idx.print();
2570                         
2571
2572                         //TODO: an element that is being moved should leave some trace behind so that
2573                         // the arraybroadcaster can forward messages to it
2574                         
2575                         //pack up this location and send it across
2576                         PUP::sizer psizer;
2577                         pupLocation(loc,psizer);
2578                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2579                         char *msg = (char *)CmiAlloc(totalSize);
2580                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2581                         PUP::toMem pmem(buf);
2582                         pmem.becomeDeleting();
2583                         pupLocation(loc,pmem);
2584                         
2585                         locMgr->setDuringMigration(CmiTrue);                    
2586                         delete rec;
2587                         locMgr->setDuringMigration(CmiFalse);                   
2588                         locMgr->inform(idx,*targetPE);
2589
2590                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
2591                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
2592
2593                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2594                         //decide on the target processor for the next object
2595                         *targetPE = (*targetPE +1)%CkNumPes();
2596                 }
2597                 
2598 };
2599
2600 void distributeRestartedObjects(){
2601         int numGroups = CkpvAccess(_groupIDTable)->size();      
2602         int i;
2603         int targetPE=CkMyPe();
2604         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2605 };
2606
2607 void _distributedLocationHandler(char *receivedMsg){
2608         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2609         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2610         PUP::fromMem pmem(buf);
2611         CkGroupID gID;
2612         CkArrayIndexMax idx;
2613         pmem |gID;
2614         pmem |idx;
2615         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2616         donotCountMigration=1;
2617         mgr->resume(idx,pmem);
2618         donotCountMigration=0;
2619         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2620         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2621         idx.print();
2622
2623         CkLocRec *rec = mgr->elementRec(idx);
2624         CmiAssert(rec->type() == CkLocRec::local);
2625         
2626         CkVec<CkMigratable *> eltList;
2627         mgr->migratableList((CkLocRec_local *)rec,eltList);
2628         for(int i=0;i<eltList.size();i++){
2629                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2630                         CpvAccess(_currentObj) = eltList[i];
2631                         eltList[i]->ResumeFromSync();
2632                 }
2633         }
2634         
2635         
2636 }
2637
2638
2639 /** this method is used to send messages to a restarted processor to tell
2640  * it that a particular expected object is not going to get to it */
2641 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2642         DummyMigrationMsg buf;
2643         buf.flag = MLOG_OBJECT;
2644         buf.lbID = lbID;
2645         buf.mgrID = locMgrID;
2646         buf.idx = idx;
2647         buf.locationPE = locationPE;
2648         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2649         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2650 };
2651
2652
2653 /**this method is used by a restarted processor to tell other processors
2654  * that they are not going to receive these many objects.. just the count
2655  * not the objects themselves ***/
2656
2657 void sendDummyMigrationCounts(int *dummyCounts){
2658         DummyMigrationMsg buf;
2659         buf.flag = MLOG_COUNT;
2660         buf.lbID = globalLBID;
2661         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2662         for(int i=0;i<CmiNumPes();i++){
2663                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2664                         buf.count = dummyCounts[i];
2665                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2666                 }
2667         }
2668 }
2669
2670
2671 /** this handler is used to process a dummy migration msg.
2672  * it looks up the load balancer and calls migrated for it */
2673
2674 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2675         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2676         if(msg->flag == MLOG_OBJECT){
2677                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2678                 LDObjHandle h;
2679                 lb->Migrated(h,1);
2680         }
2681         if(msg->flag == MLOG_COUNT){
2682                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2683                 msg->count -= verifyAckedRequests;
2684                 for(int i=0;i<msg->count;i++){
2685                         LDObjHandle h;
2686                         lb->Migrated(h,1);
2687                 }
2688         }
2689         verifyAckedRequests=0;
2690         CmiFree(msg);
2691 };
2692
2693
2694
2695
2696
2697
2698
2699 /*****************************************************
2700         Implementation of a method that can be used to call
2701         any method on the ChareMlogData of all the chares on
2702         a processor currently
2703 ******************************************************/
2704
2705
2706 class ElementCaller :  public CkLocIterator {
2707 private:
2708         CkLocMgr *locMgr;
2709         MlogFn fnPointer;
2710         void *data;
2711 public:
2712         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2713                 locMgr = _locMgr;
2714                 fnPointer = _fnPointer;
2715                 data = _data;
2716         };
2717         void addLocation(CkLocation &loc){
2718                 CkVec<CkMigratable *> list;
2719                 CkLocRec_local *local = loc.getLocalRecord();
2720                 locMgr->migratableList (local,list);
2721                 for(int i=0;i<list.size();i++){
2722                         CkMigratable *migratableElement = list[i];
2723                         fnPointer(data,migratableElement->mlogData);
2724                 }
2725         }
2726 };
2727
2728 void forAllCharesDo(MlogFn fnPointer,void *data){
2729         int numGroups = CkpvAccess(_groupIDTable)->size();
2730         for(int i=0;i<numGroups;i++){
2731                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2732                 fnPointer(data,obj->mlogData);
2733         }
2734         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2735         for(int i=0;i<numNodeGroups;i++){
2736                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
2737                 fnPointer(data,obj->mlogData);
2738         }
2739         int i;
2740         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
2741         
2742         
2743 };
2744
2745
2746 /******************************************************************
2747  Load Balancing
2748 ******************************************************************/
2749
2750 void initMlogLBStep(CkGroupID gid){
2751         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
2752         countLBMigratedAway = 0;
2753         countLBToMigrate=0;
2754         onGoingLoadBalancing=1;
2755         migrationDoneCalled=0;
2756         checkpointBarrierCount=0;
2757         if(globalLBID.idx != 0){
2758                 CmiAssert(globalLBID.idx == gid.idx);
2759         }
2760         globalLBID = gid;
2761 }
2762
2763 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
2764         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
2765         
2766         resumeLbFnPtr = _fnPtr;
2767         centralLb = _centralLb;
2768         migrationDoneCalled = 1;
2769         if(countLBToMigrate == countLBMigratedAway){
2770                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2771                 startMlogCheckpoint(NULL,CmiWallTimer());       
2772         }
2773 };
2774
2775 void finishedCheckpointLoadBalancing(){
2776         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
2777         CheckpointBarrierMsg msg;
2778         msg.fromPE = CmiMyPe();
2779         msg.checkpointCount = checkpointCount;
2780
2781         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
2782         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
2783         
2784 };
2785
2786
2787 void sendMlogLocation(int targetPE,envelope *env){
2788         void *_msg = EnvToUsr(env);
2789         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2790
2791
2792         int existing = 0;
2793         //if this object is already in the retainedobjectlust destined for this
2794         //processor it should not be sent
2795         
2796         for(int i=0;i<retainedObjectList.size();i++){
2797                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
2798                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
2799                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
2800                         existing = 1;
2801                         break;
2802                 }
2803         }
2804
2805         if(existing){
2806                 return;
2807         }
2808         
2809         
2810         countLBToMigrate++;
2811         
2812         MigrationNotice migMsg;
2813         migMsg.migRecord.gID = msg->gid;
2814         migMsg.migRecord.idx = msg->idx;
2815         migMsg.migRecord.fromPE = CkMyPe();
2816         migMsg.migRecord.toPE =  targetPE;
2817         
2818         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
2819         
2820         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
2821         retainedObject->migRecord = migMsg.migRecord;
2822         retainedObject->acked  = 0;
2823         
2824         CkPackMessage(&env);
2825         
2826         migMsg.record = retainedObject;
2827         retainedObject->msg = env;
2828         int size = retainedObject->size = env->getTotalsize();
2829         
2830         retainedObjectList.push_back(retainedObject);
2831         
2832         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2833         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2834         
2835         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
2836
2837 }
2838
2839 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
2840         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
2841         migratedNoticeList.push_back(msg->migRecord);
2842
2843         MigrationNoticeAck buf;
2844         buf.record = msg->record;
2845         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
2846         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
2847 }
2848
2849 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
2850         
2851         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
2852         retainedObject->acked = 1;
2853
2854         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
2855         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
2856
2857         //inform home about the new location of this object
2858         CkGroupID gID = retainedObject->migRecord.gID ;
2859         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2860         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
2861         
2862         countLBMigratedAway++;
2863         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
2864                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2865                 startMlogCheckpoint(NULL,CmiWallTimer());
2866         }
2867 };
2868
2869 void _receiveMlogLocationHandler(void *buf){
2870         envelope *env = (envelope *)buf;
2871         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
2872         CkUnpackMessage(&env);
2873         void *_msg = EnvToUsr(env);
2874         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2875         CkGroupID gID= msg->gid;
2876         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
2877         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2878         CpvAccess(_currentObj)=mgr;
2879         mgr->immigrate(msg);
2880 };
2881
2882
2883 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
2884 /*      if(mlogData->objID.type == TypeArray){
2885                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
2886         //      TODO: make sure later that atSync has been called and it needs 
2887         //      to be resumed from sync
2888         //
2889                 CpvAccess(_currentObj) = elt;
2890                 elt->ResumeFromSync();
2891         }*/
2892 }
2893
2894 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
2895         if(checkpointBarrierCount == CmiNumPes()){
2896                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
2897                 for(int i=0;i<CmiNumPes();i++){
2898                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
2899                 }
2900         }
2901 }
2902
2903 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
2904         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
2905         if(msg->checkpointCount == checkpointCount){
2906                 checkpointBarrierCount++;
2907                 checkAndSendCheckpointBarrierAcks(msg);
2908         }else{
2909                 if(msg->checkpointCount-1 == checkpointCount){
2910                         checkpointBarrierCount++;
2911                         checkAndSendCheckpointBarrierAcks(msg);
2912                 }else{
2913                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
2914                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
2915                 }
2916         }
2917         CmiFree(msg);
2918 }
2919
2920 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
2921         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
2922         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
2923         sendRemoveLogRequests();
2924         (*resumeLbFnPtr)(centralLb);
2925         CmiFree(msg);
2926 }
2927
2928 /**
2929         method that informs an array elements home processor of its current location
2930         It is a converse method to bypass the charm++ message logging framework
2931 */
2932
2933 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
2934         double _startTime = CmiWallTimer();
2935         CurrentLocationMsg msg;
2936         msg.mgrID = locMgrID;
2937         msg.idx = idx;
2938         msg.locationPE = currentPE;
2939         msg.fromPE = CkMyPe();
2940
2941         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
2942         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
2943         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
2944         traceUserBracketEvent(37,_startTime,CmiWallTimer());
2945 }
2946
2947
2948 void _receiveLocationHandler(CurrentLocationMsg *data){
2949         double _startTime = CmiWallTimer();
2950         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
2951         if(mgr == NULL){
2952                 CmiFree(data);
2953                 return;
2954         }
2955         CkLocRec *rec = mgr->elementNrec(data->idx);
2956         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
2957         if(rec != NULL){
2958                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
2959                         if(data->fromPE == data->locationPE){
2960                                 CmiAbort("Another processor has the same object");
2961                         }
2962                 }
2963         }
2964         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
2965                 int targetPE = data->fromPE;
2966                 data->fromPE = CmiMyPe();
2967                 data->locationPE = CmiMyPe();
2968                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
2969                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
2970         }else{
2971                 mgr->inform(data->idx,data->locationPE);
2972         }
2973         CmiFree(data);
2974         traceUserBracketEvent(38,_startTime,CmiWallTimer());
2975 }
2976
2977
2978
2979 void getGlobalStep(CkGroupID gID){
2980         LBStepMsg msg;
2981         int destPE = 0;
2982         msg.lbID = gID;
2983         msg.fromPE = CmiMyPe();
2984         msg.step = -1;
2985         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
2986         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
2987 };
2988
2989 void _getGlobalStepHandler(LBStepMsg *msg){
2990         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2991         msg->step = lb->step();
2992         CmiAssert(msg->fromPE != CmiMyPe());
2993         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
2994         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
2995         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
2996 };
2997
2998 void _recvGlobalStepHandler(LBStepMsg *msg){
2999         
3000         restartDecisionNumber=msg->step;
3001         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3002         _updateHomeAckHandler(dummyAck);
3003 };
3004
3005
3006
3007
3008
3009
3010
3011
3012 void _messageLoggingExit(){
3013 /*      if(CkMyPe() == 0){
3014                 if(countBuffered != 0){
3015                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3016                 }
3017
3018 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3019         }
3020         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3021         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3022 }
3023 /**********************************
3024         * The methods of the message logging
3025         * data structure stored in each chare
3026         ********************************/
3027
3028 MCount ChareMlogData::nextSN(const CkObjID &recver){
3029 /*      MCount SN = snTable.get(recver);
3030         snTable.put(recver) = SN+1;
3031         return SN+1;*/
3032         double _startTime = CmiWallTimer();
3033         MCount *SN = snTable.getPointer(recver);
3034         if(SN==NULL){
3035                 snTable.put(recver) = 1;
3036                 return 1;
3037         }else{
3038                 (*SN)++;
3039                 return *SN;
3040         }
3041 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3042 };
3043
3044
3045 MCount ChareMlogData::newTN(){
3046         MCount TN;
3047         if(currentHoles > 0){
3048                 int holeidx = numberHoles-currentHoles;
3049                 TN = ticketHoles[holeidx];
3050                 currentHoles--;
3051                 if(currentHoles == 0){
3052                         delete []ticketHoles;
3053                         numberHoles = 0;
3054                 }
3055         }else{
3056                 TN = ++tCount;
3057         }       
3058         return TN;
3059 };
3060
3061 Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
3062         char senderName[100];
3063         char recverName[100];
3064         double _startTime =CmiWallTimer();
3065         Ticket ticket;
3066         if(restartFlag){
3067                 ticket.TN = 0;
3068                 return ticket;
3069         }
3070 /*      SNToTicket &ticketRow = ticketTable.put(sender);
3071         Ticket earlierTicket = ticketRow.get(SN);
3072         if(earlierTicket.TN == 0){
3073                 //This SN has not been ever alloted a ticket
3074                 ticket.TN = newTN();
3075                 ticketRow.put(SN)=ticket;
3076         }else{
3077                 ticket.TN = earlierTicket.TN;
3078         }*/
3079         
3080
3081         SNToTicket *ticketRow = ticketTable.get(sender);
3082         if(ticketRow != NULL){
3083                 Ticket earlierTicket = ticketRow->get(SN);
3084                 if(earlierTicket.TN == 0){
3085                         ticket.TN = newTN();
3086                         ticketRow->put(SN) = ticket;
3087                         DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
3088                 }else{
3089                         ticket.TN = earlierTicket.TN;
3090                         if(ticket.TN > tCount){