5883200b7d134e97c3d899af2ab69b8bec8da6b8
[charm.git] / src / ck-core / ckmessagelogging.C
1
2 //ERASE this line
3 #ifdef _FAULT_MLOG_
4
5 #include "ck.h"
6 #include "ckmessagelogging.h"
7 #include "queueing.h"
8 #include <sys/types.h>
9 #include <signal.h>
10 #include "CentralLB.h"
11
// Debug tracing hooks. Each macro discards its argument, so the wrapped
// statement is compiled out entirely. To trace only after a restart, use
// instead:
//   #define DEBUG(x)  if(_restartFlag) {x;}
#define DEBUG(x)
#define DEBUGRESTART(x)
#define DEBUGLB(x)

// Batch local message-log copies (sendLocalMessageCopy) and remote ticket
// requests (sendTicketRequest) instead of sending each one individually.
#define BUFFERED_LOCAL
#define BUFFERED_REMOTE
19
20 extern const char *idx2str(const CkArrayIndex &ind);
21 extern const char *idx2str(const ArrayElement *el);
22 const char *idx2str(const CkArrayIndexMax &ind){
23         return idx2str((const CkArrayIndex &)ind);
24 };
25
26 void getGlobalStep(CkGroupID gID);
27
28 bool fault_aware(CkObjID &recver);
29
// Restart state flags.
int _restartFlag=0;
int restarted=0;

//TODO: remove for perf runs
int countHashRefs=0; //count the number of gets
int countHashCollisions=0;


//#define CHECKPOINT_DISK
// Checkpoint directory, "." by default.
// The variable keeps its mutable `char *` type for existing users, but it now
// points at a writable array. Binding a string literal to `char *` is
// ill-formed since C++11, and writing through it would be undefined behaviour.
static char defaultCheckpointDirectory[] = ".";
char *checkpointDirectory=defaultCheckpointDirectory;
int unAckedCheckpoint=0;

// Incremented by sendLocalMessageCopy (countLocal) and by
// sendBufferedLocalMessageCopy (countBuffered).
int countLocal=0;
int countBuffered=0;
int countPiggy=0;
// checkBufferedLocalMessageCopy timers armed by sendLocalMessageCopy;
// decremented each time that callback runs.
int countClearBufferedLocalCalls=0;

int countUpdateHomeAcks=0;

extern int chkptPeriod;
extern bool parallelRestart;

// Fault injection: when killFlag is set, readKillFile() reads "<pe> <seconds>"
// pairs from killFile and schedules killLocal() for entries naming this PE.
char *killFile;
int killFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;
int checkpointCount=0;
57
58
59 CpvDeclare(Chare *,_currentObj);
60 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
61 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
62 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
63 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
64 CpvDeclare(Queue, _outOfOrderMessageQueue);
65 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
66 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
67 CpvDeclare(char **,_bufferedTicketRequests);
68 CpvDeclare(int *,_numBufferedTicketRequests);
69 CpvDeclare(char *,_bufferTicketReply);
70
71
72
73 static double adjustChkptPeriod=0.0; //in ms
74 static double nextCheckpointTime=0.0;//in seconds
75
76 double lastBufferedLocalMessageCopyTime;
77
78 int _maxBufferedMessages;
79 int _maxBufferedTicketRequests;
80 int BUFFER_TIME=2; // in ms
81
82
83 int _ticketRequestHandlerIdx;
84 int _ticketHandlerIdx;
85 int _localMessageCopyHandlerIdx;
86 int _localMessageAckHandlerIdx;
87 int _pingHandlerIdx;
88 int _bufferedLocalMessageCopyHandlerIdx;
89 int _bufferedLocalMessageAckHandlerIdx;
90 int _bufferedTicketRequestHandlerIdx;
91 int _bufferedTicketHandlerIdx;
92
93
94 char objString[100];
95 int _checkpointRequestHandlerIdx;
96 int _storeCheckpointHandlerIdx;
97 int _checkpointAckHandlerIdx;
98 int _getCheckpointHandlerIdx;
99 int _recvCheckpointHandlerIdx;
100 int _removeProcessedLogHandlerIdx;
101
102 int _verifyAckRequestHandlerIdx;
103 int _verifyAckHandlerIdx;
104 int _dummyMigrationHandlerIdx;
105
106
107 int     _getGlobalStepHandlerIdx;
108 int     _recvGlobalStepHandlerIdx;
109
110 int _updateHomeRequestHandlerIdx;
111 int _updateHomeAckHandlerIdx;
112 int _resendMessagesHandlerIdx;
113 int _resendReplyHandlerIdx;
114 int _receivedTNDataHandlerIdx;
115 int _distributedLocationHandlerIdx;
116
117
118 int verifyAckTotal;
119 int verifyAckCount;
120
121 int verifyAckedRequests=0;
122
123 RestartRequest *storedRequest;
124
125
126
127 int _falseRestart =0; /**
128                                                                                                         For testing on clusters we might carry out restarts on 
129                                                                                                         a porcessor without actually starting it
130                                                                                                         1 -> false restart
131                                                                                                         0 -> restart after an actual crash
132                                                                                                 */                                                                                                                              
133
134 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
135 int _lockNewTicket=0;
136
137
138 //Load balancing globals
139 int onGoingLoadBalancing=0;
140 void *centralLb;
141 void (*resumeLbFnPtr)(void *);
142 int _receiveMlogLocationHandlerIdx;
143 int _receiveMigrationNoticeHandlerIdx;
144 int _receiveMigrationNoticeAckHandlerIdx;
145 int _checkpointBarrierHandlerIdx;
146 int _checkpointBarrierAckHandlerIdx;
147
148 CkVec<MigrationRecord> migratedNoticeList;
149 CkVec<RetainedMigratedObject *> retainedObjectList;
150 int donotCountMigration=0;
151 int countLBMigratedAway=0;
152 int countLBToMigrate=0;
153 int migrationDoneCalled=0;
154 int checkpointBarrierCount=0;
155 int globalResumeCount=0;
156 CkGroupID globalLBID;
157 int restartDecisionNumber=-1;
158
159
160 double lastCompletedAlarm=0;
161 double lastRestart=0;
162
163
164 //update location globals
165 int _receiveLocationHandlerIdx;
166
167
168
169 // initialize message logging datastructures and register handlers
170 void _messageLoggingInit(){
171         //current object
172         CpvInitialize(Chare *,_currentObj);
173         
174         //registering handlers for message logging
175         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
176         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
177         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
178         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
179         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
180         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
181         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
182   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
183         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
184
185                 
186         //handlers for checkpointing
187         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
188         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
189         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
190         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
191
192
193         //handlers for restart
194         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
195         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
196         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
197         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
198         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
199         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
200         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
201         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
202         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
203         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
204         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
205
206         
207         //handlers for load balancing
208         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
209         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
210         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
211         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
212         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
213         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
214         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
215         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
216         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
217
218         
219         //handlers for updating locations
220         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
221         
222         //Cpv variables for message logging
223         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
224         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
225         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
226         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
227         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
228         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
229         CpvInitialize(Queue, _outOfOrderMessageQueue);
230         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
231         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
232         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
233         
234         CpvInitialize(char **,_bufferedTicketRequests);
235         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
236         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
237         for(int i=0;i<CkNumPes();i++){
238                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
239                 CpvAccess(_numBufferedTicketRequests)[i]=0;
240         }
241   CpvInitialize(char *,_bufferTicketReply);
242         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
243         
244 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
245         CcdCallFnAfter(retryTicketRequest,NULL,100);    
246         
247         
248         //Cpv variables for checkpoint
249         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
250         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
251         
252 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
253 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
254 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
255         if(CkMyPe() == 0){
256 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
257 #ifdef  BUFFERED_LOCAL
258                 if(CmiMyPe() == 0){
259                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
260                 }
261 #endif
262         }
263 #ifdef  BUFFERED_REMOTE
264         if(CmiMyPe() == 0){
265                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
266         }
267 #endif
268
269         traceRegisterUserEvent("Remove Logs", 20);
270         traceRegisterUserEvent("Ticket Request Handler", 21);
271         traceRegisterUserEvent("Ticket Handler", 22);
272         traceRegisterUserEvent("Local Message Copy Handler", 23);
273         traceRegisterUserEvent("Local Message Ack Handler", 24);        
274         traceRegisterUserEvent("Preprocess current message",25);
275         traceRegisterUserEvent("Preprocess past message",26);
276         traceRegisterUserEvent("Preprocess future message",27);
277         traceRegisterUserEvent("Checkpoint",28);
278         traceRegisterUserEvent("Checkpoint Store",29);
279         traceRegisterUserEvent("Checkpoint Ack",30);
280         
281         traceRegisterUserEvent("Send Ticket Request",31);
282         traceRegisterUserEvent("Generalticketrequest1",32);
283         traceRegisterUserEvent("TicketLogLocal",33);
284         traceRegisterUserEvent("next_ticket and SN",34);
285         traceRegisterUserEvent("Timeout for buffered remote messages",35);
286         traceRegisterUserEvent("Timeout for buffered local messages",36);
287         traceRegisterUserEvent("Inform Location Home",37);
288         traceRegisterUserEvent("Receive Location Handler",38);
289         
290         lastCompletedAlarm=CmiWallTimer();
291         lastRestart = CmiWallTimer();
292 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
293         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
294 //      printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
295         if(killFlag){
296                 readKillFile();
297         }
298 }
299
300 void killLocal(void *_dummy,double curWallTime);        
301
302 void readKillFile(){
303         FILE *fp=fopen(killFile,"r");
304         if(!fp){
305                 return;
306         }
307         int proc;
308         double sec;
309         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
310                 if(proc == CkMyPe()){
311                         killTime = CmiWallTimer()+sec;
312                         printf("[%d] To be killed after %.6lf s \n",CkMyPe(),sec);
313                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
314                 }
315         }
316         fclose(fp);
317 }
318
319 void killLocal(void *_dummy,double curWallTime){
320         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
321         if(CmiWallTimer()<killTime-1){
322                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
323         }else{  
324                 kill(getpid(),SIGKILL);
325         }
326 }
327
328
329
330 /************************ Message logging methods ****************/
331
332 // send a ticket request to a group on a processor
333 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
334         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
335                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
336                 void *origMsg = EnvToUsr(env);
337                 for(int i=0;i<CmiNumPes();i++){
338                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
339                                 void *copyMsg = CkCopyMsg(&origMsg);
340                                 envelope *copyEnv = UsrToEnv(copyMsg);
341                                 copyEnv->SN=0;
342                                 copyEnv->TN=0;
343                                 copyEnv->sender.type = TypeInvalid;
344                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
345                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
346                         }
347                 }
348                 return;
349         }
350         CkObjID recver;
351         recver.type = TypeGroup;
352         recver.data.group.id = env->getGroupNum();
353         recver.data.group.onPE = destPE;
354 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
355                 CmiPrintStackTrace(0);
356         }*/
357         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
358 }
359
360 //send a ticket request to a nodegroup
361 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
362         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
363                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
364                 void *origMsg = EnvToUsr(env);
365                 for(int i=0;i<CmiNumNodes();i++){
366                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
367                                 void *copyMsg = CkCopyMsg(&origMsg);
368                                 envelope *copyEnv = UsrToEnv(copyMsg);
369                                 copyEnv->SN=0;
370                                 copyEnv->TN=0;
371                                 copyEnv->sender.type = TypeInvalid;
372                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
373                         }
374                 }
375                 return;
376         }
377         CkObjID recver;
378         recver.type = TypeNodeGroup;
379         recver.data.group.id = env->getGroupNum();
380         recver.data.group.onPE = destNode;
381         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
382 }
383
384 //send a ticket request to an array element
385 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
386         CkObjID recver;
387         recver.type = TypeArray;
388         recver.data.array.id = env->getsetArrayMgr();
389         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
390
391         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
392                 char recverString[100],senderString[100];
393                 
394                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
395         }
396
397         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
398 };
399
400 //a method to generate the actual ticket requests for groups,nodegroups or arrays
401 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
402         envelope *env = _env;
403         int resend=0; //is it a resend
404         char recverName[100];
405         double _startTime=CkWallTimer();
406         
407         if(CpvAccess(_currentObj) == NULL){
408 //              CkAssert(0);
409                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
410                 generalCldEnqueue(destPE,env,_infoIdx);
411                 return;
412         }
413         
414         if(env->sender.type == TypeInvalid){
415                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
416                 //Set message logging data in the envelope
417         }else{
418                 envelope *copyEnv = copyEnvelope(env);
419                 env = copyEnv;
420                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
421                 env->SN = 0;
422         }
423         
424         
425         CkObjID &sender = env->sender;
426         env->recver = recver;
427
428         Chare *obj = (Chare *)env->sender.getObject();
429           
430         if(env->SN == 0){
431                 env->SN = obj->mlogData->nextSN(recver);
432         }else{
433                 resend = 1;
434         }
435         
436         
437         char senderString[100];
438 //      if(env->SN != 1){
439                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
440         //      CmiPrintStackTrace(0);
441 /*      }else{
442                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
443         }*/
444                 
445         MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
446 //      CkPackMessage(&(mEntry->env));
447 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
448         
449         
450
451         _startTime = CkWallTimer();
452         if(destPE == CkMyPe()){
453                 ticketLogLocalMessage(mEntry);
454         }else{
455                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
456 //              traceUserBracketEvent(31,_startTime,CkWallTimer());                     
457         }
458 }
459
460 //method that does the actual send by creating a ticketrequest filling it up and sending it
461 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
462         char recverString[100],senderString[100];
463         envelope *env = entry->env;
464         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
465 /*      envelope *env = entry->env;
466         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
467
468         Chare *obj = (Chare *)entry->env->sender.getObject();
469         if(!resend){
470                 obj->mlogData->addLogEntry(entry);
471         }
472         
473
474 #ifdef BUFFERED_REMOTE
475         //buffer the ticket request 
476         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
477                 //first message to this processor, buffer needs to be created
478                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
479                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
480                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
481         }
482         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
483         //Buffer the ticketrequests
484         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
485         ticketRequest->sender = sender;
486         ticketRequest->recver = recver;
487         ticketRequest->logEntry = entry;
488         ticketRequest->SN = SN;
489         ticketRequest->senderPE = CkMyPe();
490
491         CpvAccess(_numBufferedTicketRequests)[destPE]++;
492         
493         
494         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
495                 sendBufferedTicketRequests(destPE);
496         }else{
497                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
498                         int *checkPE = new int;
499                         *checkPE = destPE;
500                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
501                 }
502         }
503 #else
504
505         TicketRequest ticketRequest;
506         ticketRequest.sender = sender;
507         ticketRequest.recver = recver;
508         ticketRequest.logEntry = entry;
509         ticketRequest.SN = SN;
510         ticketRequest.senderPE = CkMyPe();
511         
512         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
513 //      CmiBecomeImmediate(&ticketRequest);
514         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
515 #endif
516         DEBUG(CmiMemoryCheck());
517 };
518
519 /**
520  * Send the ticket requests buffered for processor PE
521  **/
522 void sendBufferedTicketRequests(int destPE){
523         DEBUG(CmiMemoryCheck());
524         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
525         if(numberRequests == 0){
526                 return;
527         }
528         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
529         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
530         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
531         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
532         header->numberLogs = numberRequests;
533         
534         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
535         CmiSyncSend(destPE,totalSize,(char *)buf);
536         
537         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
538         DEBUG(CmiMemoryCheck());
539 };
540
541 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
542         int destPE = *(int *)_destPE;
543   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
544                 sendBufferedTicketRequests(destPE);
545 //              traceUserEvent(35);
546         }
547         delete (int *)_destPE;
548         DEBUG(CmiMemoryCheck());
549 };
550
551
552
553
554 //get a ticket for a local message and then send a copy to the buddy
555 void ticketLogLocalMessage(MlogEntry *entry){
556         //this method is always in the main thread(not interrupt).. so it should 
557         //never find itself locked out of a newTicket
558 //      CkAssert(_lockNewTicket==0);
559         double _startTime=CkWallTimer();
560         
561 //      _lockNewTicket=1;
562         
563                 DEBUG(CmiMemoryCheck());
564         
565
566         Chare *recverObj = (Chare *)entry->env->recver.getObject();
567         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
568         if(recverObj){
569                 //Consider the case, after a restart when this message has already been allotted a ticket number
570                 // and should get the same one as the old one.
571                 Ticket ticket;
572                 if(recverObj->mlogData->mapTable.numObjects() > 0){
573                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
574                 }else{
575                         ticket.TN = 0;
576                 }
577                 
578                 char senderString[100], recverString[100] ;
579                 
580                 if(ticket.TN == 0){
581                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
582         
583                         if(ticket.TN == 0){
584                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
585                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
586                                 
587         //              _lockNewTicket = 0;
588 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
589                                 return;
590                         }
591                 }       
592                 //TODO: check for the case when an invalid ticket is returned
593                 //TODO: check for OLD or RECEIVED TICKETS
594                 entry->env->TN = ticket.TN;
595                 CkAssert(entry->env->TN > 0);
596                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
597                 
598
599                 sendLocalMessageCopy(entry);
600                 
601                 
602                 DEBUG(CmiMemoryCheck());
603                 entry->unackedLocal = 1;
604                 DEBUG(CmiMemoryCheck());
605                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
606                 DEBUG(CmiMemoryCheck());
607         }else{
608                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
609         }
610         _lockNewTicket=0;
611 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
612 };
613
614
615 void sendLocalMessageCopy(MlogEntry *entry){
616         LocalMessageLog msgLog;
617         msgLog.sender = entry->env->sender;
618         msgLog.recver = entry->env->recver;
619         msgLog.SN = entry->env->SN;
620         msgLog.TN = entry->env->TN;
621         msgLog.entry = entry;
622         msgLog.PE = CkMyPe();
623         
624         char recvString[100];
625         char senderString[100];
626         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
627
628 #ifdef BUFFERED_LOCAL
629         countLocal++;
630         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
631         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
632                 sendBufferedLocalMessageCopy();
633         }else{
634                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
635                         lastBufferedLocalMessageCopyTime = CkWallTimer();
636                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
637                         countClearBufferedLocalCalls++;
638                 }       
639         }
640 #else   
641         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
642         
643         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
644 #endif
645         DEBUG(CmiMemoryCheck());
646 };
647
648
649 void sendBufferedLocalMessageCopy(){
650         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
651         if(numberLogs == 0){
652                 return;
653         }
654         countBuffered++;
655         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
656         void *buf=CmiAlloc(totalSize);
657         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
658         header->numberLogs=numberLogs;
659
660         DEBUG(CmiMemoryCheck());
661         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
662         
663         char *ptr = (char *)buf;
664         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
665         
666         for(int i=0;i<numberLogs;i++){
667                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
668                 memcpy(ptr,&log,sizeof(LocalMessageLog));
669                 ptr = &ptr[sizeof(LocalMessageLog)];
670         }
671
672         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
673
674         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
675         DEBUG(CmiMemoryCheck());
676 };
677
678 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
679         countClearBufferedLocalCalls--;
680         if(countClearBufferedLocalCalls > 10){
681                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
682         }
683         DEBUG(CmiMemoryCheck());
684         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
685         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
686                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
687                         sendBufferedLocalMessageCopy();
688 //                      traceUserEvent(36);
689                 }
690         }
691         DEBUG(CmiMemoryCheck());
692 }
693
694 /****
695         The handler functions
696 *****/
697
698
699 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
700 /**
701  *  If there are any delayed requests, process them first before 
702  *  processing this request
703  * */
704 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
705         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
706         double  _startTime = CkWallTimer();
707         if(CpvAccess(_delayedTicketRequests)->length() > 0){
708                 retryTicketRequest(NULL,_startTime);
709         }
710         _processTicketRequest(ticketRequest);
711         CmiFree(ticketRequest);
712 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
713 }
714 /** Handler used for dealing with a bunch of ticket requests
715  * from one processor. The replies are also bunched together
716  * Does not use _ticketRequestHandler
717  * */
718 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
719         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
720         DEBUG(CmiMemoryCheck());
721         double _startTime = CkWallTimer();
722         if(CpvAccess(_delayedTicketRequests)->length() > 0){
723                 retryTicketRequest(NULL,_startTime);
724         }
725         DEBUG(CmiMemoryCheck());
726   int numRequests = recvdHeader->numberLogs;
727         char *msg = (char *)recvdHeader;
728         msg = &msg[sizeof(BufferedTicketRequestHeader)];
729         int senderPE=((TicketRequest *)msg)->senderPE;
730
731         
732         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
733         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
734         
735         char *ptr = (char *)buf;
736         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
737         header->numberLogs = 0;
738
739         DEBUG(CmiMemoryCheck());
740         
741         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
742         
743         for(int i=0;i<numRequests;i++){
744                 TicketRequest *request = (TicketRequest *)msg;
745                 msg = &msg[sizeof(TicketRequest)];
746                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
747
748                 if(replied){
749                         //the ticket request has been processed and 
750                         //the reply will be stored in the ptr
751                         header->numberLogs++;
752                         ptr = &ptr[sizeof(TicketReply)];
753                 }
754         }
755 /*      if(header->numberLogs == 0){
756                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
757         }*/
758
759         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
760         CmiSyncSend(senderPE,totalSize,(char *)buf);
761         CmiFree(recvdHeader);
762 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
763         DEBUG(CmiMemoryCheck());
764 };
765
/**Process the ticket request. 
 * If it is processed and a reply is being sent 
 * by this processor return true
 * else return false.
 * If a reply buffer is specified put the reply into that
 * else send the reply
 *
 * Possible outcomes:
 *  - receiver object not found here and its guessed PE is this PE or unknown
 *    (-1): a CmiAlloc'd copy of the request is queued on
 *    _delayedTicketRequests (retried later by retryTicketRequest); false.
 *  - receiver object not found here and guessed on another PE: the request
 *    is forwarded there with _ticketRequestHandlerIdx; false.
 *  - receiver found but no ticket could be issued (TN == 0): the request is
 *    delayed as above; false.
 *  - otherwise a TicketReply is produced (NEW_TICKET if TN > tProcessed,
 *    else OLD_TICKET) and sent to ticketRequest->senderPE, or written into
 *    reply; true.
 * ticketRequest itself is only copied, never freed; the caller keeps ownership.
 * */
inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){

/*	if(_lockNewTicket){
		printf("ddeded %d\n",CkMyPe());
		if(CmiIsImmediate(ticketRequest)){
			CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
		}
		CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
		
	}else{
		_lockNewTicket = 1;
	}*/

	DEBUG(CmiMemoryCheck());
	CkObjID sender = ticketRequest->sender;
	CkObjID recver = ticketRequest->recver;
	MCount SN = ticketRequest->SN;
	
	
	Chare *recverObj = (Chare *)recver.getObject();
	
	
	// recverName is only filled in when DEBUG is enabled and is only read by debug prints
	char recverName[100];
	DEBUG(recver.toString(recverName);)
	if(recverObj == NULL){
		// the receiver is not on this processor: delay or forward the request
		int estPE = recver.guessPE();
		if(estPE == CkMyPe() || estPE == -1){		
			//try to fulfill the request after some time
			char senderString[100];
			DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
			if(estPE == CkMyPe() && recver.type == TypeArray){
				// NOTE(review): this lookup only feeds the debug print below but still runs when DEBUG is off
				CkArrayID aid(recver.data.array.id);		
				CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
				DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));

			}
			// keep a heap copy: callers free the original request after this returns
			TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
			*delayed = *ticketRequest;
			CpvAccess(_delayedTicketRequests)->enq(delayed);
			
		}else{
			DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
			// CmiSyncSend copies the message, so a stack copy is sufficient here
			TicketRequest forward = *ticketRequest;
			CmiSetHandler(&forward,_ticketRequestHandlerIdx);
			CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);			
			
			
		}
	DEBUG(CmiMemoryCheck());
		return false; // if the receverObj does not exist the ticket request cannot have been 
		              // processed successfully
	}else{
		char senderString[100];
		// NOTE(review): ticket.TN is only assigned below when mapTable has objects;
		// the TN == 0 test relies on Ticket's default constructor zeroing TN -- confirm.
		Ticket ticket;
		//check if a ticket for this has been already handed out to an object that used to be local but 
		// is no longer so.. need for parallel restart
		
		if(recverObj->mlogData->mapTable.numObjects() > 0){
			ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
		}
		
		// no previously issued ticket found: ask the receiver for the next one
		if(ticket.TN == 0){
			ticket = recverObj->mlogData->next_ticket(sender,SN);
		}
		// tickets at or below tProcessed refer to messages the receiver has already processed
		if(ticket.TN > recverObj->mlogData->tProcessed){
			ticket.state = NEW_TICKET;
		}else{
			ticket.state = OLD_TICKET;
		}
		//TODO: check for the case when an invalid ticket is returned
		// TN == 0 means no ticket could be issued now; retry the request later
		if(ticket.TN == 0){
			DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
			TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
			*delayed = *ticketRequest;
			CpvAccess(_delayedTicketRequests)->enq(delayed);
			return false;
		}
/*		if(ticket.TN < SN){ //error state this really should not happen
			recver.toString(recverName);
		  printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
		}*/
//		CkAssert(ticket.TN >= SN);
		DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));

//		TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
    if(reply == NULL){ 
			//There is no reply buffer and the ticketreply is going to be 
			//sent immediately
			TicketReply ticketReply;
			ticketReply.request = *ticketRequest;
			ticketReply.ticket = ticket;
			ticketReply.recverPE = CkMyPe();
		
			CmiSetHandler(&ticketReply,_ticketHandlerIdx);
//		CmiBecomeImmediate(&ticketReply);
			CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
	 }else{ // Store ticket reply in the buffer provided
		 reply->request = *ticketRequest;
		 reply->ticket = ticket;
		 reply->recverPE = CkMyPe();
		 CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
							 // in case the ticket needs to be forwarded or something

	 }
		DEBUG(CmiMemoryCheck());
		return true;
	}
//	_lockNewTicket=0;
};
882
883 inline void _ticketHandler(TicketReply *ticketReply){
884
885         double _startTime = CkWallTimer();
886         DEBUG(CmiMemoryCheck());
887         
888         
889         char senderString[100];
890         CkObjID sender = ticketReply->request.sender;
891         CkObjID recver = ticketReply->request.recver;
892         
893         if(sender.guessPE() != CkMyPe()){               
894                 DEBUG(CkAssert(sender.guessPE()>= 0));
895                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
896         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
897                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
898                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
899 #ifdef BUFFERED_REMOTE
900                 //will be freed by the buffered ticket handler most of the time
901                 //this might lead to a leak just after migration
902                 //when the ticketHandler is directly used without going through the buffered handler
903                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
904 #else
905                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
906 #endif  
907         }else{
908                 char recverName[100];
909                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
910                 MlogEntry *logEntry=NULL;
911                 if(ticketReply->ticket.state & FORWARDED_TICKET){
912                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
913                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
914                         Chare *senderObj = (Chare *)sender.getObject();
915                         if(senderObj){
916                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
917                                 for(int i=0;i<mlog->length();i++){
918                                         MlogEntry *tempEntry = (*mlog)[i];
919                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
920                                                 logEntry = tempEntry;
921                                                 break;
922                                         }
923                                 }
924                                 if(logEntry == NULL){
925 #ifdef BUFFERED_REMOTE
926 #else
927                                         CmiFree(ticketReply);
928 #endif                                  
929                                         return;
930                                 }
931                         }else{
932                                 CmiAbort("This processor thinks it should have the sender\n");
933                         }
934                         ticketReply->ticket.state ^= FORWARDED_TICKET;
935                 }else{
936                         logEntry = ticketReply->request.logEntry;
937                 }
938                 if(logEntry->env->TN <= 0){
939                         //This logEntry has not received a TN earlier
940                         char recverString[100];
941                         logEntry->env->TN = ticketReply->ticket.TN;
942                         logEntry->env->setSrcPe(CkMyPe());
943                         if(ticketReply->ticket.state == NEW_TICKET){
944                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
945                                 if(ticketReply->recverPE != CkMyPe()){
946                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
947                                 }else{
948                                         //It is now a local message use the local message protocol
949                                         sendLocalMessageCopy(logEntry);
950                                 }               
951                         }
952                 }else{
953                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
954                 }
955                 recver.updatePosition(ticketReply->recverPE);
956 #ifdef BUFFERED_REMOTE
957 #else
958                 CmiFree(ticketReply);
959 #endif
960         }
961         CmiMemoryCheck();
962 #ifdef BUFFERED_REMOTE
963 #else
964 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
965 #endif
966         
967 };
968
969 /**
970  * Message to handle the bunch of tickets 
971  * that we get from one processor. We send 
972  * the tickets to be handled one at a time
973  * */
974
975 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
976         double _startTime=CmiWallTimer();
977         int numTickets = recvdHeader->numberLogs;
978         char *msg = (char *)recvdHeader;
979         msg = &msg[sizeof(BufferedTicketRequestHeader)];
980         DEBUG(CmiMemoryCheck());
981         
982         TicketReply *_reply = (TicketReply *)msg;
983
984         
985         for(int i=0;i<numTickets;i++){
986                 TicketReply *reply = (TicketReply *)msg;
987                 _ticketHandler(reply);
988                 
989                 msg = &msg[sizeof(TicketReply)];
990         }
991         
992         CmiFree(recvdHeader);
993 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
994         DEBUG(CmiMemoryCheck());
995 };
996
997
998 void _localMessageCopyHandler(LocalMessageLog *msgLog){
999         double _startTime = CkWallTimer();
1000         
1001         char senderString[100],recverString[100];
1002         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1003 /*      if(!fault_aware(msgLog->recver)){
1004                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1005         }*/
1006         CpvAccess(_localMessageLog)->enq(*msgLog);
1007         
1008         LocalMessageLogAck ack;
1009         ack.entry = msgLog->entry;
1010         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1011         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1012         CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
1013         
1014 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1015 };
1016
1017 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1018         double _startTime = CkWallTimer();
1019         DEBUG(CmiMemoryCheck());
1020         
1021         int numLogs = recvdHeader->numberLogs;
1022         char *msg = (char *)recvdHeader;
1023
1024         //piggy back the logs already stored on this processor
1025         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1026         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1027 /*      if(numPiggyLogs > 0){
1028                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1029                         CmiAssert(0);
1030                 }
1031         }*/
1032         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1033         
1034         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1035         void *buf = CmiAlloc(totalSize);
1036         char *ptr = (char *)buf;
1037         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1038         
1039         msg = &msg[sizeof(BufferedLocalLogHeader)];
1040         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1041
1042         DEBUG(CmiMemoryCheck());
1043         int PE;
1044         for(int i=0;i<numLogs;i++){
1045                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1046                 CpvAccess(_localMessageLog)->enq(*msgLog);
1047                 PE = msgLog->PE;
1048                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1049
1050                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1051                 ack->entry = msgLog->entry;
1052                 
1053                 msg = &msg[sizeof(LocalMessageLog)];
1054                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1055         }
1056         DEBUG(CmiMemoryCheck());
1057
1058         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1059         piggyHeader->numberLogs = numPiggyLogs;
1060         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1061         if(numPiggyLogs > 0){
1062                 countPiggy++;
1063         }
1064
1065         for(int i=0;i<numPiggyLogs;i++){
1066                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1067                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1068                 ptr = &ptr[sizeof(LocalMessageLog)];
1069         }
1070         DEBUG(CmiMemoryCheck());
1071         
1072         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1073         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1074                 
1075 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1076                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1077                         if(!fault_aware(localLogEntry.recver)){
1078                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1079                         }
1080         }*/
1081         if(freeHeader){
1082                 CmiFree(recvdHeader);
1083         }
1084         DEBUG(CmiMemoryCheck());
1085 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1086 }
1087
1088
1089 void _localMessageAckHandler(LocalMessageLogAck *ack){
1090         
1091         double _startTime = CkWallTimer();
1092         
1093         MlogEntry *entry = ack->entry;
1094         if(entry == NULL){
1095                 CkExit();
1096         }
1097         entry->unackedLocal = 0;
1098         envelope *env = entry->env;
1099         char recverName[100];
1100         char senderString[100];
1101         DEBUG(CmiMemoryCheck());
1102         
1103         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1104         if(env == NULL)
1105                 return;
1106         CkAssert(env->SN > 0);
1107         CkAssert(env->TN > 0);
1108         env->sender.toString(senderString);
1109         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1110         env->recver.toString(recverName);
1111
1112         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1113         
1114 /*      
1115         void *origMsg = EnvToUsr(env);
1116         void *copyMsg = CkCopyMsg(&origMsg);
1117         envelope *copyEnv = UsrToEnv(copyMsg);
1118         entry->env = UsrToEnv(origMsg);*/
1119
1120 //      envelope *copyEnv = copyEnvelope(env);
1121
1122         envelope *copyEnv = env;
1123         copyEnv->localMlogEntry = entry;
1124
1125         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1126         if(CmiMyPe() != entry->destPE){
1127                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1128         }
1129 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1130         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1131         
1132         
1133 #ifdef BUFFERED_LOCAL
1134 #else
1135         CmiFree(ack);
1136 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1137 #endif
1138         
1139         DEBUG(CmiMemoryCheck());
1140         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1141 }
1142
1143
1144 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1145
1146         double _startTime=CkWallTimer();
1147         DEBUG(CmiMemoryCheck());
1148
1149         int numLogs = recvdHeader->numberLogs;
1150         char *msg = (char *)recvdHeader;
1151         msg = &msg[sizeof(BufferedLocalLogHeader)];
1152
1153         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1154         
1155         for(int i=0;i<numLogs;i++){
1156                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1157                 _localMessageAckHandler(ack);
1158                 
1159                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1160         }
1161
1162         //deal with piggy backed local logs
1163         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1164         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1165         if(piggyHeader->numberLogs > 0){
1166                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1167         }
1168         
1169         CmiFree(recvdHeader);
1170         DEBUG(CmiMemoryCheck());
1171 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1172 }
1173
1174
1175
1176
1177
1178
1179 bool fault_aware(CkObjID &recver){
1180         switch(recver.type){
1181                 case TypeChare:
1182                         return false;
1183                 case TypeGroup:
1184                 case TypeNodeGroup:
1185                 case TypeArray:
1186                         return true;
1187                 default:
1188                         return false;
1189         }
1190 };
1191
1192 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1193         char recverString[100];
1194         char senderString[100];
1195         
1196         DEBUG(CmiMemoryCheck());
1197         CkObjID recver = env->recver;
1198         if(!fault_aware(recver))
1199                 return 1;
1200
1201
1202         Chare *obj = (Chare *)recver.getObject();
1203         *objPointer = obj;
1204         if(obj == NULL){
1205                 int possiblePE = recver.guessPE();
1206                 if(possiblePE != CkMyPe()){
1207                         int totalSize = env->getTotalsize();                    
1208                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1209                 }
1210                 return 0;
1211         }
1212
1213
1214         double _startTime = CkWallTimer();
1215 //env->sender.updatePosition(env->getSrcPe());
1216         if(env->TN == obj->mlogData->tProcessed+1){
1217                 //the message that needs to be processed now
1218                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1219                 // once we find a message that we can process we put back all the messages in the out of order queue
1220                 // back into the main scheduler queue. 
1221                 if(env->sender.guessPE() == CkMyPe()){
1222                         *logEntryPointer = env->localMlogEntry;
1223                 }
1224         DEBUG(CmiMemoryCheck());
1225                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1226                         void *qMsgPtr;
1227                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1228                         envelope *qEnv = (envelope *)qMsgPtr;
1229                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1230         DEBUG(CmiMemoryCheck());
1231                 }
1232 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1233                 //TODO: this might be a problem.. change made for leanMD
1234 //              CpvAccess(_currentObj) = obj;
1235         DEBUG(CmiMemoryCheck());
1236                 return 1;
1237         }
1238         if(env->TN <= obj->mlogData->tProcessed){
1239                 //message already processed
1240                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1241 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1242         DEBUG(CmiMemoryCheck());
1243                 return 0;
1244         }
1245         //message that needs to be processed in the future
1246
1247 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1248         //the message cant be processed now put it back in the out of order message Q, 
1249         //It will be transferred to the main queue later
1250         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1251 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1252         DEBUG(CmiMemoryCheck());
1253         
1254         return 0;
1255 }
1256
1257 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1258         char senderString[100];
1259         if(obj){
1260                 if(sender.guessPE() == CkMyPe()){
1261                         if(entry != NULL){
1262                                 entry->env = NULL;
1263                         }
1264                 }
1265                 obj->mlogData->tProcessed++;
1266 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1267                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1268 //              CpvAccess(_currentObj)= NULL;
1269         }
1270         DEBUG(CmiMemoryCheck());
1271 }
1272
1273 /***
1274         Helpers for the handlers and message logging methods
1275 ***/
1276
1277 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1278 //      double _startTime = CkWallTimer();
1279         if(env->recver.type != TypeNodeGroup){
1280         //This repeats a step performed in skipCldEnq for messages sent to
1281         //other processors. I do this here so that messages to local processors
1282         //undergo the same transformation.. It lets the resend be uniform for 
1283         //all messages
1284 //              CmiSetXHandler(env,CmiGetHandler(env));
1285                 _skipCldEnqueue(destPE,env,_infoIdx);
1286         }else{
1287                 _noCldNodeEnqueue(destPE,env);
1288         }
1289 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1290 }
1291 //extern "C" int CmiGetNonLocalLength();
1292 /** This method is used to retry the ticket requests
1293  * that had been queued up earlier
1294  * */
1295
1296 int calledRetryTicketRequest=0;
1297
1298 void retryTicketRequestTimer(void *_dummy,double _time){
1299                 calledRetryTicketRequest=0;
1300                 retryTicketRequest(_dummy,_time);
1301 }
1302
1303 void retryTicketRequest(void *_dummy,double curWallTime){       
1304         double start = CkWallTimer();
1305         DEBUG(CmiMemoryCheck());
1306         int length = CpvAccess(_delayedTicketRequests)->length();
1307         for(int i=0;i<length;i++){
1308                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1309                 if(ticketRequest){
1310                         char senderString[100],recverString[100];
1311                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1312                         DEBUG(CmiMemoryCheck());
1313                         _processTicketRequest(ticketRequest);
1314                   CmiFree(ticketRequest);
1315                         DEBUG(CmiMemoryCheck());
1316                 }       
1317         }       
1318         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1319                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1320                 ticketLogLocalMessage(entry);
1321         }
1322         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1323 //      int converse_qLength = CmiGetNonLocalLength();
1324         
1325 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1326
1327 /*      PingMsg pingMsg;
1328         pingMsg.PE = CkMyPe();
1329         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1330         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1331                 for(int i=0;i<CkNumPes();i++){
1332                         if(i != CkMyPe()){
1333                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1334                         }
1335                 }
1336         }*/     
1337         //TODO: change this back to 100
1338         if(calledRetryTicketRequest == 0){
1339                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1340                 calledRetryTicketRequest =1;
1341         }
1342         DEBUG(CmiMemoryCheck());
1343 }
1344
1345 void _pingHandler(PingMsg *msg){
1346         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1347         CmiFree(msg);
1348 }
1349
1350
1351 /*****************************************************************************
1352         Checkpointing methods..
1353                 Pack all the data on a processor and send it to the buddy periodically
1354                 Also used to throw away message logs
1355 *****************************************************************************/
1356 CkVec<TProcessedLog> processedTicketLog;
1357 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1358 void clearUpMigratedRetainedLists(int PE);
1359
1360 void checkpointAlarm(void *_dummy,double curWallTime){
1361         double diff = curWallTime-lastCompletedAlarm;
1362         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1363 /*      if(CkWallTimer()-lastRestart < 50){
1364                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1365                 return;
1366         }*/
1367         if(diff < ((chkptPeriod) - 2)){
1368                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1369                 return;
1370         }
1371         CheckpointRequest request;
1372         request.PE = CkMyPe();
1373         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1374         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1375 };
1376
1377 void _checkpointRequestHandler(CheckpointRequest *request){
1378         startMlogCheckpoint(NULL,CmiWallTimer());       
1379 }
1380
1381 void startMlogCheckpoint(void *_dummy,double curWallTime){
1382         double _startTime = CkWallTimer();
1383         checkpointCount++;
1384 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1385                 kill(getpid(),SIGKILL);
1386         }*/
1387         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1388                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1389         }
1390         PUP::sizer psizer;
1391         DEBUG(CmiMemoryCheck());
1392
1393         psizer | checkpointCount;
1394         
1395         CkPupROData(psizer);
1396         DEBUG(CmiMemoryCheck());
1397         CkPupGroupData(psizer);
1398         DEBUG(CmiMemoryCheck());
1399         CkPupNodeGroupData(psizer);
1400         DEBUG(CmiMemoryCheck());
1401         pupArrayElementsSkip(psizer,NULL);
1402         DEBUG(CmiMemoryCheck());
1403
1404         int dataSize = psizer.size();
1405         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1406         char *msg = (char *)CmiAlloc(totalSize);
1407         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1408         chkMsg->PE = CkMyPe();
1409         chkMsg->dataSize = dataSize;
1410
1411         
1412         char *buf = &msg[sizeof(CheckPointDataMsg)];
1413         PUP::toMem pBuf(buf);   
1414
1415         pBuf | checkpointCount;
1416         
1417         CkPupROData(pBuf);
1418         CkPupGroupData(pBuf);
1419         CkPupNodeGroupData(pBuf);
1420         pupArrayElementsSkip(pBuf,NULL);
1421
1422         unAckedCheckpoint=1;
1423         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1424         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1425         
1426         /*
1427                 Store the highest Ticket number processed for each chare on this processor
1428         */
1429         processedTicketLog.removeAll();
1430         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1431         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1432                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1433         }
1434
1435         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1436                 lastCompletedAlarm = curWallTime;
1437                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1438         }
1439         traceUserBracketEvent(28,_startTime,CkWallTimer());
1440 };
1441
1442 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1443         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1444         TProcessedLog logEntry;
1445         logEntry.recver = mlogData->objID;
1446         logEntry.tProcessed = mlogData->tProcessed;
1447         log->push_back(logEntry);
1448         char objString[100];
1449         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1450 }
1451
1452 class ElementPacker : public CkLocIterator {
1453 private:
1454         CkLocMgr *locMgr;
1455         PUP::er &p;
1456 public:
1457                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1458                 void addLocation(CkLocation &loc) {
1459                         CkArrayIndexMax idx=loc.getIndex();
1460                         CkGroupID gID = locMgr->ckGetGroupID();
1461                         p|gID;      // store loc mgr's GID as well for easier restore
1462                         p|idx;
1463                         p|loc;
1464     }
1465 };
1466
1467
1468 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1469         int numElements,i;
1470   int numGroups = CkpvAccess(_groupIDTable)->size();    
1471         if(!p.isUnpacking()){
1472                 numElements = CkCountArrayElements();
1473         }       
1474         p | numElements;
1475         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1476         if(!p.isUnpacking()){
1477                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1478         }else{
1479                 //Flush all recs of all LocMgrs before putting in new elements
1480 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1481                 for(int j=0;j<listsize;j++){
1482                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1483                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1484                                 listToSkip[j].idx.print();
1485                         }
1486                 }
1487                         
1488           for (int i=0; i<numElements; i++) {
1489                         CkGroupID gID;
1490                         CkArrayIndexMax idx;
1491                         p|gID;
1492             p|idx;
1493                         int flag=0;
1494                         int matchedIdx=0;
1495                         for(int j=0;j<listsize;j++){
1496                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1497                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1498                                                 matchedIdx = j;
1499                                                 flag = 1;
1500                                                 break;
1501                                         }
1502                                 }
1503                         }
1504                         if(flag == 1){
1505                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1506                         }else{
1507                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1508                         }
1509                                 
1510                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1511                         mgr->resume(idx,p,flag);
1512                         if(flag == 1){
1513                                 int homePE = mgr->homePe(idx);
1514                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1515                         }
1516           }
1517         }
1518 };
1519
1520
1521 void writeCheckpointToDisk(int size,char *chkpt){
1522         char fNameTemp[100];
1523         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1524         int fd = creat(fNameTemp,S_IRWXU);
1525         int ret = write(fd,chkpt,size);
1526         CkAssert(ret == size);
1527         close(fd);
1528         
1529         char fName[100];
1530         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1531         unlink(fName);
1532
1533         rename(fNameTemp,fName);
1534         
1535 }
1536
/*
        Handler run on the checkpoint buddy when a PE sends its checkpoint
        (message built by startMlogCheckpoint: |CheckPointDataMsg|data|).
        - Frees the previously stored checkpoint message, if any, and keeps this
          one in _storedCheckpointData (buf points just past the header).
        - With CHECKPOINT_DISK, writes the data to disk instead and frees the
          message immediately (buf is set back to NULL).
        - Marks migratedNoticeList entries from the sender as ackFrom and removes
          entries that are acked in both directions.
        - Replies with a CheckPointAck to the sending PE.
        The message is NOT freed here in the default mode: it stays alive as the
        stored checkpoint until the next one replaces it.
*/
void _storeCheckpointHandler(char *msg){
        
        double _startTime=CkWallTimer();
                
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
        DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
        
        char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
        
        // release the previous checkpoint; buf points inside a message, so step back over its header
        char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
        if(oldChkpt != NULL){
                char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
                CmiFree(oldmsg);
        }
        // store the new checkpoint in memory (the message itself is kept, not copied)
        
        // read before the message may be freed in the CHECKPOINT_DISK branch below
        int sendingPE = chkMsg->PE;
        
        CpvAccess(_storedCheckpointData)->buf = chkpt;
        CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
        CpvAccess(_storedCheckpointData)->PE = sendingPE;

#ifdef CHECKPOINT_DISK
        //store the checkpoint on disk
        writeCheckpointToDisk(chkMsg->dataSize,chkpt);
        CpvAccess(_storedCheckpointData)->buf = NULL;
        // chkMsg/chkpt must not be dereferenced after this point
        CmiFree(msg);
#endif

        int count=0;
        // iterate backwards so remove(j) does not disturb unvisited indices
        for(int j=migratedNoticeList.size()-1;j>=0;j--){
                if(migratedNoticeList[j].fromPE == sendingPE){
                        migratedNoticeList[j].ackFrom = 1;
                }else{
                        // NOTE(review): CmiAssert on a string literal is always true, so this
                        // never fires; it only documents that such entries are unexpected.
                        CmiAssert("migratedNoticeList entry for processor other than buddy");
                }
                if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
                        migratedNoticeList.remove(j);
                        count++;
                }
                
        }
        DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
        
        // acknowledge receipt; CmiSyncSend copies the stack message
        CheckPointAck ackMsg;
        ackMsg.PE = CkMyPe();
        ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
        CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
        CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
        
        
        
        traceUserBracketEvent(29,_startTime,CkWallTimer());
};
1593
1594
1595 void sendRemoveLogRequests(){
1596         double _startTime = CkWallTimer();      
1597         //send out the messages asking senders to throw away message logs below a certain ticket number
1598         /*
1599                 The remove log request message looks like
1600                 |RemoveLogRequest||List of TProcessedLog|
1601         */
1602         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1603         char *requestMsg = (char *)CmiAlloc(totalSize);
1604         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1605         request->PE = CkMyPe();
1606         request->numberObjects = processedTicketLog.size();
1607         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1608         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1609         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1610         
1611         DEBUG(CmiMemoryCheck());
1612         for(int i=0;i<CkNumPes();i++){
1613                 CmiSyncSend(i,totalSize,requestMsg);
1614         }
1615         CmiFree(requestMsg);
1616
1617         clearUpMigratedRetainedLists(CmiMyPe());
1618         //TODO: clear ticketTable
1619         
1620         traceUserBracketEvent(30,_startTime,CkWallTimer());
1621         DEBUG(CmiMemoryCheck());
1622 }
1623
1624
1625 void _checkpointAckHandler(CheckPointAck *ackMsg){
1626         DEBUG(CmiMemoryCheck());
1627         unAckedCheckpoint=0;
1628         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));    
1629         if(onGoingLoadBalancing){
1630                 onGoingLoadBalancing = 0;
1631                 finishedCheckpointLoadBalancing();
1632         }else{
1633                 sendRemoveLogRequests();
1634         }
1635         CmiFree(ackMsg);
1636         
1637 };
1638
1639 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1640         DEBUG(CmiMemoryCheck());
1641         CmiMemoryCheck();
1642         char *data = (char *)_data;
1643         RemoveLogRequest *request = (RemoveLogRequest *)data;
1644         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1645         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1646
1647         int count=0;
1648         for(int i=0;i<mlog->length();i++){
1649                 MlogEntry *logEntry = mlog->deq();
1650                 int match=0;
1651                 for(int j=0;j<request->numberObjects;j++){
1652                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1653                                 //this log Entry should be removed
1654                                 match = 1;
1655                                 break;
1656                         }
1657                 }
1658                 char senderString[100],recverString[100];
1659 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1660                 if(match){
1661                         count++;
1662                         delete logEntry;
1663                 }else{
1664                         mlog->enq(logEntry);
1665                 }
1666         }
1667         if(count > 0){
1668                 char nameString[100];
1669                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1670         }
1671         DEBUG(CmiMemoryCheck());
1672         CmiMemoryCheck();
1673 };
1674
1675 void _removeProcessedLogHandler(char *requestMsg){
1676         double start = CkWallTimer();
1677         forAllCharesDo(removeProcessedLogs,requestMsg);
1678         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1679         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1680         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1681         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1682         if(request->PE == getCheckPointPE()){
1683                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1684                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1685                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1686                 int count=0;
1687 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1688                 DEBUG(char nameString[100];)
1689                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1690                 DEBUG(})*/
1691                 for(int i=0;i<localQ->length();i++){
1692                         LocalMessageLog localLogEntry = (*localQ)[i];
1693                         if(!fault_aware(localLogEntry.recver)){
1694                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1695                         }
1696                         bool keep = true;
1697                         for(int j=0;j<request->numberObjects;j++){                              
1698                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1699                                         keep = false;
1700                                         break;
1701                                 }
1702                         }       
1703                         if(keep){
1704                                 tempQ->enq(localLogEntry);
1705                         }else{
1706                                 count++;
1707                         }
1708                 }
1709                 delete localQ;
1710                 CpvAccess(_localMessageLog) = tempQ;
1711                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1712         }
1713
1714         /*
1715                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1716         */
1717         CmiMemoryCheck();
1718         clearUpMigratedRetainedLists(request->PE);
1719         
1720         traceUserBracketEvent(20,start,CkWallTimer());
1721         CmiFree(requestMsg);    
1722 };
1723
1724
1725 void clearUpMigratedRetainedLists(int PE){
1726         int count=0;
1727         CmiMemoryCheck();
1728         
1729         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1730                 if(migratedNoticeList[j].toPE == PE){
1731                         migratedNoticeList[j].ackTo = 1;
1732                 }
1733                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1734                         migratedNoticeList.remove(j);
1735                         count++;
1736                 }
1737         }
1738         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1739         
1740         for(int j=retainedObjectList.size()-1;j>=0;j--){
1741                 if(retainedObjectList[j]->migRecord.toPE == PE){
1742                         RetainedMigratedObject *obj = retainedObjectList[j];
1743                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1744                         retainedObjectList.remove(j);
1745                         if(obj->msg != NULL){
1746                                 CmiMemoryCheck();
1747                                 CmiFree(obj->msg);
1748                         }
1749                         delete obj;
1750                 }
1751         }
1752 }
1753
1754 /***************************************************************
1755         Restart Methods and handlers
1756 ***************************************************************/        
1757
1758 /**
1759  * Function for restarting an object with message logging
1760  */
1761 void CkMlogRestart(const char * dummy){
1762         printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1763         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1764         _restartFlag = 1;
1765         RestartRequest msg;
1766         msg.PE = CkMyPe();
1767         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1768         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1769 };
1770
1771 void CkMlogRestartDouble(void *,double){
1772         CkMlogRestart(NULL);
1773 };
1774
1775
1776 void readCheckpointFromDisk(int size,char *buf){
1777         char fName[100];
1778         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1779
1780         int fd = open(fName,O_RDONLY);
1781         int count=0;
1782         while(count < size){
1783                 count += read(fd,&buf[count],size-count);
1784         }
1785         close(fd);
1786         
1787 };
1788
1789
1790 void sendCheckpointData();
1791
1792 void _getCheckpointHandler(RestartRequest *restartMsg){
1793         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1794         CkAssert(restartMsg->PE == storedChkpt->PE);
1795         storedRequest = restartMsg;
1796         
1797         verifyAckTotal = 0;
1798         for(int i=0;i<migratedNoticeList.size();i++){
1799                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1800 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1801                         if(migratedNoticeList[i].ackFrom == 0){
1802                                 //need to verify if the object actually exists .. it might not
1803                                 //have been acked but it might exist on it
1804                                 VerifyAckMsg msg;
1805                                 msg.migRecord = migratedNoticeList[i];
1806                                 msg.index = i;
1807                                 msg.fromPE = CmiMyPe();
1808                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1809                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1810                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1811                                 verifyAckTotal++;
1812                         }
1813                 }
1814         }
1815         
1816         if(verifyAckTotal == 0){
1817                 sendCheckpointData();
1818         }
1819         verifyAckCount = 0;
1820 }
1821
1822
/*
        Handler for a VerifyAckMsg sent by a restarting PE's buddy
        (_getCheckpointHandler).
        If the element named by migRecord (gID, idx) has a local record on this
        PE, it is reclaimed from its CkLocMgr and unregistered from the LB
        database, and verifyAckedRequests is incremented.
        The same message is then sent back to fromPE (_verifyAckHandler), and
        ownership passes to the runtime via CmiSyncSendAndFree.
        NOTE(review): migRecord is returned unchanged, so the reply itself does
        not carry whether the element was found here -- confirm this is intended.
*/
void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
        CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
        CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
        if(rec != NULL && rec->type() == CkLocRec::local){
                        //this location exists on this processor
                        //and needs to be removed       
                        CkLocRec_local *localRec = (CkLocRec_local *) rec;
                        CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
                        
                        // read everything needed from localRec before reclaim() runs
                        int localIdx = localRec->getLocalIndex();
                        LBDatabase *lbdb = localRec->getLBDB();
                        LDObjHandle ldHandle = localRec->getLdHandle();
                                
                        // removal is bracketed by the location manager's migration flag
                        locMgr->setDuringMigration(true);
                        
                        locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
                        lbdb->UnregisterObj(ldHandle);
                        
                        locMgr->setDuringMigration(false);
                        
                        verifyAckedRequests++;

        }
        // reuse the request buffer as the reply
        CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
        CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
};
1849
1850
1851 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1852         int index =     verifyReply->index;
1853         migratedNoticeList[index] = verifyReply->migRecord;
1854         verifyAckCount++;
1855         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1856         if(verifyAckCount == verifyAckTotal){
1857                 sendCheckpointData();
1858         }
1859 }
1860
1861
1862
/*
        Build and send the restart message to the restarting PE recorded in
        storedRequest, then free that request.
        The message carries the migratedNoticeList records, the stored checkpoint
        (read from disk under CHECKPOINT_DISK, otherwise copied from memory) and
        this PE's local message log.
        Aborts if a local log entry's receiver is not fault aware.
*/
void sendCheckpointData(){      
        RestartRequest *restartMsg = storedRequest;
        StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
        int numMigratedAwayElements = migratedNoticeList.size();
        if(migratedNoticeList.size() != 0){
                        // NOTE(review): %d assumes size() returns int -- confirm against CkVec
                        printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
//                      CkAssert(migratedNoticeList.size() == 0);
        }
        
        
        // total = header + checkpoint + local message log + migration records
        int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
        
        DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
        
        CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
        totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
        totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
        
        char *msg = (char *)CmiAlloc(totalSize);
        
        RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
        dataMsg->PE = CkMyPe();
        dataMsg->restartWallTime = CmiTimer();
        dataMsg->checkPointSize = storedChkpt->bufSize;
        
        dataMsg->numMigratedAwayElements = numMigratedAwayElements;
//      dataMsg->numMigratedAwayElements = 0;
        
        // no migrated-in element data is sent
        dataMsg->numMigratedInElements = 0;
        dataMsg->migratedElementSize = 0;
        dataMsg->lbGroupID = globalLBID;
        /*msg layout as written below
                |RestartProcessorData|MigrationRecord x numMigratedAwayElements|
                CheckpointData (checkPointSize bytes)|LocalMessageLog x numLocalMessages|
                (no section for objects migrated in is written)
        */
        //store checkpoint data
        char *buf = &msg[sizeof(RestartProcessorData)];

        if(dataMsg->numMigratedAwayElements != 0){
                memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
                buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
        }
        

#ifdef CHECKPOINT_DISK
        readCheckpointFromDisk(storedChkpt->bufSize,buf);
#else   
        memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
#endif
        buf = &buf[storedChkpt->bufSize];


        //store localmessage Log
        dataMsg->numLocalMessages = localMsgQ->length();
        for(int i=0;i<localMsgQ->length();i++){
                if(!fault_aware(((*localMsgQ)[i]).recver )){
                        CmiAbort("Non fault aware localMsgQ");
                }
                memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
                buf = &buf[sizeof(LocalMessageLog)];
        }
        
        CmiSetHandler(msg,_recvCheckpointHandlerIdx);
        CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
        // the RestartRequest kept by _getCheckpointHandler is released here
        CmiFree(restartMsg);
};
1929
1930
1931 // this list is used to create a vector of the object ids of all
1932 //the chares on this processor currently and the highest TN processed by them 
1933 //the first argument is actually a CkVec<TProcessedLog> *
1934 void createObjIDList(void *data,ChareMlogData *mlogData){
1935         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1936         TProcessedLog entry;
1937         entry.recver = mlogData->objID;
1938         entry.tProcessed = mlogData->tProcessed;
1939         list->push_back(entry);
1940         char objString[100];
1941         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1942 }
1943
1944
1945
1946 void _recvCheckpointHandler(char *_restartData){
1947         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1948         MigrationRecord *migratedAwayElements;
1949
1950         globalLBID = restartData->lbGroupID;
1951         
1952         restartData->restartWallTime *= 1000;
1953         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1954         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1955         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1956
1957         
1958         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1959         char *buf = &_restartData[sizeof(RestartProcessorData)];
1960         
1961         if(restartData->numMigratedAwayElements != 0){
1962                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1963                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1964                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1965                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1966         }
1967         
1968         PUP::fromMem pBuf(buf);
1969
1970         pBuf | checkpointCount;
1971
1972         CkPupROData(pBuf);
1973         CkPupGroupData(pBuf);
1974         CkPupNodeGroupData(pBuf);
1975 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1976         pupArrayElementsSkip(pBuf,NULL);
1977         CkAssert(pBuf.size() == restartData->checkPointSize);
1978         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1979         
1980         forAllCharesDo(initializeRestart,NULL);
1981         
1982         //store the restored local message log in a vector
1983         buf = &buf[restartData->checkPointSize];        
1984         for(int i=0;i<restartData->numLocalMessages;i++){
1985                 LocalMessageLog logEntry;
1986                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
1987                 
1988                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
1989                 if(recverObj!=NULL){
1990                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
1991                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
1992                         char senderString[100];
1993                         char recverString[100];
1994                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
1995                 }else{
1996 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
1997                 }
1998                 buf = &buf[sizeof(LocalMessageLog)];
1999         }
2000
2001         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2002
2003         CmiFree(_restartData);
2004         
2005         
2006         _initDone();
2007
2008         getGlobalStep(globalLBID);
2009         
2010         countUpdateHomeAcks = 0;
2011         RestartRequest updateHomeRequest;
2012         updateHomeRequest.PE = CmiMyPe();
2013         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2014         for(int i=0;i<CmiNumPes();i++){
2015                 if(i != CmiMyPe()){
2016                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2017                 }
2018         }
2019
2020 }
2021
/*
	Handler on the restarted processor for the acks to the update-home
	requests it broadcast to every other processor.
	Each call frees its ack and bumps countUpdateHomeAcks. Only when
	CmiNumPes() acks have been counted does it continue:
	  - build a resend request |ResendRequest|TProcessedLog x numberObjects|
	    listing every local chare (createObjIDList),
	  - optionally redistribute restarted objects (parallelRestart),
	  - let the global load balancer process the dummy migration for
	    restartDecisionNumber,
	  - broadcast the resend request to all other processors and process it
	    locally as well.
*/
void _updateHomeAckHandler(RestartRequest *updateHomeAck){
	countUpdateHomeAcks++;
	CmiFree(updateHomeAck);
	// one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
	// (so CmiNumPes()-1 real acks from the other processors + 1 dummy)
	if(countUpdateHomeAcks != CmiNumPes()){
		return;
	}

	// Send out the request to resend logged messages to all other processors
	CkVec<TProcessedLog> objectVec;
	forAllCharesDo(createObjIDList, (void *)&objectVec);
	int numberObjects = objectVec.size();
	
	/*
		resendMsg layout |ResendRequest|Array of TProcessedLog|
	*/
	int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
	char *resendMsg = (char *)CmiAlloc(totalSize);
	

	ResendRequest *resendReq = (ResendRequest *)resendMsg;
	resendReq->PE =CkMyPe(); 
	resendReq->numberObjects = numberObjects;
	char *objList = &resendMsg[sizeof(ResendRequest)];
	memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
	

	/* test for parallel restart migrate away object**/
	if(parallelRestart){
		distributeRestartedObjects();
		printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
	}
	
	/*	To make restart work for load balancing.. should only
	be used when checkpoint happens along with load balancing
	**/
//	forAllCharesDo(resumeFromSyncRestart,NULL);

	CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
	CpvAccess(_currentObj) = lb;
	lb->ReceiveDummyMigration(restartDecisionNumber);

	// NOTE(review): fixed 10 s pause before the resend requests go out; the
	// reason is not visible here (presumably to let redistribution/dummy
	// migration traffic settle) -- confirm before changing.
	sleep(10);
	
	CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
	for(int i=0;i<CkNumPes();i++){
		if(i != CkMyPe()){
			CmiSyncSend(i,totalSize,resendMsg);
		}	
	}
	// handle our own request directly; _resendMessagesHandler does not free a
	// request whose PE is this processor, so it is freed here
	_resendMessagesHandler(resendMsg);
	CmiFree(resendMsg);
};
2077
2078 void initializeRestart(void *data,ChareMlogData *mlogData){
2079         mlogData->resendReplyRecvd = 0;
2080         mlogData->receivedTNs = new CkVec<MCount>;
2081         mlogData->restartFlag = 1;
2082         mlogData->restoredLocalMsgLog.removeAll();
2083         mlogData->mapTable.empty();
2084 };
2085
2086
2087 void updateHomePE(void *data,ChareMlogData *mlogData){
2088         RestartRequest *updateRequest = (RestartRequest *)data;
2089         int PE = updateRequest->PE; //restarted PE
2090         //if this object is an array Element and its home is the restarted processor
2091         // the home processor needs to know its current location
2092         if(mlogData->objID.type == TypeArray){
2093                 //it is an array element
2094                 CkGroupID myGID = mlogData->objID.data.array.id;
2095                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2096                 CkArrayID aid(mlogData->objID.data.array.id);           
2097                 //check if the restarted processor is the home processor for this object
2098                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2099                 if(locMgr->homePe(myIdx) == PE){
2100                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2101                         DEBUGRESTART(myIdx.print());
2102                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2103                 }
2104         }
2105 };
2106
2107
/*
	Handler on a surviving processor for the update-home request broadcast
	by a restarted processor.
	- updateHomePE runs for every local chare, so array elements whose home
	  is the restarted processor report their current location to it.
	- The request message is reused as the ack and handed to
	  CmiSyncSendAndFree, so it must not be touched afterwards.
	- If the restarted processor is the one returned by getCheckPointPE()
	  (presumably this processor's checkpoint buddy), an unacknowledged
	  checkpoint is taken again and every retained migration record whose
	  notice was not acked is re-sent to it.
*/
void _updateHomeRequestHandler(RestartRequest *updateRequest){
	
	// read the sender first: updateRequest is freed by CmiSyncSendAndFree below
	int sender = updateRequest->PE;
	
	forAllCharesDo(updateHomePE,updateRequest);
	
	// turn the request into the ack
	updateRequest->PE = CmiMyPe();
	CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
	CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
	if(sender == getCheckPointPE() && unAckedCheckpoint==1){
		CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
		// presumably undoes the count taken by the lost checkpoint before retrying -- confirm
		checkpointCount--;
		startMlogCheckpoint(NULL,0);
	}
	if(sender == getCheckPointPE()){
		// re-send migration notices that the checkpoint processor never acked
		for(int i=0;i<retainedObjectList.size();i++){
			if(retainedObjectList[i]->acked == 0){
				MigrationNotice migMsg;
				migMsg.migRecord = retainedObjectList[i]->migRecord;
				migMsg.record = retainedObjectList[i];
				CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
				CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
			}
		}

	}
}
2135
2136
//forAllCharesDo callback run by _resendMessagesHandler for each local chare.
//The data argument is a ResendData describing the restarted processor: its PE,
//the list of objects on it (each with the TN it had processed, tProcessed),
//plus per-object accumulators (maxTickets, ticketVecs) filled in here.
//For every entry in this chare's message log it
// - re-sends the copy of a local message whose log was never acked by the buddy,
// - re-sends the ticket request of any message that never got a ticket (TN <= 0),
// - for a message to one of the restarted objects with TN >= that object's
//   tProcessed, records the TN in ticketVecs and re-sends the message to the
//   restarted processor (or enqueues a copy locally if that is this processor).
//It also tracks, in maxTickets, the largest TN seen for each restarted object.
void resendMessageForChare(void *data,ChareMlogData *mlogData){
	char nameString[100];
	ResendData *resendData = (ResendData *)data;
	int PE = resendData->PE; //restarted PE
	DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
	int count=0;
	// NOTE(review): never incremented -- the only increment is in the
	// commented-out branch below; it is only reported in debug output
	int ticketRequests=0;
	CkQ<MlogEntry *> *log = mlogData->getMlog();

	
	
	
	for(int i=0;i<log->length();i++){
		MlogEntry *logEntry = (*log)[i];
		
		// if we sent out the logs of a local message to buddy and he crashed
		//before acking
		envelope *env = logEntry->env;
		if(env == NULL){
			continue;
		}
		if(logEntry->unackedLocal){
			char recverString[100];
			DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
			sendLocalMessageCopy(logEntry);
		}
		//looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
		if(env->TN <= 0){
			//ticket not yet replied send it out again
			sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
		}
		
		if(env->recver.type != TypeInvalid){
			// NOTE(review): flag is written but never read
			int flag = 0;//marks if any of the restarted objects matched this log entry
			for(int j=0;j<resendData->numberObjects;j++){
				if(env->recver == (resendData->listObjects)[j].recver){
					flag = 1;
					//message has a valid TN
					if(env->TN > 0){
						//store maxTicket
						if(env->TN > resendData->maxTickets[j]){
							resendData->maxTickets[j] = env->TN;
						}
						//if the TN for this entry is at least the TN processed, send the message out
						if(env->TN >= (resendData->listObjects)[j].tProcessed){
							//store the TNs seen since the recver last checkpointed
							resendData->ticketVecs[j].push_back(env->TN);
							
							if(PE != CkMyPe()){
								if(env->recver.type == TypeNodeGroup){
									// NOTE(review): CmiSyncNodeSend expects a node number, but
									// PE is a processor rank; they only coincide with one PE
									// per node -- confirm
									CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
								}else{
									CmiSetHandler(env,CmiGetXHandler(env));
									CmiSyncSend(PE,env->getTotalsize(),(char *)env);
								}
							}else{
								// restarted processor is this one: enqueue a copy, the log keeps the original
								envelope *copyEnv = copyEnvelope(env);
								CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
							}
							char senderString[100];
							DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
							count++;
						}	
					}else{
/*					//the message didnt get a ticket the last time and needs to start with a ticket request
						DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
						//generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);						
						CkAssert(logEntry->destPE != CkMyPe());
						
						sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
						
						ticketRequests++;*/
					}
				}
			}//end of for loop of objects
			
		}	
	}
	DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)	
}
2222
/*
	Handler for a resend request from a restarted processor. The restarted
	processor also calls it directly on its own request from
	_updateHomeAckHandler.
	Request layout: |ResendRequest|TProcessedLog x numberObjects|
	1. Retained objects recorded as migrated to the restarted processor
	   (migRecord.toPE == d.PE) but missing from its object list are
	   recreated here from their retained message. Each one is
	   - announced to its home processor,
	   - reported to the restarted processor with a dummy migration,
	   - given ResumeFromSync() for elements with toResumeOrNot == 1 and
	     resumeCount < globalResumeCount.
	2. resendMessageForChare re-sends logged messages from every local chare
	   and collects, per restarted object, the TNs this processor has seen.
	3. The reply |ResendRequest|CkObjID x n|(int count, MCount x count) x n|
	   goes back to the restarted processor (_resendReplyHandler).
	msg is freed here only when it came from another processor; when
	resendReq->PE == CkMyPe() the direct caller frees it.
*/
void _resendMessagesHandler(char *msg){
	ResendRequest *resendReq = (ResendRequest *)msg;
	char *listObjects = &msg[sizeof(ResendRequest)];
	ResendData d;
	d.numberObjects = resendReq->numberObjects;
	d.PE = resendReq->PE;
	d.listObjects = (TProcessedLog *)listObjects;
	d.maxTickets = new MCount[d.numberObjects];
	d.ticketVecs = new CkVec<MCount>[d.numberObjects];
	for(int i=0;i<d.numberObjects;i++){
		d.maxTickets[i] = 0;
	}

	//Check if any of the retained objects need to be recreated
	//If they have not been recreated on the restarted processor
	//they need to be recreated on this processor
	int count=0;
	for(int i=0;i<retainedObjectList.size();i++){
		if(retainedObjectList[i]->migRecord.toPE == d.PE){
			count++;
			int recreate=1;
			// is this retained object among the objects the restarted processor reported?
			for(int j=0;j<d.numberObjects;j++){
				if(d.listObjects[j].recver.type != TypeArray ){
					continue;
				}
				CkArrayID aid(d.listObjects[j].recver.data.array.id);		
				CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
				if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
					if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
						recreate = 0;
						break;
					}
				}
			}
			CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
			if(recreate){
				// rebuild the element here from its retained message without counting it as a migration
				donotCountMigration=1;
				_receiveMlogLocationHandler(retainedObjectList[i]->msg);
				donotCountMigration=0;
				CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
				int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
				informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());

				// tell the restarted processor this object will not arrive there
				sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
				
				CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
				CmiAssert(rec->type() == CkLocRec::local);
				CkVec<CkMigratable *> eltList;
				locMgr->migratableList((CkLocRec_local *)rec,eltList);
				for(int j=0;j<eltList.size();j++){
					if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
						CpvAccess(_currentObj) = eltList[j];
						eltList[j]->ResumeFromSync();
					}
				}


				
				// the retained message has been consumed by the recreation
				retainedObjectList[i]->msg=NULL;
				
					
			}
		}
	}
	
	// NOTE(review): count only feeds this disabled check
	if(count > 0){
//		CmiAbort("retainedObjectList for restarted processor not empty");
	}


	
	DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
	forAllCharesDo(resendMessageForChare,&d);

	//send back, for each object on the restarted processor, the list of
	//ticket numbers this processor has seen for messages sent to it
	//(d.maxTickets is filled by resendMessageForChare but is not sent)
	//Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
	
	int totalTNStored=0;
	for(int i=0;i<d.numberObjects;i++){
		totalTNStored += d.ticketVecs[i].size();
	}
	
	int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
	char *resendReplyMsg = (char *)CmiAlloc(totalSize);
	
	ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
	resendReply->PE = CkMyPe();
	resendReply->numberObjects = d.numberObjects;
	
	char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
	CkObjID *replyObjects = (CkObjID *)replyListObjects;
	for(int i=0;i<d.numberObjects;i++){
		replyObjects[i] = d.listObjects[i].recver;
	}
	
	// per object: an int count followed by that many MCount values
	char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = d.ticketVecs[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
		ticketList = &ticketList[sizeof(MCount)*vecsize];
	}	

	CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
	CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
	
/*	
	if(verifyAckRequestsUnacked){
		CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
		for(int i=0;i<verifyAckRequestsUnacked;i++){
			CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
			LDObjHandle h;
			lb->Migrated(h,1);
		}
	}
	
	verifyAckRequestsUnacked=0;*/


	
	delete [] d.maxTickets;
	delete [] d.ticketVecs;
	// a request from this processor is owned (and freed) by the direct caller
	if(resendReq->PE != CkMyPe()){
		CmiFree(msg);
	}	
//	CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
	lastRestart = CmiWallTimer();
}
2353
2354 void sortVec(CkVec<MCount> *TNvec);
2355 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2356
2357 void _resendReplyHandler(char *msg){    
2358         /**
2359                 need to rewrite this method to deal with parallel restart
2360         */
2361         ResendRequest *resendReply = (ResendRequest *)msg;
2362         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2363
2364         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2365         
2366         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2367         for(int i =0; i< resendReply->numberObjects;i++){       
2368                 Chare *obj = (Chare *)listObjects[i].getObject();
2369                 
2370                 int vecsize;
2371                 memcpy(&vecsize,listTickets,sizeof(int));
2372                 listTickets = &listTickets[sizeof(int)];
2373                 MCount *listTNs = (MCount *)listTickets;
2374                 
2375                 
2376                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2377                 
2378                 if(obj != NULL){
2379                         //the object was restarted on the processor on which it existed
2380                         processReceivedTN(obj,vecsize,listTNs);
2381                 }else{
2382                 //pack up objID vecsize and listTNs and send it to the correct processor
2383                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2384                         char *TNMsg = (char *)CmiAlloc(totalSize);
2385                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2386                         receivedTNData->recver = listObjects[i];
2387                         receivedTNData->numTNs = vecsize;
2388                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2389                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2390
2391                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2392                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2393                 }
2394                 
2395         }
2396 };
2397
2398 void _receivedTNDataHandler(ReceivedTNData *msg){
2399         char objName[100];
2400         Chare *obj = (Chare *) msg->recver.getObject();
2401         if(obj){                
2402                 char *_msg = (char *)msg;
2403                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2404                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2405                 processReceivedTN(obj,msg->numTNs,listTNs);
2406         }else{
2407                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2408                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2409         }
2410 };
2411
2412
2413 void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
2414         obj->mlogData->resendReplyRecvd++;
2415
2416         
2417         for(int j=0;j<listSize;j++){
2418                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2419         }
2420         
2421         //if this object has received all the replies find the ticket numbers
2422         //that senders know about. Those less than the ticket number processed 
2423         //by the receiver can be thrown away. The rest need not be consecutive
2424         // ie there can be holes in the list of ticket numbers seen by senders
2425         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2426                 obj->mlogData->resendReplyRecvd = 0;
2427                 //sort the received TNS
2428                 sortVec(obj->mlogData->receivedTNs);
2429         
2430                 //after all the received tickets are in we need to sort them and then 
2431                 // calculate the holes
2432                 
2433                 if(obj->mlogData->receivedTNs->size() > 0){
2434                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2435                         int vecsize = obj->mlogData->receivedTNs->size();
2436                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2437                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2438                         if(numberHoles == 0){
2439                         }else{
2440                                 char objName[100];                                      
2441                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2442                                 obj->mlogData->numberHoles = numberHoles;
2443                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2444                                 int countHoles=0;
2445                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2446                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2447                                                 //the TNs are not consecutive at this point
2448                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2449                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2450                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2451                                                         countHoles++;
2452                                                 }       
2453                                         }
2454                                 }
2455                                 //Holes have been given new TN
2456                                 if(countHoles != numberHoles){
2457                                         char str[100];
2458                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2459                                 }
2460                                 CkAssert(countHoles == numberHoles);                                    
2461                                 obj->mlogData->currentHoles = numberHoles;
2462                         }
2463                 }       
2464                 
2465                 delete obj->mlogData->receivedTNs;
2466                 obj->mlogData->receivedTNs = NULL;
2467                 obj->mlogData->restartFlag = 0;
2468                 char objString[100];
2469                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2470         }
2471
2472 }
2473
2474
2475 void sortVec(CkVec<MCount> *TNvec){
2476         //sort it ->its bloddy bubble sort
2477         //TODO: use quicksort
2478         for(int i=0;i<TNvec->size();i++){
2479                 for(int j=i+1;j<TNvec->size();j++){
2480                         if((*TNvec)[j] < (*TNvec)[i]){
2481                                 MCount temp;
2482                                 temp = (*TNvec)[i];
2483                                 (*TNvec)[i] = (*TNvec)[j];
2484                                 (*TNvec)[j] = temp;
2485                         }
2486                 }
2487         }
2488         //make it unique .. since its sorted all equal units will be consecutive
2489         MCount *tempArray = new MCount[TNvec->size()];
2490         int     uniqueCount=-1;
2491         for(int i=0;i<TNvec->size();i++){
2492                 tempArray[i] = 0;
2493                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2494                         uniqueCount++;
2495                         tempArray[uniqueCount] = (*TNvec)[i];
2496                 }
2497         }
2498         uniqueCount++;
2499         TNvec->removeAll();
2500         for(int i=0;i<uniqueCount;i++){
2501                 TNvec->push_back(tempArray[i]);
2502         }
2503         delete [] tempArray;
2504 }       
2505
2506 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2507         if(TNVec->size() == 0){
2508                 return -1; //not found in an empty vec
2509         }
2510         //binary search to find 
2511         int left=0;
2512         int right = TNVec->size();
2513         int mid = (left +right)/2;
2514         while(searchTN != (*TNVec)[mid] && left < right){
2515                 if((*TNVec)[mid] > searchTN){
2516                         right = mid-1;
2517                 }else{
2518                         left = mid+1;
2519                 }
2520                 mid = (left + right)/2;
2521         }
2522         if(left < right){
2523                 //mid is the element to be returned
2524                 return mid;
2525         }else{
2526                 if(mid < TNVec->size() && mid >=0){
2527                         if((*TNVec)[mid] == searchTN){
2528                                 return mid;
2529                         }else{
2530                                 return -1;
2531                         }
2532                 }else{
2533                         return -1;
2534                 }
2535         }
2536 };
2537
2538
/*
	Parallel restart: distribute some of the array elements on the restarted
	processor to other processors. Charm++ entry methods cannot be used for
	this migration, because they would get stuck in the restart protocol that
	is still in progress, so locations are packed and sent as raw Converse
	messages instead.
*/

/*
	CkLocIterator that hands out one location manager's elements round-robin
	over all processors, starting at *targetPE. An element whose turn falls on
	this processor stays here. Each element that leaves
	  - is packed as |Converse header|gID|idx|location| (see pupLocation),
	  - has its local record deleted,
	  - is recorded in the manager as living on the target,
	  - is sent to _distributedLocationHandler on the target processor.
	targetPE points at the caller's counter, so the round-robin continues
	across all location managers visited by distributeRestartedObjects.
*/
class ElementDistributor: public CkLocIterator{
	CkLocMgr *locMgr;
	int *targetPE;
	// pack order must match the unpack order in _distributedLocationHandler
	void pupLocation(CkLocation &loc,PUP::er &p){
		CkArrayIndexMax idx=loc.getIndex();
		CkGroupID gID = locMgr->ckGetGroupID();
		p|gID;	    // store loc mgr's GID as well for easier restore
		p|idx;
		p|loc;
	};
	public:
		ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
		void addLocation(CkLocation &loc){
			// this element's turn is on this processor: keep it here
			if(*targetPE == CkMyPe()){
				*targetPE = (*targetPE +1)%CkNumPes();				
				return;
			}
			
			CkArrayIndexMax idx=loc.getIndex();
			CkLocRec_local *rec = loc.getLocalRecord();
			
			CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
			idx.print();
			

			//TODO: an element that is being moved should leave some trace behind so that
			// the arraybroadcaster can forward messages to it
			
			//pack up this location and send it across
			PUP::sizer psizer;
			pupLocation(loc,psizer);
			int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
			char *msg = (char *)CmiAlloc(totalSize);
			char *buf = &msg[CmiMsgHeaderSizeBytes];
			PUP::toMem pmem(buf);
			// presumably marks the pup as packing an object that is leaving -- confirm
			pmem.becomeDeleting();
			pupLocation(loc,pmem);
			
			// drop the local copy while the manager is flagged as migrating
			locMgr->setDuringMigration(CmiTrue);			
			delete rec;
			locMgr->setDuringMigration(CmiFalse);			
			locMgr->inform(idx,*targetPE);

			CmiSetHandler(msg,_distributedLocationHandlerIdx);
			CmiSyncSendAndFree(*targetPE,totalSize,msg);

			CmiAssert(locMgr->lastKnown(idx) == *targetPE);
			//decide on the target processor for the next object
			*targetPE = (*targetPE +1)%CkNumPes();
		}
		
};
2597
/*
	Parallel restart: run an ElementDistributor over every location manager,
	spreading array elements round-robin across processors. targetPE starts
	at this processor, so the first element visited stays here.
*/
void distributeRestartedObjects(){
	// NOTE(review): numGroups and i look unused, but CKLOCMGR_LOOP (defined
	// elsewhere) presumably uses them as its loop bound/index -- do not
	// remove without checking the macro
	int numGroups = CkpvAccess(_groupIDTable)->size();	
	int i;
	int targetPE=CkMyPe();
	CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
2604
2605 void _distributedLocationHandler(char *receivedMsg){
2606         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2607         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2608         PUP::fromMem pmem(buf);
2609         CkGroupID gID;
2610         CkArrayIndexMax idx;
2611         pmem |gID;
2612         pmem |idx;
2613         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2614         donotCountMigration=1;
2615         mgr->resume(idx,pmem);
2616         donotCountMigration=0;
2617         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2618         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2619         idx.print();
2620
2621         CkLocRec *rec = mgr->elementRec(idx);
2622         CmiAssert(rec->type() == CkLocRec::local);
2623         
2624         CkVec<CkMigratable *> eltList;
2625         mgr->migratableList((CkLocRec_local *)rec,eltList);
2626         for(int i=0;i<eltList.size();i++){
2627                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2628                         CpvAccess(_currentObj) = eltList[i];
2629                         eltList[i]->ResumeFromSync();
2630                 }
2631         }
2632         
2633         
2634 }
2635
2636
2637 /** this method is used to send messages to a restarted processor to tell
2638  * it that a particular expected object is not going to get to it */
2639 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2640         DummyMigrationMsg buf;
2641         buf.flag = MLOG_OBJECT;
2642         buf.lbID = lbID;
2643         buf.mgrID = locMgrID;
2644         buf.idx = idx;
2645         buf.locationPE = locationPE;
2646         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2647         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2648 };
2649
2650
2651 /**this method is used by a restarted processor to tell other processors
2652  * that they are not going to receive these many objects.. just the count
2653  * not the objects themselves ***/
2654
2655 void sendDummyMigrationCounts(int *dummyCounts){
2656         DummyMigrationMsg buf;
2657         buf.flag = MLOG_COUNT;
2658         buf.lbID = globalLBID;
2659         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2660         for(int i=0;i<CmiNumPes();i++){
2661                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2662                         buf.count = dummyCounts[i];
2663                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2664                 }
2665         }
2666 }
2667
2668
2669 /** this handler is used to process a dummy migration msg.
2670  * it looks up the load balancer and calls migrated for it */
2671
2672 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2673         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2674         if(msg->flag == MLOG_OBJECT){
2675                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2676                 LDObjHandle h;
2677                 lb->Migrated(h,1);
2678         }
2679         if(msg->flag == MLOG_COUNT){
2680                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2681                 msg->count -= verifyAckedRequests;
2682                 for(int i=0;i<msg->count;i++){
2683                         LDObjHandle h;
2684                         lb->Migrated(h,1);
2685                 }
2686         }
2687         verifyAckedRequests=0;
2688         CmiFree(msg);
2689 };
2690
2691
2692
2693
2694
2695
2696
2697 /*****************************************************
2698         Implementation of a method that can be used to call
2699         any method on the ChareMlogData of all the chares on
2700         a processor currently
2701 ******************************************************/
2702
2703
2704 class ElementCaller :  public CkLocIterator {
2705 private:
2706         CkLocMgr *locMgr;
2707         MlogFn fnPointer;
2708         void *data;
2709 public:
2710         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2711                 locMgr = _locMgr;
2712                 fnPointer = _fnPointer;
2713                 data = _data;
2714         };
2715         void addLocation(CkLocation &loc){
2716                 CkVec<CkMigratable *> list;
2717                 CkLocRec_local *local = loc.getLocalRecord();
2718                 locMgr->migratableList (local,list);
2719                 for(int i=0;i<list.size();i++){
2720                         CkMigratable *migratableElement = list[i];
2721                         fnPointer(data,migratableElement->mlogData);
2722                 }
2723         }
2724 };
2725
/** Calls fnPointer(data, mlogData) on the ChareMlogData of each chare on
 * this processor, in this order:
 *  1. every group in _groupIDTable,
 *  2. every nodegroup in _nodeGroupIDTable,
 *  3. every migratable found by an ElementCaller over each location manager
 *     that CKLOCMGR_LOOP visits (presumably all of them — confirm against the macro). */
void forAllCharesDo(MlogFn fnPointer,void *data){
        int numGroups = CkpvAccess(_groupIDTable)->size();
        for(int i=0;i<numGroups;i++){
                Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
                fnPointer(data,obj->mlogData);
        }
        int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
        for(int i=0;i<numNodeGroups;i++){
                Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
                fnPointer(data,obj->mlogData);
        }
        int i;  // NOTE(review): presumably consumed by the CKLOCMGR_LOOP macro — confirm before removing
        CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
        
        
};
2742
2743
2744 /******************************************************************
2745  Load Balancing
2746 ******************************************************************/
2747
2748 void initMlogLBStep(CkGroupID gid){
2749         countLBMigratedAway = 0;
2750         countLBToMigrate=0;
2751         onGoingLoadBalancing=1;
2752         migrationDoneCalled=0;
2753         checkpointBarrierCount=0;
2754         if(globalLBID.idx != 0){
2755                 CmiAssert(globalLBID.idx == gid.idx);
2756         }
2757         globalLBID = gid;
2758 }
2759
2760 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
2761         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
2762         
2763         resumeLbFnPtr = _fnPtr;
2764         centralLb = _centralLb;
2765         migrationDoneCalled = 1;
2766         if(countLBToMigrate == countLBMigratedAway){
2767                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2768                 startMlogCheckpoint(NULL,CmiWallTimer());       
2769         }
2770 };
2771
2772 void finishedCheckpointLoadBalancing(){
2773         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
2774         CheckpointBarrierMsg msg;
2775         msg.fromPE = CmiMyPe();
2776         msg.checkpointCount = checkpointCount;
2777
2778         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
2779         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
2780         
2781 };
2782
2783
2784 void sendMlogLocation(int targetPE,envelope *env){
2785         void *_msg = EnvToUsr(env);
2786         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2787
2788
2789         int existing = 0;
2790         //if this object is already in the retainedobjectlust destined for this
2791         //processor it should not be sent
2792         
2793         for(int i=0;i<retainedObjectList.size();i++){
2794                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
2795                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
2796                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
2797                         existing = 1;
2798                         break;
2799                 }
2800         }
2801
2802         if(existing){
2803                 return;
2804         }
2805         
2806         
2807         countLBToMigrate++;
2808         
2809         MigrationNotice migMsg;
2810         migMsg.migRecord.gID = msg->gid;
2811         migMsg.migRecord.idx = msg->idx;
2812         migMsg.migRecord.fromPE = CkMyPe();
2813         migMsg.migRecord.toPE =  targetPE;
2814         
2815         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
2816         
2817         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
2818         retainedObject->migRecord = migMsg.migRecord;
2819         retainedObject->acked  = 0;
2820         
2821         CkPackMessage(&env);
2822         
2823         migMsg.record = retainedObject;
2824         retainedObject->msg = env;
2825         int size = retainedObject->size = env->getTotalsize();
2826         
2827         retainedObjectList.push_back(retainedObject);
2828         
2829         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2830         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2831         
2832         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
2833
2834 }
2835
2836 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
2837         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
2838         migratedNoticeList.push_back(msg->migRecord);
2839
2840         MigrationNoticeAck buf;
2841         buf.record = msg->record;
2842         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
2843         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
2844 }
2845
2846 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
2847         
2848         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
2849         retainedObject->acked = 1;
2850
2851         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
2852         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
2853
2854         //inform home about the new location of this object
2855         CkGroupID gID = retainedObject->migRecord.gID ;
2856         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2857         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
2858         
2859         countLBMigratedAway++;
2860         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
2861                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2862                 startMlogCheckpoint(NULL,CmiWallTimer());
2863         }
2864 };
2865
2866 void _receiveMlogLocationHandler(void *buf){
2867         envelope *env = (envelope *)buf;
2868         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
2869         CkUnpackMessage(&env);
2870         void *_msg = EnvToUsr(env);
2871         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2872         CkGroupID gID= msg->gid;
2873         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
2874         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2875         CpvAccess(_currentObj)=mgr;
2876         mgr->immigrate(msg);
2877 };
2878
2879
2880 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
2881 /*      if(mlogData->objID.type == TypeArray){
2882                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
2883         //      TODO: make sure later that atSync has been called and it needs 
2884         //      to be resumed from sync
2885         //
2886                 CpvAccess(_currentObj) = elt;
2887                 elt->ResumeFromSync();
2888         }*/
2889 }
2890
2891 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
2892         if(checkpointBarrierCount == CmiNumPes()){
2893                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
2894                 for(int i=0;i<CmiNumPes();i++){
2895                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
2896                 }
2897         }
2898 }
2899
2900 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
2901         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
2902         if(msg->checkpointCount == checkpointCount){
2903                 checkpointBarrierCount++;
2904                 checkAndSendCheckpointBarrierAcks(msg);
2905         }else{
2906                 if(msg->checkpointCount-1 == checkpointCount){
2907                         checkpointBarrierCount++;
2908                         checkAndSendCheckpointBarrierAcks(msg);
2909                 }else{
2910                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
2911                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
2912                 }
2913         }
2914         CmiFree(msg);
2915 }
2916
2917 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
2918         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
2919         sendRemoveLogRequests();
2920         (*resumeLbFnPtr)(centralLb);
2921         CmiFree(msg);
2922 }
2923
2924 /**
2925         method that informs an array elements home processor of its current location
2926         It is a converse method to bypass the charm++ message logging framework
2927 */
2928
2929 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
2930         double _startTime = CmiWallTimer();
2931         CurrentLocationMsg msg;
2932         msg.mgrID = locMgrID;
2933         msg.idx = idx;
2934         msg.locationPE = currentPE;
2935         msg.fromPE = CkMyPe();
2936
2937         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
2938         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
2939         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
2940         traceUserBracketEvent(37,_startTime,CmiWallTimer());
2941 }
2942
2943
2944 void _receiveLocationHandler(CurrentLocationMsg *data){
2945         double _startTime = CmiWallTimer();
2946         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
2947         if(mgr == NULL){
2948                 CmiFree(data);
2949                 return;
2950         }
2951         CkLocRec *rec = mgr->elementNrec(data->idx);
2952         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
2953         if(rec != NULL){
2954                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
2955                         if(data->fromPE == data->locationPE){
2956                                 CmiAbort("Another processor has the same object");
2957                         }
2958                 }
2959         }
2960         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
2961                 int targetPE = data->fromPE;
2962                 data->fromPE = CmiMyPe();
2963                 data->locationPE = CmiMyPe();
2964                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
2965                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
2966         }else{
2967                 mgr->inform(data->idx,data->locationPE);
2968         }
2969         CmiFree(data);
2970         traceUserBracketEvent(38,_startTime,CmiWallTimer());
2971 }
2972
2973
2974
2975 void getGlobalStep(CkGroupID gID){
2976         LBStepMsg msg;
2977         int destPE = 0;
2978         msg.lbID = gID;
2979         msg.fromPE = CmiMyPe();
2980         msg.step = -1;
2981         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
2982         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
2983 };
2984
2985 void _getGlobalStepHandler(LBStepMsg *msg){
2986         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2987         msg->step = lb->step();
2988         CmiAssert(msg->fromPE != CmiMyPe());
2989         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
2990         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
2991         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
2992 };
2993
2994 void _recvGlobalStepHandler(LBStepMsg *msg){
2995         
2996         restartDecisionNumber=msg->step;
2997         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
2998         _updateHomeAckHandler(dummyAck);
2999 };
3000
3001
3002
3003
3004
3005
3006
3007
3008 void _messageLoggingExit(){
3009 /*      if(CkMyPe() == 0){
3010                 if(countBuffered != 0){
3011                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3012                 }
3013
3014 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3015         }
3016         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3017         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3018 }
3019 /**********************************
3020         * The methods of the message logging
3021         * data structure stored in each chare
3022         ********************************/
3023
3024 MCount ChareMlogData::nextSN(const CkObjID &recver){
3025 /*      MCount SN = snTable.get(recver);
3026         snTable.put(recver) = SN+1;
3027         return SN+1;*/
3028         double _startTime = CmiWallTimer();
3029         MCount *SN = snTable.getPointer(recver);
3030         if(SN==NULL){
3031                 snTable.put(recver) = 1;
3032                 return 1;
3033         }else{
3034                 (*SN)++;
3035                 return *SN;
3036         }
3037 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3038 };
3039
3040
3041 MCount ChareMlogData::newTN(){
3042         MCount TN;
3043         if(currentHoles > 0){
3044                 int holeidx = numberHoles-currentHoles;
3045                 TN = ticketHoles[holeidx];
3046                 currentHoles--;
3047                 if(currentHoles == 0){
3048                         delete []ticketHoles;
3049                         numberHoles = 0;
3050                 }
3051         }else{
3052                 TN = ++tCount;
3053         }       
3054         return TN;
3055 };
3056
3057 Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
3058         char senderName[100];
3059         char recverName[100];
3060         double _startTime =CmiWallTimer();
3061         Ticket ticket;
3062         if(restartFlag){
3063                 ticket.TN = 0;
3064                 return ticket;
3065         }
3066 /*      SNToTicket &ticketRow = ticketTable.put(sender);
3067         Ticket earlierTicket = ticketRow.get(SN);
3068         if(earlierTicket.TN == 0){
3069                 //This SN has not been ever alloted a ticket
3070                 ticket.TN = newTN();
3071                 ticketRow.put(SN)=ticket;
3072         }else{
3073                 ticket.TN = earlierTicket.TN;
3074         }*/
3075         
3076
3077         SNToTicket *ticketRow = ticketTable.get(sender);
3078         if(ticketRow != NULL){
3079                 Ticket earlierTicket = ticketRow->get(SN);
3080                 if(earlierTicket.TN == 0){
3081                         ticket.TN = newTN();
3082                         ticketRow->put(SN) = ticket;
3083                         DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
3084                 }else{
3085                         ticket.TN = earlierTicket.TN;
3086                         if(ticket.TN > tCount){
3087                                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3088                         }
3089                                 CmiAssert(ticket.TN <= tCount);