Changes due to the messagelogging machine.
[charm.git] / src / ck-core / ckmessagelogging.C
1 #ifdef _FAULT_MLOG_
2
3 #include "ck.h"
4 #include "ckmessagelogging.h"
5 #include "queueing.h"
6 #include <sys/types.h>
7 #include <signal.h>
8 #include "CentralLB.h"
9
10 //#define DEBUG(x)  if(_restartFlag) {x;}
11 #define DEBUG(x)  //x
12 #define DEBUGRESTART(x)  //x
13 #define DEBUGLB(x) //x
14
15 #define BUFFERED_LOCAL
16 #define BUFFERED_REMOTE
17
18 extern const char *idx2str(const CkArrayIndex &ind);
19 extern const char *idx2str(const ArrayElement *el);
20 const char *idx2str(const CkArrayIndexMax &ind){
21         return idx2str((const CkArrayIndex &)ind);
22 };
23
24 void getGlobalStep(CkGroupID gID);
25
26 bool fault_aware(CkObjID &recver);
27
28 int _restartFlag=0;
29 int restarted=0;
30
31 //TODO: remove for perf runs
32 int countHashRefs=0; //count the number of gets
33 int countHashCollisions=0;
34
35
//#define CHECKPOINT_DISK

// Backing storage for the default checkpoint directory. checkpointDirectory
// is a mutable char *, and pointing it straight at a string literal is
// ill-formed since C++11 (and undefined behaviour if anything writes through
// it), so the default lives in a writable array instead.
static char defaultCheckpointDirectory[] = ".";

// Directory used for checkpoints; defaults to the current directory.
char *checkpointDirectory = defaultCheckpointDirectory;

// Starts at zero; not modified in this section of the file.
int unAckedCheckpoint = 0;
39
// Statistics counters.
int countLocal = 0;                    // local message logs handed to the buffering path
int countBuffered = 0;                 // batches sent by sendBufferedLocalMessageCopy
int countPiggy = 0;
int countClearBufferedLocalCalls = 0;  // ++ when a flush timer is scheduled, -- when it fires
int countUpdateHomeAcks = 0;

// Defined in another translation unit.
extern int chkptPeriod;
extern bool parallelRestart;

// Fault-injection configuration (see readKillFile / killLocal).
void readKillFile();
char *killFile;            // path opened by readKillFile
int killFlag = 0;          // when set, _messageLoggingInit calls readKillFile
double killTime = 0.0;     // wall-clock time at which this processor kills itself

int restartingMlogFlag = 0;
int checkpointCount = 0;
55
56
57 CpvDeclare(Chare *,_currentObj);
58 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
59 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
60 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
61 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
62 CpvDeclare(Queue, _outOfOrderMessageQueue);
63 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
64 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
65 CpvDeclare(char **,_bufferedTicketRequests);
66 CpvDeclare(int *,_numBufferedTicketRequests);
67 CpvDeclare(char *,_bufferTicketReply);
68
69
70
// Checkpoint scheduling state private to this file.
static double adjustChkptPeriod = 0.0;   // in ms
static double nextCheckpointTime = 0.0;  // in seconds

// Wall time recorded when the first log of a new local batch is buffered.
double lastBufferedLocalMessageCopyTime;

// Batch sizes that trigger an immediate flush of the local-log queue and of
// a per-PE ticket-request buffer, respectively.
int _maxBufferedMessages;
int _maxBufferedTicketRequests;

// Delay before a partially filled buffer is flushed, in ms.
int BUFFER_TIME = 2;
79
80
// Converse handler indices; each one is filled in by CkRegisterHandler()
// inside _messageLoggingInit().

// -- message logging
int _ticketRequestHandlerIdx;
int _ticketHandlerIdx;
int _localMessageCopyHandlerIdx;
int _localMessageAckHandlerIdx;
int _pingHandlerIdx;
int _bufferedLocalMessageCopyHandlerIdx;
int _bufferedLocalMessageAckHandlerIdx;
int _bufferedTicketRequestHandlerIdx;
int _bufferedTicketHandlerIdx;

// Scratch buffer of 100 characters.
char objString[100];

// -- checkpointing
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _removeProcessedLogHandlerIdx;

// -- restart
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;
int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _resendReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _distributedLocationHandlerIdx;

// -- global step exchange
int _getGlobalStepHandlerIdx;
int _recvGlobalStepHandlerIdx;
114
115
116 int verifyAckTotal;
117 int verifyAckCount;
118
119 int verifyAckedRequests=0;
120
121 RestartRequest *storedRequest;
122
123
124
125 int _falseRestart =0; /**
126                                                                                                         For testing on clusters we might carry out restarts on 
127                                                                                                         a porcessor without actually starting it
128                                                                                                         1 -> false restart
129                                                                                                         0 -> restart after an actual crash
130                                                                                                 */                                                                                                                              
131
132 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
133 int _lockNewTicket=0;
134
135
136 //Load balancing globals
137 int onGoingLoadBalancing=0;
138 void *centralLb;
139 void (*resumeLbFnPtr)(void *);
140 int _receiveMlogLocationHandlerIdx;
141 int _receiveMigrationNoticeHandlerIdx;
142 int _receiveMigrationNoticeAckHandlerIdx;
143 int _checkpointBarrierHandlerIdx;
144 int _checkpointBarrierAckHandlerIdx;
145
146 CkVec<MigrationRecord> migratedNoticeList;
147 CkVec<RetainedMigratedObject *> retainedObjectList;
148 int donotCountMigration=0;
149 int countLBMigratedAway=0;
150 int countLBToMigrate=0;
151 int migrationDoneCalled=0;
152 int checkpointBarrierCount=0;
153 int globalResumeCount=0;
154 CkGroupID globalLBID;
155 int restartDecisionNumber=-1;
156
157
158 double lastCompletedAlarm=0;
159 double lastRestart=0;
160
161
162 //update location globals
163 int _receiveLocationHandlerIdx;
164
165
166
167 // initialize message logging datastructures and register handlers
168 void _messageLoggingInit(){
169         //current object
170         CpvInitialize(Chare *,_currentObj);
171         
172         //registering handlers for message logging
173         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
174         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
175         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
176         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
177         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
178         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
179         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
180   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
181         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
182
183                 
184         //handlers for checkpointing
185         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
186         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
187         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
188         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
189
190
191         //handlers for restart
192         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
193         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
194         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
195         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
196         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
197         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
198         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
199         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
200         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
201         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
202         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
203
204         
205         //handlers for load balancing
206         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
207         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
208         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
209         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
210         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
211         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
212         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
213         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
214         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
215
216         
217         //handlers for updating locations
218         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
219         
220         //Cpv variables for message logging
221         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
222         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
223         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
224         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
225         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
226         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
227         CpvInitialize(Queue, _outOfOrderMessageQueue);
228         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
229         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
230         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
231         
232         CpvInitialize(char **,_bufferedTicketRequests);
233         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
234         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
235         for(int i=0;i<CkNumPes();i++){
236                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
237                 CpvAccess(_numBufferedTicketRequests)[i]=0;
238         }
239   CpvInitialize(char *,_bufferTicketReply);
240         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
241         
242 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
243         CcdCallFnAfter(retryTicketRequest,NULL,100);    
244         
245         
246         //Cpv variables for checkpoint
247         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
248         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
249         
250 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
251 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
252 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
253         if(CkMyPe() == 0){
254 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
255 #ifdef  BUFFERED_LOCAL
256                 if(CmiMyPe() == 0){
257                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
258                 }
259 #endif
260         }
261 #ifdef  BUFFERED_REMOTE
262         if(CmiMyPe() == 0){
263                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
264         }
265 #endif
266
267         traceRegisterUserEvent("Remove Logs", 20);
268         traceRegisterUserEvent("Ticket Request Handler", 21);
269         traceRegisterUserEvent("Ticket Handler", 22);
270         traceRegisterUserEvent("Local Message Copy Handler", 23);
271         traceRegisterUserEvent("Local Message Ack Handler", 24);        
272         traceRegisterUserEvent("Preprocess current message",25);
273         traceRegisterUserEvent("Preprocess past message",26);
274         traceRegisterUserEvent("Preprocess future message",27);
275         traceRegisterUserEvent("Checkpoint",28);
276         traceRegisterUserEvent("Checkpoint Store",29);
277         traceRegisterUserEvent("Checkpoint Ack",30);
278         
279         traceRegisterUserEvent("Send Ticket Request",31);
280         traceRegisterUserEvent("Generalticketrequest1",32);
281         traceRegisterUserEvent("TicketLogLocal",33);
282         traceRegisterUserEvent("next_ticket and SN",34);
283         traceRegisterUserEvent("Timeout for buffered remote messages",35);
284         traceRegisterUserEvent("Timeout for buffered local messages",36);
285         traceRegisterUserEvent("Inform Location Home",37);
286         traceRegisterUserEvent("Receive Location Handler",38);
287         
288         lastCompletedAlarm=CmiWallTimer();
289         lastRestart = CmiWallTimer();
290 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
291         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
292 //      printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
293         if(killFlag){
294                 readKillFile();
295         }
296 }
297
298 void killLocal(void *_dummy,double curWallTime);        
299
300 void readKillFile(){
301         FILE *fp=fopen(killFile,"r");
302         if(!fp){
303                 return;
304         }
305         int proc;
306         double sec;
307         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
308                 if(proc == CkMyPe()){
309                         killTime = CmiWallTimer()+sec;
310                         printf("[%d] To be killed after %.6lf s \n",CkMyPe(),sec);
311                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
312                 }
313         }
314         fclose(fp);
315 }
316
317 void killLocal(void *_dummy,double curWallTime){
318         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
319         if(CmiWallTimer()<killTime-1){
320                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
321         }else{  
322                 kill(getpid(),SIGKILL);
323         }
324 }
325
326
327
328 /************************ Message logging methods ****************/
329
330 // send a ticket request to a group on a processor
331 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
332         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
333                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
334                 void *origMsg = EnvToUsr(env);
335                 for(int i=0;i<CmiNumPes();i++){
336                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
337                                 void *copyMsg = CkCopyMsg(&origMsg);
338                                 envelope *copyEnv = UsrToEnv(copyMsg);
339                                 copyEnv->SN=0;
340                                 copyEnv->TN=0;
341                                 copyEnv->sender.type = TypeInvalid;
342                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
343                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
344                         }
345                 }
346                 return;
347         }
348         CkObjID recver;
349         recver.type = TypeGroup;
350         recver.data.group.id = env->getGroupNum();
351         recver.data.group.onPE = destPE;
352 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
353                 CmiPrintStackTrace(0);
354         }*/
355         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
356 }
357
358 //send a ticket request to a nodegroup
359 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
360         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
361                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
362                 void *origMsg = EnvToUsr(env);
363                 for(int i=0;i<CmiNumNodes();i++){
364                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
365                                 void *copyMsg = CkCopyMsg(&origMsg);
366                                 envelope *copyEnv = UsrToEnv(copyMsg);
367                                 copyEnv->SN=0;
368                                 copyEnv->TN=0;
369                                 copyEnv->sender.type = TypeInvalid;
370                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
371                         }
372                 }
373                 return;
374         }
375         CkObjID recver;
376         recver.type = TypeNodeGroup;
377         recver.data.group.id = env->getGroupNum();
378         recver.data.group.onPE = destNode;
379         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
380 }
381
382 //send a ticket request to an array element
383 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
384         CkObjID recver;
385         recver.type = TypeArray;
386         recver.data.array.id = env->getsetArrayMgr();
387         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
388
389         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
390                 char recverString[100],senderString[100];
391                 
392                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
393         }
394
395         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
396 };
397
398 //a method to generate the actual ticket requests for groups,nodegroups or arrays
399 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
400         envelope *env = _env;
401         int resend=0; //is it a resend
402         char recverName[100];
403         double _startTime=CkWallTimer();
404         
405         if(CpvAccess(_currentObj) == NULL){
406 //              CkAssert(0);
407                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
408                 generalCldEnqueue(destPE,env,_infoIdx);
409                 return;
410         }
411         
412         if(env->sender.type == TypeInvalid){
413                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
414                 //Set message logging data in the envelope
415         }else{
416                 envelope *copyEnv = copyEnvelope(env);
417                 env = copyEnv;
418                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
419                 env->SN = 0;
420         }
421         
422         
423         CkObjID &sender = env->sender;
424         env->recver = recver;
425
426         Chare *obj = (Chare *)env->sender.getObject();
427           
428         if(env->SN == 0){
429                 env->SN = obj->mlogData->nextSN(recver);
430         }else{
431                 resend = 1;
432         }
433         
434         
435         char senderString[100];
436 //      if(env->SN != 1){
437                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
438         //      CmiPrintStackTrace(0);
439 /*      }else{
440                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
441         }*/
442                 
443         MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
444 //      CkPackMessage(&(mEntry->env));
445 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
446         
447         
448
449         _startTime = CkWallTimer();
450         if(destPE == CkMyPe()){
451                 ticketLogLocalMessage(mEntry);
452         }else{
453                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
454 //              traceUserBracketEvent(31,_startTime,CkWallTimer());                     
455         }
456 }
457
458 //method that does the actual send by creating a ticketrequest filling it up and sending it
459 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
460         char recverString[100],senderString[100];
461         envelope *env = entry->env;
462         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
463 /*      envelope *env = entry->env;
464         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
465
466         Chare *obj = (Chare *)entry->env->sender.getObject();
467         if(!resend){
468                 obj->mlogData->addLogEntry(entry);
469         }
470         
471
472 #ifdef BUFFERED_REMOTE
473         //buffer the ticket request 
474         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
475                 //first message to this processor, buffer needs to be created
476                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
477                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
478                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
479         }
480         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
481         //Buffer the ticketrequests
482         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
483         ticketRequest->sender = sender;
484         ticketRequest->recver = recver;
485         ticketRequest->logEntry = entry;
486         ticketRequest->SN = SN;
487         ticketRequest->senderPE = CkMyPe();
488
489         CpvAccess(_numBufferedTicketRequests)[destPE]++;
490         
491         
492         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
493                 sendBufferedTicketRequests(destPE);
494         }else{
495                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
496                         int *checkPE = new int;
497                         *checkPE = destPE;
498                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
499                 }
500         }
501 #else
502
503         TicketRequest ticketRequest;
504         ticketRequest.sender = sender;
505         ticketRequest.recver = recver;
506         ticketRequest.logEntry = entry;
507         ticketRequest.SN = SN;
508         ticketRequest.senderPE = CkMyPe();
509         
510         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
511 //      CmiBecomeImmediate(&ticketRequest);
512         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
513 #endif
514         DEBUG(CmiMemoryCheck());
515 };
516
517 /**
518  * Send the ticket requests buffered for processor PE
519  **/
520 void sendBufferedTicketRequests(int destPE){
521         DEBUG(CmiMemoryCheck());
522         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
523         if(numberRequests == 0){
524                 return;
525         }
526         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
527         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
528         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
529         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
530         header->numberLogs = numberRequests;
531         
532         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
533         CmiSyncSend(destPE,totalSize,(char *)buf);
534         
535         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
536         DEBUG(CmiMemoryCheck());
537 };
538
539 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
540         int destPE = *(int *)_destPE;
541   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
542                 sendBufferedTicketRequests(destPE);
543 //              traceUserEvent(35);
544         }
545         delete (int *)_destPE;
546         DEBUG(CmiMemoryCheck());
547 };
548
549
550
551
552 //get a ticket for a local message and then send a copy to the buddy
553 void ticketLogLocalMessage(MlogEntry *entry){
554         //this method is always in the main thread(not interrupt).. so it should 
555         //never find itself locked out of a newTicket
556 //      CkAssert(_lockNewTicket==0);
557         double _startTime=CkWallTimer();
558         
559 //      _lockNewTicket=1;
560         
561                 DEBUG(CmiMemoryCheck());
562         
563
564         Chare *recverObj = (Chare *)entry->env->recver.getObject();
565         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
566         if(recverObj){
567                 //Consider the case, after a restart when this message has already been allotted a ticket number
568                 // and should get the same one as the old one.
569                 Ticket ticket;
570                 if(recverObj->mlogData->mapTable.numObjects() > 0){
571                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
572                 }else{
573                         ticket.TN = 0;
574                 }
575                 
576                 char senderString[100], recverString[100] ;
577                 
578                 if(ticket.TN == 0){
579                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
580         
581                         if(ticket.TN == 0){
582                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
583                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
584                                 
585         //              _lockNewTicket = 0;
586 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
587                                 return;
588                         }
589                 }       
590                 //TODO: check for the case when an invalid ticket is returned
591                 //TODO: check for OLD or RECEIVED TICKETS
592                 entry->env->TN = ticket.TN;
593                 CkAssert(entry->env->TN > 0);
594                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
595                 
596
597                 sendLocalMessageCopy(entry);
598                 
599                 
600                 DEBUG(CmiMemoryCheck());
601                 entry->unackedLocal = 1;
602                 DEBUG(CmiMemoryCheck());
603                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
604                 DEBUG(CmiMemoryCheck());
605         }else{
606                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
607         }
608         _lockNewTicket=0;
609 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
610 };
611
612
613 void sendLocalMessageCopy(MlogEntry *entry){
614         LocalMessageLog msgLog;
615         msgLog.sender = entry->env->sender;
616         msgLog.recver = entry->env->recver;
617         msgLog.SN = entry->env->SN;
618         msgLog.TN = entry->env->TN;
619         msgLog.entry = entry;
620         msgLog.PE = CkMyPe();
621         
622         char recvString[100];
623         char senderString[100];
624         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
625
626 #ifdef BUFFERED_LOCAL
627         countLocal++;
628         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
629         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
630                 sendBufferedLocalMessageCopy();
631         }else{
632                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
633                         lastBufferedLocalMessageCopyTime = CkWallTimer();
634                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
635                         countClearBufferedLocalCalls++;
636                 }       
637         }
638 #else   
639         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
640         
641         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
642 #endif
643         DEBUG(CmiMemoryCheck());
644 };
645
646
647 void sendBufferedLocalMessageCopy(){
648         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
649         if(numberLogs == 0){
650                 return;
651         }
652         countBuffered++;
653         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
654         void *buf=CmiAlloc(totalSize);
655         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
656         header->numberLogs=numberLogs;
657
658         DEBUG(CmiMemoryCheck());
659         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
660         
661         char *ptr = (char *)buf;
662         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
663         
664         for(int i=0;i<numberLogs;i++){
665                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
666                 memcpy(ptr,&log,sizeof(LocalMessageLog));
667                 ptr = &ptr[sizeof(LocalMessageLog)];
668         }
669
670         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
671
672         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
673         DEBUG(CmiMemoryCheck());
674 };
675
676 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
677         countClearBufferedLocalCalls--;
678         if(countClearBufferedLocalCalls > 10){
679                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
680         }
681         DEBUG(CmiMemoryCheck());
682         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
683         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
684                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
685                         sendBufferedLocalMessageCopy();
686 //                      traceUserEvent(36);
687                 }
688         }
689         DEBUG(CmiMemoryCheck());
690 }
691
692 /****
693         The handler functions
694 *****/
695
696
697 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
698 /**
699  *  If there are any delayed requests, process them first before 
700  *  processing this request
701  * */
702 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
703         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
704         double  _startTime = CkWallTimer();
705         if(CpvAccess(_delayedTicketRequests)->length() > 0){
706                 retryTicketRequest(NULL,_startTime);
707         }
708         _processTicketRequest(ticketRequest);
709         CmiFree(ticketRequest);
710 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
711 }
712 /** Handler used for dealing with a bunch of ticket requests
713  * from one processor. The replies are also bunched together
714  * Does not use _ticketRequestHandler
715  * */
716 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
717         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
718         DEBUG(CmiMemoryCheck());
719         double _startTime = CkWallTimer();
720         if(CpvAccess(_delayedTicketRequests)->length() > 0){
721                 retryTicketRequest(NULL,_startTime);
722         }
723         DEBUG(CmiMemoryCheck());
724   int numRequests = recvdHeader->numberLogs;
725         char *msg = (char *)recvdHeader;
726         msg = &msg[sizeof(BufferedTicketRequestHeader)];
727         int senderPE=((TicketRequest *)msg)->senderPE;
728
729         
730         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
731         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
732         
733         char *ptr = (char *)buf;
734         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
735         header->numberLogs = 0;
736
737         DEBUG(CmiMemoryCheck());
738         
739         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
740         
741         for(int i=0;i<numRequests;i++){
742                 TicketRequest *request = (TicketRequest *)msg;
743                 msg = &msg[sizeof(TicketRequest)];
744                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
745
746                 if(replied){
747                         //the ticket request has been processed and 
748                         //the reply will be stored in the ptr
749                         header->numberLogs++;
750                         ptr = &ptr[sizeof(TicketReply)];
751                 }
752         }
753 /*      if(header->numberLogs == 0){
754                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
755         }*/
756
757         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
758         CmiSyncSend(senderPE,totalSize,(char *)buf);
759         CmiFree(recvdHeader);
760 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
761         DEBUG(CmiMemoryCheck());
762 };
763
764 /**Process the ticket request. 
765  * If it is processed and a reply is being sent 
766  * by this processor return true
767  * else return false.
768  * If a reply buffer is specified put the reply into that
769  * else send the reply
770  * */
771 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
772
773 /*      if(_lockNewTicket){
774                 printf("ddeded %d\n",CkMyPe());
775                 if(CmiIsImmediate(ticketRequest)){
776                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
777                 }
778                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
779                 
780         }else{
781                 _lockNewTicket = 1;
782         }*/
783
784         DEBUG(CmiMemoryCheck());
785         CkObjID sender = ticketRequest->sender;
786         CkObjID recver = ticketRequest->recver;
787         MCount SN = ticketRequest->SN;
788         
789         
790         Chare *recverObj = (Chare *)recver.getObject();
791         
792         
793         char recverName[100];
794         DEBUG(recver.toString(recverName);)
795         if(recverObj == NULL){
796                 int estPE = recver.guessPE();
797                 if(estPE == CkMyPe() || estPE == -1){           
798                         //try to fulfill the request after some time
799                         char senderString[100];
800                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
801                         if(estPE == CkMyPe() && recver.type == TypeArray){
802                                 CkArrayID aid(recver.data.array.id);            
803                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
804                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
805
806                         }
807                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
808                         *delayed = *ticketRequest;
809                         CpvAccess(_delayedTicketRequests)->enq(delayed);
810                         
811                 }else{
812                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
813                         TicketRequest forward = *ticketRequest;
814                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
815                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
816                         
817                         
818                 }
819         DEBUG(CmiMemoryCheck());
820                 return false; // if the receverObj does not exist the ticket request cannot have been 
821                               // processed successfully
822         }else{
823                 char senderString[100];
824                 Ticket ticket;
825                 //check if a ticket for this has been already handed out to an object that used to be local but 
826                 // is no longer so.. need for parallel restart
827                 
828                 if(recverObj->mlogData->mapTable.numObjects() > 0){
829                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
830                 }
831                 
832                 if(ticket.TN == 0){
833                         ticket = recverObj->mlogData->next_ticket(sender,SN);
834                 }
835                 if(ticket.TN > recverObj->mlogData->tProcessed){
836                         ticket.state = NEW_TICKET;
837                 }else{
838                         ticket.state = OLD_TICKET;
839                 }
840                 //TODO: check for the case when an invalid ticket is returned
841                 if(ticket.TN == 0){
842                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
843                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
844                         *delayed = *ticketRequest;
845                         CpvAccess(_delayedTicketRequests)->enq(delayed);
846                         return false;
847                 }
848 /*              if(ticket.TN < SN){ //error state this really should not happen
849                         recver.toString(recverName);
850                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
851                 }*/
852 //              CkAssert(ticket.TN >= SN);
853                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
854
855 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
856     if(reply == NULL){ 
857                         //There is no reply buffer and the ticketreply is going to be 
858                         //sent immediately
859                         TicketReply ticketReply;
860                         ticketReply.request = *ticketRequest;
861                         ticketReply.ticket = ticket;
862                         ticketReply.recverPE = CkMyPe();
863                 
864                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
865 //              CmiBecomeImmediate(&ticketReply);
866                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
867          }else{ // Store ticket reply in the buffer provided
868                  reply->request = *ticketRequest;
869                  reply->ticket = ticket;
870                  reply->recverPE = CkMyPe();
871                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
872                                                          // in case the ticket needs to be forwarded or something
873
874          }
875                 DEBUG(CmiMemoryCheck());
876                 return true;
877         }
878 //      _lockNewTicket=0;
879 };
880
881 inline void _ticketHandler(TicketReply *ticketReply){
882
883         double _startTime = CkWallTimer();
884         DEBUG(CmiMemoryCheck());
885         
886         
887         char senderString[100];
888         CkObjID sender = ticketReply->request.sender;
889         CkObjID recver = ticketReply->request.recver;
890         
891         if(sender.guessPE() != CkMyPe()){               
892                 DEBUG(CkAssert(sender.guessPE()>= 0));
893                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
894         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
895                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
896                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
897 #ifdef BUFFERED_REMOTE
898                 //will be freed by the buffered ticket handler most of the time
899                 //this might lead to a leak just after migration
900                 //when the ticketHandler is directly used without going through the buffered handler
901                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
902 #else
903                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
904 #endif  
905         }else{
906                 char recverName[100];
907                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
908                 MlogEntry *logEntry=NULL;
909                 if(ticketReply->ticket.state & FORWARDED_TICKET){
910                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
911                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
912                         Chare *senderObj = (Chare *)sender.getObject();
913                         if(senderObj){
914                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
915                                 for(int i=0;i<mlog->length();i++){
916                                         MlogEntry *tempEntry = (*mlog)[i];
917                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
918                                                 logEntry = tempEntry;
919                                                 break;
920                                         }
921                                 }
922                                 if(logEntry == NULL){
923 #ifdef BUFFERED_REMOTE
924 #else
925                                         CmiFree(ticketReply);
926 #endif                                  
927                                         return;
928                                 }
929                         }else{
930                                 CmiAbort("This processor thinks it should have the sender\n");
931                         }
932                         ticketReply->ticket.state ^= FORWARDED_TICKET;
933                 }else{
934                         logEntry = ticketReply->request.logEntry;
935                 }
936                 if(logEntry->env->TN <= 0){
937                         //This logEntry has not received a TN earlier
938                         char recverString[100];
939                         logEntry->env->TN = ticketReply->ticket.TN;
940                         logEntry->env->setSrcPe(CkMyPe());
941                         if(ticketReply->ticket.state == NEW_TICKET){
942                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
943                                 if(ticketReply->recverPE != CkMyPe()){
944                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
945                                 }else{
946                                         //It is now a local message use the local message protocol
947                                         sendLocalMessageCopy(logEntry);
948                                 }               
949                         }
950                 }else{
951                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
952                 }
953                 recver.updatePosition(ticketReply->recverPE);
954 #ifdef BUFFERED_REMOTE
955 #else
956                 CmiFree(ticketReply);
957 #endif
958         }
959         CmiMemoryCheck();
960 #ifdef BUFFERED_REMOTE
961 #else
962 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
963 #endif
964         
965 };
966
967 /**
968  * Message to handle the bunch of tickets 
969  * that we get from one processor. We send 
970  * the tickets to be handled one at a time
971  * */
972
973 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
974         double _startTime=CmiWallTimer();
975         int numTickets = recvdHeader->numberLogs;
976         char *msg = (char *)recvdHeader;
977         msg = &msg[sizeof(BufferedTicketRequestHeader)];
978         DEBUG(CmiMemoryCheck());
979         
980         TicketReply *_reply = (TicketReply *)msg;
981
982         
983         for(int i=0;i<numTickets;i++){
984                 TicketReply *reply = (TicketReply *)msg;
985                 _ticketHandler(reply);
986                 
987                 msg = &msg[sizeof(TicketReply)];
988         }
989         
990         CmiFree(recvdHeader);
991 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
992         DEBUG(CmiMemoryCheck());
993 };
994
995
996 void _localMessageCopyHandler(LocalMessageLog *msgLog){
997         double _startTime = CkWallTimer();
998         
999         char senderString[100],recverString[100];
1000         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1001 /*      if(!fault_aware(msgLog->recver)){
1002                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1003         }*/
1004         CpvAccess(_localMessageLog)->enq(*msgLog);
1005         
1006         LocalMessageLogAck ack;
1007         ack.entry = msgLog->entry;
1008         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1009         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1010         CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
1011         
1012 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1013 };
1014
1015 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1016         double _startTime = CkWallTimer();
1017         DEBUG(CmiMemoryCheck());
1018         
1019         int numLogs = recvdHeader->numberLogs;
1020         char *msg = (char *)recvdHeader;
1021
1022         //piggy back the logs already stored on this processor
1023         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1024         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1025 /*      if(numPiggyLogs > 0){
1026                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1027                         CmiAssert(0);
1028                 }
1029         }*/
1030         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1031         
1032         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1033         void *buf = CmiAlloc(totalSize);
1034         char *ptr = (char *)buf;
1035         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1036         
1037         msg = &msg[sizeof(BufferedLocalLogHeader)];
1038         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1039
1040         DEBUG(CmiMemoryCheck());
1041         int PE;
1042         for(int i=0;i<numLogs;i++){
1043                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1044                 CpvAccess(_localMessageLog)->enq(*msgLog);
1045                 PE = msgLog->PE;
1046                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1047
1048                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1049                 ack->entry = msgLog->entry;
1050                 
1051                 msg = &msg[sizeof(LocalMessageLog)];
1052                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1053         }
1054         DEBUG(CmiMemoryCheck());
1055
1056         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1057         piggyHeader->numberLogs = numPiggyLogs;
1058         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1059         if(numPiggyLogs > 0){
1060                 countPiggy++;
1061         }
1062
1063         for(int i=0;i<numPiggyLogs;i++){
1064                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1065                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1066                 ptr = &ptr[sizeof(LocalMessageLog)];
1067         }
1068         DEBUG(CmiMemoryCheck());
1069         
1070         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1071         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1072                 
1073 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1074                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1075                         if(!fault_aware(localLogEntry.recver)){
1076                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1077                         }
1078         }*/
1079         if(freeHeader){
1080                 CmiFree(recvdHeader);
1081         }
1082         DEBUG(CmiMemoryCheck());
1083 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1084 }
1085
1086
1087 void _localMessageAckHandler(LocalMessageLogAck *ack){
1088         
1089         double _startTime = CkWallTimer();
1090         
1091         MlogEntry *entry = ack->entry;
1092         if(entry == NULL){
1093                 CkExit();
1094         }
1095         entry->unackedLocal = 0;
1096         envelope *env = entry->env;
1097         char recverName[100];
1098         char senderString[100];
1099         DEBUG(CmiMemoryCheck());
1100         
1101         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1102         if(env == NULL)
1103                 return;
1104         CkAssert(env->SN > 0);
1105         CkAssert(env->TN > 0);
1106         env->sender.toString(senderString);
1107         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1108         env->recver.toString(recverName);
1109
1110         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1111         
1112 /*      
1113         void *origMsg = EnvToUsr(env);
1114         void *copyMsg = CkCopyMsg(&origMsg);
1115         envelope *copyEnv = UsrToEnv(copyMsg);
1116         entry->env = UsrToEnv(origMsg);*/
1117
1118 //      envelope *copyEnv = copyEnvelope(env);
1119
1120         envelope *copyEnv = env;
1121         copyEnv->localMlogEntry = entry;
1122
1123         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1124         if(CmiMyPe() != entry->destPE){
1125                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1126         }
1127 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1128         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1129         
1130         
1131 #ifdef BUFFERED_LOCAL
1132 #else
1133         CmiFree(ack);
1134 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1135 #endif
1136         
1137         DEBUG(CmiMemoryCheck());
1138         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1139 }
1140
1141
1142 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1143
1144         double _startTime=CkWallTimer();
1145         DEBUG(CmiMemoryCheck());
1146
1147         int numLogs = recvdHeader->numberLogs;
1148         char *msg = (char *)recvdHeader;
1149         msg = &msg[sizeof(BufferedLocalLogHeader)];
1150
1151         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1152         
1153         for(int i=0;i<numLogs;i++){
1154                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1155                 _localMessageAckHandler(ack);
1156                 
1157                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1158         }
1159
1160         //deal with piggy backed local logs
1161         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1162         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1163         if(piggyHeader->numberLogs > 0){
1164                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1165         }
1166         
1167         CmiFree(recvdHeader);
1168         DEBUG(CmiMemoryCheck());
1169 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1170 }
1171
1172
1173
1174
1175
1176
1177 bool fault_aware(CkObjID &recver){
1178         switch(recver.type){
1179                 case TypeChare:
1180                         return false;
1181                 case TypeGroup:
1182                 case TypeNodeGroup:
1183                 case TypeArray:
1184                         return true;
1185                 default:
1186                         return false;
1187         }
1188 };
1189
1190 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1191         char recverString[100];
1192         char senderString[100];
1193         
1194         DEBUG(CmiMemoryCheck());
1195         CkObjID recver = env->recver;
1196         if(!fault_aware(recver))
1197                 return 1;
1198
1199
1200         Chare *obj = (Chare *)recver.getObject();
1201         *objPointer = obj;
1202         if(obj == NULL){
1203                 int possiblePE = recver.guessPE();
1204                 if(possiblePE != CkMyPe()){
1205                         int totalSize = env->getTotalsize();                    
1206                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1207                 }
1208                 return 0;
1209         }
1210
1211
1212         double _startTime = CkWallTimer();
1213 //env->sender.updatePosition(env->getSrcPe());
1214         if(env->TN == obj->mlogData->tProcessed+1){
1215                 //the message that needs to be processed now
1216                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1217                 // once we find a message that we can process we put back all the messages in the out of order queue
1218                 // back into the main scheduler queue. 
1219                 if(env->sender.guessPE() == CkMyPe()){
1220                         *logEntryPointer = env->localMlogEntry;
1221                 }
1222         DEBUG(CmiMemoryCheck());
1223                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1224                         void *qMsgPtr;
1225                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1226                         envelope *qEnv = (envelope *)qMsgPtr;
1227                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1228         DEBUG(CmiMemoryCheck());
1229                 }
1230 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1231                 //TODO: this might be a problem.. change made for leanMD
1232 //              CpvAccess(_currentObj) = obj;
1233         DEBUG(CmiMemoryCheck());
1234                 return 1;
1235         }
1236         if(env->TN <= obj->mlogData->tProcessed){
1237                 //message already processed
1238                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1239 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1240         DEBUG(CmiMemoryCheck());
1241                 return 0;
1242         }
1243         //message that needs to be processed in the future
1244
1245 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1246         //the message cant be processed now put it back in the out of order message Q, 
1247         //It will be transferred to the main queue later
1248         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1249 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1250         DEBUG(CmiMemoryCheck());
1251         
1252         return 0;
1253 }
1254
1255 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1256         char senderString[100];
1257         if(obj){
1258                 if(sender.guessPE() == CkMyPe()){
1259                         if(entry != NULL){
1260                                 entry->env = NULL;
1261                         }
1262                 }
1263                 obj->mlogData->tProcessed++;
1264 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1265                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1266 //              CpvAccess(_currentObj)= NULL;
1267         }
1268         DEBUG(CmiMemoryCheck());
1269 }
1270
1271 /***
1272         Helpers for the handlers and message logging methods
1273 ***/
1274
1275 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1276 //      double _startTime = CkWallTimer();
1277         if(env->recver.type != TypeNodeGroup){
1278         //This repeats a step performed in skipCldEnq for messages sent to
1279         //other processors. I do this here so that messages to local processors
1280         //undergo the same transformation.. It lets the resend be uniform for 
1281         //all messages
1282 //              CmiSetXHandler(env,CmiGetHandler(env));
1283                 _skipCldEnqueue(destPE,env,_infoIdx);
1284         }else{
1285                 _noCldNodeEnqueue(destPE,env);
1286         }
1287 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1288 }
1289 //extern "C" int CmiGetNonLocalLength();
1290 /** This method is used to retry the ticket requests
1291  * that had been queued up earlier
1292  * */
1293
1294 int calledRetryTicketRequest=0;
1295
1296 void retryTicketRequestTimer(void *_dummy,double _time){
1297                 calledRetryTicketRequest=0;
1298                 retryTicketRequest(_dummy,_time);
1299 }
1300
1301 void retryTicketRequest(void *_dummy,double curWallTime){       
1302         double start = CkWallTimer();
1303         DEBUG(CmiMemoryCheck());
1304         int length = CpvAccess(_delayedTicketRequests)->length();
1305         for(int i=0;i<length;i++){
1306                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1307                 if(ticketRequest){
1308                         char senderString[100],recverString[100];
1309                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1310                         DEBUG(CmiMemoryCheck());
1311                         _processTicketRequest(ticketRequest);
1312                   CmiFree(ticketRequest);
1313                         DEBUG(CmiMemoryCheck());
1314                 }       
1315         }       
1316         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1317                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1318                 ticketLogLocalMessage(entry);
1319         }
1320         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1321 //      int converse_qLength = CmiGetNonLocalLength();
1322         
1323 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1324
1325 /*      PingMsg pingMsg;
1326         pingMsg.PE = CkMyPe();
1327         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1328         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1329                 for(int i=0;i<CkNumPes();i++){
1330                         if(i != CkMyPe()){
1331                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1332                         }
1333                 }
1334         }*/     
1335         //TODO: change this back to 100
1336         if(calledRetryTicketRequest == 0){
1337                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1338                 calledRetryTicketRequest =1;
1339         }
1340         DEBUG(CmiMemoryCheck());
1341 }
1342
1343 void _pingHandler(PingMsg *msg){
1344         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1345         CmiFree(msg);
1346 }
1347
1348
1349 /*****************************************************************************
1350         Checkpointing methods..
1351                 Pack all the data on a processor and send it to the buddy periodically
1352                 Also used to throw away message logs
1353 *****************************************************************************/
1354 CkVec<TProcessedLog> processedTicketLog;
1355 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1356 void clearUpMigratedRetainedLists(int PE);
1357
1358 void checkpointAlarm(void *_dummy,double curWallTime){
1359         double diff = curWallTime-lastCompletedAlarm;
1360         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1361 /*      if(CkWallTimer()-lastRestart < 50){
1362                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1363                 return;
1364         }*/
1365         if(diff < ((chkptPeriod) - 2)){
1366                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1367                 return;
1368         }
1369         CheckpointRequest request;
1370         request.PE = CkMyPe();
1371         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1372         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1373 };
1374
1375 void _checkpointRequestHandler(CheckpointRequest *request){
1376         startMlogCheckpoint(NULL,CmiWallTimer());       
1377 }
1378
1379 void startMlogCheckpoint(void *_dummy,double curWallTime){
1380         double _startTime = CkWallTimer();
1381         checkpointCount++;
1382 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1383                 kill(getpid(),SIGKILL);
1384         }*/
1385         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1386                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1387         }
1388         PUP::sizer psizer;
1389         DEBUG(CmiMemoryCheck());
1390
1391         psizer | checkpointCount;
1392         
1393         CkPupROData(psizer);
1394         DEBUG(CmiMemoryCheck());
1395         CkPupGroupData(psizer);
1396         DEBUG(CmiMemoryCheck());
1397         CkPupNodeGroupData(psizer);
1398         DEBUG(CmiMemoryCheck());
1399         pupArrayElementsSkip(psizer,NULL);
1400         DEBUG(CmiMemoryCheck());
1401
1402         int dataSize = psizer.size();
1403         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1404         char *msg = (char *)CmiAlloc(totalSize);
1405         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1406         chkMsg->PE = CkMyPe();
1407         chkMsg->dataSize = dataSize;
1408
1409         
1410         char *buf = &msg[sizeof(CheckPointDataMsg)];
1411         PUP::toMem pBuf(buf);   
1412
1413         pBuf | checkpointCount;
1414         
1415         CkPupROData(pBuf);
1416         CkPupGroupData(pBuf);
1417         CkPupNodeGroupData(pBuf);
1418         pupArrayElementsSkip(pBuf,NULL);
1419
1420         unAckedCheckpoint=1;
1421         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1422         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1423         
1424         /*
1425                 Store the highest Ticket number processed for each chare on this processor
1426         */
1427         processedTicketLog.removeAll();
1428         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1429         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1430                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1431         }
1432
1433         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1434                 lastCompletedAlarm = curWallTime;
1435                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1436         }
1437         traceUserBracketEvent(28,_startTime,CkWallTimer());
1438 };
1439
1440 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1441         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1442         TProcessedLog logEntry;
1443         logEntry.recver = mlogData->objID;
1444         logEntry.tProcessed = mlogData->tProcessed;
1445         log->push_back(logEntry);
1446         char objString[100];
1447         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1448 }
1449
1450 class ElementPacker : public CkLocIterator {
1451 private:
1452         CkLocMgr *locMgr;
1453         PUP::er &p;
1454 public:
1455                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1456                 void addLocation(CkLocation &loc) {
1457                         CkArrayIndexMax idx=loc.getIndex();
1458                         CkGroupID gID = locMgr->ckGetGroupID();
1459                         p|gID;      // store loc mgr's GID as well for easier restore
1460                         p|idx;
1461                         p|loc;
1462     }
1463 };
1464
1465
1466 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1467         int numElements,i;
1468   int numGroups = CkpvAccess(_groupIDTable)->size();    
1469         if(!p.isUnpacking()){
1470                 numElements = CkCountArrayElements();
1471         }       
1472         p | numElements;
1473         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1474         if(!p.isUnpacking()){
1475                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1476         }else{
1477                 //Flush all recs of all LocMgrs before putting in new elements
1478 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1479                 for(int j=0;j<listsize;j++){
1480                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1481                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1482                                 listToSkip[j].idx.print();
1483                         }
1484                 }
1485                         
1486           for (int i=0; i<numElements; i++) {
1487                         CkGroupID gID;
1488                         CkArrayIndexMax idx;
1489                         p|gID;
1490             p|idx;
1491                         int flag=0;
1492                         int matchedIdx=0;
1493                         for(int j=0;j<listsize;j++){
1494                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1495                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1496                                                 matchedIdx = j;
1497                                                 flag = 1;
1498                                                 break;
1499                                         }
1500                                 }
1501                         }
1502                         if(flag == 1){
1503                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1504                         }else{
1505                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1506                         }
1507                                 
1508                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1509                         mgr->resume(idx,p,flag);
1510                         if(flag == 1){
1511                                 int homePE = mgr->homePe(idx);
1512                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1513                         }
1514           }
1515         }
1516 };
1517
1518
1519 void writeCheckpointToDisk(int size,char *chkpt){
1520         char fNameTemp[100];
1521         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1522         int fd = creat(fNameTemp,S_IRWXU);
1523         int ret = write(fd,chkpt,size);
1524         CkAssert(ret == size);
1525         close(fd);
1526         
1527         char fName[100];
1528         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1529         unlink(fName);
1530
1531         rename(fNameTemp,fName);
1532         
1533 }
1534
1535 //handler that receives the checkpoint from a processor
1536 //it stores it and acks it
1537 void _storeCheckpointHandler(char *msg){
1538         
1539         double _startTime=CkWallTimer();
1540                 
1541         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1542         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1543         
1544         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1545         
1546         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1547         if(oldChkpt != NULL){
1548                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1549                 CmiFree(oldmsg);
1550         }
1551         //turning off storing checkpoints
1552         
1553         int sendingPE = chkMsg->PE;
1554         
1555         CpvAccess(_storedCheckpointData)->buf = chkpt;
1556         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1557         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1558
1559 #ifdef CHECKPOINT_DISK
1560         //store the checkpoint on disk
1561         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1562         CpvAccess(_storedCheckpointData)->buf = NULL;
1563         CmiFree(msg);
1564 #endif
1565
1566         int count=0;
1567         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1568                 if(migratedNoticeList[j].fromPE == sendingPE){
1569                         migratedNoticeList[j].ackFrom = 1;
1570                 }else{
1571                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1572                 }
1573                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1574                         migratedNoticeList.remove(j);
1575                         count++;
1576                 }
1577                 
1578         }
1579         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1580         
1581         CheckPointAck ackMsg;
1582         ackMsg.PE = CkMyPe();
1583         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1584         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1585         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1586         
1587         
1588         
1589         traceUserBracketEvent(29,_startTime,CkWallTimer());
1590 };
1591
1592
1593 void sendRemoveLogRequests(){
1594         double _startTime = CkWallTimer();      
1595         //send out the messages asking senders to throw away message logs below a certain ticket number
1596         /*
1597                 The remove log request message looks like
1598                 |RemoveLogRequest||List of TProcessedLog|
1599         */
1600         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1601         char *requestMsg = (char *)CmiAlloc(totalSize);
1602         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1603         request->PE = CkMyPe();
1604         request->numberObjects = processedTicketLog.size();
1605         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1606         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1607         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1608         
1609         DEBUG(CmiMemoryCheck());
1610         for(int i=0;i<CkNumPes();i++){
1611                 CmiSyncSend(i,totalSize,requestMsg);
1612         }
1613         CmiFree(requestMsg);
1614
1615         clearUpMigratedRetainedLists(CmiMyPe());
1616         //TODO: clear ticketTable
1617         
1618         traceUserBracketEvent(30,_startTime,CkWallTimer());
1619         DEBUG(CmiMemoryCheck());
1620 }
1621
1622
1623 void _checkpointAckHandler(CheckPointAck *ackMsg){
1624         DEBUG(CmiMemoryCheck());
1625         unAckedCheckpoint=0;
1626         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));    
1627         if(onGoingLoadBalancing){
1628                 onGoingLoadBalancing = 0;
1629                 finishedCheckpointLoadBalancing();
1630         }else{
1631                 sendRemoveLogRequests();
1632         }
1633         CmiFree(ackMsg);
1634         
1635 };
1636
1637 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1638         DEBUG(CmiMemoryCheck());
1639         CmiMemoryCheck();
1640         char *data = (char *)_data;
1641         RemoveLogRequest *request = (RemoveLogRequest *)data;
1642         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1643         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1644
1645         int count=0;
1646         for(int i=0;i<mlog->length();i++){
1647                 MlogEntry *logEntry = mlog->deq();
1648                 int match=0;
1649                 for(int j=0;j<request->numberObjects;j++){
1650                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1651                                 //this log Entry should be removed
1652                                 match = 1;
1653                                 break;
1654                         }
1655                 }
1656                 char senderString[100],recverString[100];
1657 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1658                 if(match){
1659                         count++;
1660                         delete logEntry;
1661                 }else{
1662                         mlog->enq(logEntry);
1663                 }
1664         }
1665         if(count > 0){
1666                 char nameString[100];
1667                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1668         }
1669         DEBUG(CmiMemoryCheck());
1670         CmiMemoryCheck();
1671 };
1672
1673 void _removeProcessedLogHandler(char *requestMsg){
1674         double start = CkWallTimer();
1675         forAllCharesDo(removeProcessedLogs,requestMsg);
1676         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1677         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1678         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1679         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1680         if(request->PE == getCheckPointPE()){
1681                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1682                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1683                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1684                 int count=0;
1685 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1686                 DEBUG(char nameString[100];)
1687                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1688                 DEBUG(})*/
1689                 for(int i=0;i<localQ->length();i++){
1690                         LocalMessageLog localLogEntry = (*localQ)[i];
1691                         if(!fault_aware(localLogEntry.recver)){
1692                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1693                         }
1694                         bool keep = true;
1695                         for(int j=0;j<request->numberObjects;j++){                              
1696                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1697                                         keep = false;
1698                                         break;
1699                                 }
1700                         }       
1701                         if(keep){
1702                                 tempQ->enq(localLogEntry);
1703                         }else{
1704                                 count++;
1705                         }
1706                 }
1707                 delete localQ;
1708                 CpvAccess(_localMessageLog) = tempQ;
1709                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1710         }
1711
1712         /*
1713                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1714         */
1715         CmiMemoryCheck();
1716         clearUpMigratedRetainedLists(request->PE);
1717         
1718         traceUserBracketEvent(20,start,CkWallTimer());
1719         CmiFree(requestMsg);    
1720 };
1721
1722
1723 void clearUpMigratedRetainedLists(int PE){
1724         int count=0;
1725         CmiMemoryCheck();
1726         
1727         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1728                 if(migratedNoticeList[j].toPE == PE){
1729                         migratedNoticeList[j].ackTo = 1;
1730                 }
1731                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1732                         migratedNoticeList.remove(j);
1733                         count++;
1734                 }
1735         }
1736         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1737         
1738         for(int j=retainedObjectList.size()-1;j>=0;j--){
1739                 if(retainedObjectList[j]->migRecord.toPE == PE){
1740                         RetainedMigratedObject *obj = retainedObjectList[j];
1741                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1742                         retainedObjectList.remove(j);
1743                         if(obj->msg != NULL){
1744                                 CmiMemoryCheck();
1745                                 CmiFree(obj->msg);
1746                         }
1747                         delete obj;
1748                 }
1749         }
1750 }
1751
1752 /***************************************************************
1753         Restart Methods and handlers
1754 ***************************************************************/        
1755
1756 /**
1757  * Function for restarting an object with message logging
1758  */
1759 void CkMlogRestart(const char * dummy, CkArgMsg *dummyMsg){
1760         printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1761         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1762         _restartFlag = 1;
1763         RestartRequest msg;
1764         msg.PE = CkMyPe();
1765         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1766         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1767 };
1768
1769 void CkMlogRestartDouble(void *,double){
1770         CkMlogRestart(NULL);
1771 };
1772
1773
1774 void readCheckpointFromDisk(int size,char *buf){
1775         char fName[100];
1776         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1777
1778         int fd = open(fName,O_RDONLY);
1779         int count=0;
1780         while(count < size){
1781                 count += read(fd,&buf[count],size-count);
1782         }
1783         close(fd);
1784         
1785 };
1786
1787
1788 void sendCheckpointData();
1789
1790 void _getCheckpointHandler(RestartRequest *restartMsg){
1791         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1792         CkAssert(restartMsg->PE == storedChkpt->PE);
1793         storedRequest = restartMsg;
1794         
1795         verifyAckTotal = 0;
1796         for(int i=0;i<migratedNoticeList.size();i++){
1797                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1798 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1799                         if(migratedNoticeList[i].ackFrom == 0){
1800                                 //need to verify if the object actually exists .. it might not
1801                                 //have been acked but it might exist on it
1802                                 VerifyAckMsg msg;
1803                                 msg.migRecord = migratedNoticeList[i];
1804                                 msg.index = i;
1805                                 msg.fromPE = CmiMyPe();
1806                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1807                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1808                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1809                                 verifyAckTotal++;
1810                         }
1811                 }
1812         }
1813         
1814         if(verifyAckTotal == 0){
1815                 sendCheckpointData();
1816         }
1817         verifyAckCount = 0;
1818 }
1819
1820
1821 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1822         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1823         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1824         if(rec != NULL && rec->type() == CkLocRec::local){
1825                         //this location exists on this processor
1826                         //and needs to be removed       
1827                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1828                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1829                         
1830                         int localIdx = localRec->getLocalIndex();
1831                         LBDatabase *lbdb = localRec->getLBDB();
1832                         LDObjHandle ldHandle = localRec->getLdHandle();
1833                                 
1834                         locMgr->setDuringMigration(true);
1835                         
1836                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1837                         lbdb->UnregisterObj(ldHandle);
1838                         
1839                         locMgr->setDuringMigration(false);
1840                         
1841                         verifyAckedRequests++;
1842
1843         }
1844         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1845         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1846 };
1847
1848
1849 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1850         int index =     verifyReply->index;
1851         migratedNoticeList[index] = verifyReply->migRecord;
1852         verifyAckCount++;
1853         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1854         if(verifyAckCount == verifyAckTotal){
1855                 sendCheckpointData();
1856         }
1857 }
1858
1859
1860
1861 void sendCheckpointData(){      
1862         RestartRequest *restartMsg = storedRequest;
1863         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1864         int numMigratedAwayElements = migratedNoticeList.size();
1865         if(migratedNoticeList.size() != 0){
1866                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1867 //                      CkAssert(migratedNoticeList.size() == 0);
1868         }
1869         
1870         
1871         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1872         
1873         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1874         
1875         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
1876         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
1877         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1878         
1879         char *msg = (char *)CmiAlloc(totalSize);
1880         
1881         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1882         dataMsg->PE = CkMyPe();
1883         dataMsg->restartWallTime = CmiTimer();
1884         dataMsg->checkPointSize = storedChkpt->bufSize;
1885         
1886         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1887 //      dataMsg->numMigratedAwayElements = 0;
1888         
1889         dataMsg->numMigratedInElements = 0;
1890         dataMsg->migratedElementSize = 0;
1891         dataMsg->lbGroupID = globalLBID;
1892         /*msg layout 
1893                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1894                 Local MessageLog|
1895         */
1896         //store checkpoint data
1897         char *buf = &msg[sizeof(RestartProcessorData)];
1898
1899         if(dataMsg->numMigratedAwayElements != 0){
1900                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1901                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1902         }
1903         
1904
1905 #ifdef CHECKPOINT_DISK
1906         readCheckpointFromDisk(storedChkpt->bufSize,buf);
1907 #else   
1908         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1909 #endif
1910         buf = &buf[storedChkpt->bufSize];
1911
1912
1913         //store localmessage Log
1914         dataMsg->numLocalMessages = localMsgQ->length();
1915         for(int i=0;i<localMsgQ->length();i++){
1916                 if(!fault_aware(((*localMsgQ)[i]).recver )){
1917                         CmiAbort("Non fault aware localMsgQ");
1918                 }
1919                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
1920                 buf = &buf[sizeof(LocalMessageLog)];
1921         }
1922         
1923         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1924         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1925         CmiFree(restartMsg);
1926 };
1927
1928
1929 // this list is used to create a vector of the object ids of all
1930 //the chares on this processor currently and the highest TN processed by them 
1931 //the first argument is actually a CkVec<TProcessedLog> *
1932 void createObjIDList(void *data,ChareMlogData *mlogData){
1933         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1934         TProcessedLog entry;
1935         entry.recver = mlogData->objID;
1936         entry.tProcessed = mlogData->tProcessed;
1937         list->push_back(entry);
1938         char objString[100];
1939         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1940 }
1941
1942
1943
1944 void _recvCheckpointHandler(char *_restartData){
1945         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1946         MigrationRecord *migratedAwayElements;
1947
1948         globalLBID = restartData->lbGroupID;
1949         
1950         restartData->restartWallTime *= 1000;
1951         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1952         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1953         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1954
1955         
1956         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1957         char *buf = &_restartData[sizeof(RestartProcessorData)];
1958         
1959         if(restartData->numMigratedAwayElements != 0){
1960                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1961                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1962                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1963                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1964         }
1965         
1966         PUP::fromMem pBuf(buf);
1967
1968         pBuf | checkpointCount;
1969
1970         CkPupROData(pBuf);
1971         CkPupGroupData(pBuf);
1972         CkPupNodeGroupData(pBuf);
1973 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1974         pupArrayElementsSkip(pBuf,NULL);
1975         CkAssert(pBuf.size() == restartData->checkPointSize);
1976         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1977         
1978         forAllCharesDo(initializeRestart,NULL);
1979         
1980         //store the restored local message log in a vector
1981         buf = &buf[restartData->checkPointSize];        
1982         for(int i=0;i<restartData->numLocalMessages;i++){
1983                 LocalMessageLog logEntry;
1984                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
1985                 
1986                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
1987                 if(recverObj!=NULL){
1988                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
1989                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
1990                         char senderString[100];
1991                         char recverString[100];
1992                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
1993                 }else{
1994 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
1995                 }
1996                 buf = &buf[sizeof(LocalMessageLog)];
1997         }
1998
1999         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2000
2001         CmiFree(_restartData);
2002         
2003         
2004         _initDone();
2005
2006         getGlobalStep(globalLBID);
2007         
2008         countUpdateHomeAcks = 0;
2009         RestartRequest updateHomeRequest;
2010         updateHomeRequest.PE = CmiMyPe();
2011         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2012         for(int i=0;i<CmiNumPes();i++){
2013                 if(i != CmiMyPe()){
2014                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2015                 }
2016         }
2017
2018 }
2019
2020 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2021         countUpdateHomeAcks++;
2022         CmiFree(updateHomeAck);
2023         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2024         if(countUpdateHomeAcks != CmiNumPes()){
2025                 return;
2026         }
2027
2028         // Send out the request to resend logged messages to all other processors
2029         CkVec<TProcessedLog> objectVec;
2030         forAllCharesDo(createObjIDList, (void *)&objectVec);
2031         int numberObjects = objectVec.size();
2032         
2033         /*
2034                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2035         */
2036         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2037         char *resendMsg = (char *)CmiAlloc(totalSize);
2038         
2039
2040         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2041         resendReq->PE =CkMyPe(); 
2042         resendReq->numberObjects = numberObjects;
2043         char *objList = &resendMsg[sizeof(ResendRequest)];
2044         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2045         
2046
2047         
2048
2049         /* test for parallel restart migrate away object**/
2050         if(parallelRestart){
2051                 distributeRestartedObjects();
2052                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2053         }
2054         
2055         /*      To make restart work for load balancing.. should only
2056         be used when checkpoint happens along with load balancing
2057         **/
2058 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2059
2060         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2061         CpvAccess(_currentObj) = lb;
2062         lb->ReceiveDummyMigration(restartDecisionNumber);
2063
2064         sleep(10);
2065         
2066         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2067         for(int i=0;i<CkNumPes();i++){
2068                 if(i != CkMyPe()){
2069                         CmiSyncSend(i,totalSize,resendMsg);
2070                 }       
2071         }
2072         _resendMessagesHandler(resendMsg);
2073         CmiFree(resendMsg);
2074 };
2075
2076 void initializeRestart(void *data,ChareMlogData *mlogData){
2077         mlogData->resendReplyRecvd = 0;
2078         mlogData->receivedTNs = new CkVec<MCount>;
2079         mlogData->restartFlag = 1;
2080         mlogData->restoredLocalMsgLog.removeAll();
2081         mlogData->mapTable.empty();
2082 };
2083
2084
2085 void updateHomePE(void *data,ChareMlogData *mlogData){
2086         RestartRequest *updateRequest = (RestartRequest *)data;
2087         int PE = updateRequest->PE; //restarted PE
2088         //if this object is an array Element and its home is the restarted processor
2089         // the home processor needs to know its current location
2090         if(mlogData->objID.type == TypeArray){
2091                 //it is an array element
2092                 CkGroupID myGID = mlogData->objID.data.array.id;
2093                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2094                 CkArrayID aid(mlogData->objID.data.array.id);           
2095                 //check if the restarted processor is the home processor for this object
2096                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2097                 if(locMgr->homePe(myIdx) == PE){
2098                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2099                         DEBUGRESTART(myIdx.print());
2100                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2101                 }
2102         }
2103 };
2104
2105
2106 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2107         
2108         int sender = updateRequest->PE;
2109         
2110         forAllCharesDo(updateHomePE,updateRequest);
2111         
2112         updateRequest->PE = CmiMyPe();
2113         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2114         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2115         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2116                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2117                 checkpointCount--;
2118                 startMlogCheckpoint(NULL,0);
2119         }
2120         if(sender == getCheckPointPE()){
2121                 for(int i=0;i<retainedObjectList.size();i++){
2122                         if(retainedObjectList[i]->acked == 0){
2123                                 MigrationNotice migMsg;
2124                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2125                                 migMsg.record = retainedObjectList[i];
2126                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2127                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2128                         }
2129                 }
2130
2131         }
2132 }
2133
2134
2135 //the data argument is of type ResendData which contains the 
2136 //array of objects on  the restartedProcessor
2137 //this method resends the messages stored in this chare's message log 
2138 //to the restarted processor. It also accumulates the maximum TN
2139 //for all the objects on the restarted processor
2140 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2141         char nameString[100];
2142         ResendData *resendData = (ResendData *)data;
2143         int PE = resendData->PE; //restarted PE
2144         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2145         int count=0;
2146         int ticketRequests=0;
2147         CkQ<MlogEntry *> *log = mlogData->getMlog();
2148
2149         
2150         
2151         
2152         for(int i=0;i<log->length();i++){
2153                 MlogEntry *logEntry = (*log)[i];
2154                 
2155                 // if we sent out the logs of a local message to buddy and he crashed
2156                 //before acking
2157                 envelope *env = logEntry->env;
2158                 if(env == NULL){
2159                         continue;
2160                 }
2161                 if(logEntry->unackedLocal){
2162                         char recverString[100];
2163                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2164                         sendLocalMessageCopy(logEntry);
2165                 }
2166                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2167                 if(env->TN <= 0){
2168                         //ticket not yet replied send it out again
2169                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
2170                 }
2171                 
2172                 if(env->recver.type != TypeInvalid){
2173                         int flag = 0;//marks if any of the restarted objects matched this log entry
2174                         for(int j=0;j<resendData->numberObjects;j++){
2175                                 if(env->recver == (resendData->listObjects)[j].recver){
2176                                         flag = 1;
2177                                         //message has a valid TN
2178                                         if(env->TN > 0){
2179                                                 //store maxTicket
2180                                                 if(env->TN > resendData->maxTickets[j]){
2181                                                         resendData->maxTickets[j] = env->TN;
2182                                                 }
2183                                                 //if the TN for this entry is more than the TN processed, send the message out
2184                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2185                                                         //store the TNs that have been since the recver last checkpointed
2186                                                         resendData->ticketVecs[j].push_back(env->TN);
2187                                                         
2188                                                         if(PE != CkMyPe()){
2189                                                                 if(env->recver.type == TypeNodeGroup){
2190                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2191                                                                 }else{
2192                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2193                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2194                                                                 }
2195                                                         }else{
2196                                                                 envelope *copyEnv = copyEnvelope(env);
2197                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2198                                                         }
2199                                                         char senderString[100];
2200                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2201                                                         count++;
2202                                                 }       
2203                                         }else{
2204 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2205                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2206                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2207                                                 CkAssert(logEntry->destPE != CkMyPe());
2208                                                 
2209                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2210                                                 
2211                                                 ticketRequests++;*/
2212                                         }
2213                                 }
2214                         }//end of for loop of objects
2215                         
2216                 }       
2217         }
2218         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2219 }
2220
2221 void _resendMessagesHandler(char *msg){
2222         ResendRequest *resendReq = (ResendRequest *)msg;
2223         char *listObjects = &msg[sizeof(ResendRequest)];
2224         ResendData d;
2225         d.numberObjects = resendReq->numberObjects;
2226         d.PE = resendReq->PE;
2227         d.listObjects = (TProcessedLog *)listObjects;
2228         d.maxTickets = new MCount[d.numberObjects];
2229         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2230         for(int i=0;i<d.numberObjects;i++){
2231                 d.maxTickets[i] = 0;
2232         }
2233
2234         //Check if any of the retained objects need to be recreated
2235         //If they have not been recreated on the restarted processor
2236         //they need to be recreated on this processor
2237         int count=0;
2238         for(int i=0;i<retainedObjectList.size();i++){
2239                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2240                         count++;
2241                         int recreate=1;
2242                         for(int j=0;j<d.numberObjects;j++){
2243                                 if(d.listObjects[j].recver.type != TypeArray ){
2244                                         continue;
2245                                 }
2246                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2247                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2248                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2249                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2250                                                 recreate = 0;
2251                                                 break;
2252                                         }
2253                                 }
2254                         }
2255                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2256                         if(recreate){
2257                                 donotCountMigration=1;
2258                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2259                                 donotCountMigration=0;
2260                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2261                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2262                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2263
2264                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2265                                 
2266                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2267                                 CmiAssert(rec->type() == CkLocRec::local);
2268                                 CkVec<CkMigratable *> eltList;
2269                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2270                                 for(int j=0;j<eltList.size();j++){
2271                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2272                                                 CpvAccess(_currentObj) = eltList[j];
2273                                                 eltList[j]->ResumeFromSync();
2274                                         }
2275                                 }
2276
2277
2278                                 
2279                                 retainedObjectList[i]->msg=NULL;
2280                                 
2281                                         
2282                         }
2283                 }
2284         }
2285         
2286         if(count > 0){
2287 //              CmiAbort("retainedObjectList for restarted processor not empty");
2288         }
2289
2290
2291         
2292         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2293         forAllCharesDo(resendMessageForChare,&d);
2294
2295         //send back the maximum ticket number for a message sent to each object on the 
2296         //restarted processor
2297         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2298         
2299         int totalTNStored=0;
2300         for(int i=0;i<d.numberObjects;i++){
2301                 totalTNStored += d.ticketVecs[i].size();
2302         }
2303         
2304         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2305         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2306         
2307         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2308         resendReply->PE = CkMyPe();
2309         resendReply->numberObjects = d.numberObjects;
2310         
2311         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2312         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2313         for(int i=0;i<d.numberObjects;i++){
2314                 replyObjects[i] = d.listObjects[i].recver;
2315         }
2316         
2317         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2318         for(int i=0;i<d.numberObjects;i++){
2319                 int vecsize = d.ticketVecs[i].size();
2320                 memcpy(ticketList,&vecsize,sizeof(int));
2321                 ticketList = &ticketList[sizeof(int)];
2322                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2323                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2324         }       
2325
2326         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2327         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2328         
2329 /*      
2330         if(verifyAckRequestsUnacked){
2331                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2332                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2333                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2334                         LDObjHandle h;
2335                         lb->Migrated(h,1);
2336                 }
2337         }
2338         
2339         verifyAckRequestsUnacked=0;*/
2340
2341
2342         
2343         delete [] d.maxTickets;
2344         delete [] d.ticketVecs;
2345         if(resendReq->PE != CkMyPe()){
2346                 CmiFree(msg);
2347         }       
2348 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2349         lastRestart = CmiWallTimer();
2350 }
2351
2352 void sortVec(CkVec<MCount> *TNvec);
2353 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2354
2355 void _resendReplyHandler(char *msg){    
2356         /**
2357                 need to rewrite this method to deal with parallel restart
2358         */
2359         ResendRequest *resendReply = (ResendRequest *)msg;
2360         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2361
2362         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2363         
2364         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2365         for(int i =0; i< resendReply->numberObjects;i++){       
2366                 Chare *obj = (Chare *)listObjects[i].getObject();
2367                 
2368                 int vecsize;
2369                 memcpy(&vecsize,listTickets,sizeof(int));
2370                 listTickets = &listTickets[sizeof(int)];
2371                 MCount *listTNs = (MCount *)listTickets;
2372                 
2373                 
2374                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2375                 
2376                 if(obj != NULL){
2377                         //the object was restarted on the processor on which it existed
2378                         processReceivedTN(obj,vecsize,listTNs);
2379                 }else{
2380                 //pack up objID vecsize and listTNs and send it to the correct processor
2381                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2382                         char *TNMsg = (char *)CmiAlloc(totalSize);
2383                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2384                         receivedTNData->recver = listObjects[i];
2385                         receivedTNData->numTNs = vecsize;
2386                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2387                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2388
2389                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2390                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2391                 }
2392                 
2393         }
2394 };
2395
2396 void _receivedTNDataHandler(ReceivedTNData *msg){
2397         char objName[100];
2398         Chare *obj = (Chare *) msg->recver.getObject();
2399         if(obj){                
2400                 char *_msg = (char *)msg;
2401                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2402                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2403                 processReceivedTN(obj,msg->numTNs,listTNs);
2404         }else{
2405                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2406                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2407         }
2408 };
2409
2410
2411 void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
2412         obj->mlogData->resendReplyRecvd++;
2413
2414         
2415         for(int j=0;j<listSize;j++){
2416                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2417         }
2418         
2419         //if this object has received all the replies find the ticket numbers
2420         //that senders know about. Those less than the ticket number processed 
2421         //by the receiver can be thrown away. The rest need not be consecutive
2422         // ie there can be holes in the list of ticket numbers seen by senders
2423         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2424                 obj->mlogData->resendReplyRecvd = 0;
2425                 //sort the received TNS
2426                 sortVec(obj->mlogData->receivedTNs);
2427         
2428                 //after all the received tickets are in we need to sort them and then 
2429                 // calculate the holes
2430                 
2431                 if(obj->mlogData->receivedTNs->size() > 0){
2432                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2433                         int vecsize = obj->mlogData->receivedTNs->size();
2434                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2435                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2436                         if(numberHoles == 0){
2437                         }else{
2438                                 char objName[100];                                      
2439                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2440                                 obj->mlogData->numberHoles = numberHoles;
2441                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2442                                 int countHoles=0;
2443                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2444                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2445                                                 //the TNs are not consecutive at this point
2446                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2447                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2448                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2449                                                         countHoles++;
2450                                                 }       
2451                                         }
2452                                 }
2453                                 //Holes have been given new TN
2454                                 if(countHoles != numberHoles){
2455                                         char str[100];
2456                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2457                                 }
2458                                 CkAssert(countHoles == numberHoles);                                    
2459                                 obj->mlogData->currentHoles = numberHoles;
2460                         }
2461                 }       
2462                 
2463                 delete obj->mlogData->receivedTNs;
2464                 obj->mlogData->receivedTNs = NULL;
2465                 obj->mlogData->restartFlag = 0;
2466                 char objString[100];
2467                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2468         }
2469
2470 }
2471
2472
2473 void sortVec(CkVec<MCount> *TNvec){
2474         //sort it ->its bloddy bubble sort
2475         //TODO: use quicksort
2476         for(int i=0;i<TNvec->size();i++){
2477                 for(int j=i+1;j<TNvec->size();j++){
2478                         if((*TNvec)[j] < (*TNvec)[i]){
2479                                 MCount temp;
2480                                 temp = (*TNvec)[i];
2481                                 (*TNvec)[i] = (*TNvec)[j];
2482                                 (*TNvec)[j] = temp;
2483                         }
2484                 }
2485         }
2486         //make it unique .. since its sorted all equal units will be consecutive
2487         MCount *tempArray = new MCount[TNvec->size()];
2488         int     uniqueCount=-1;
2489         for(int i=0;i<TNvec->size();i++){
2490                 tempArray[i] = 0;
2491                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2492                         uniqueCount++;
2493                         tempArray[uniqueCount] = (*TNvec)[i];
2494                 }
2495         }
2496         uniqueCount++;
2497         TNvec->removeAll();
2498         for(int i=0;i<uniqueCount;i++){
2499                 TNvec->push_back(tempArray[i]);
2500         }
2501         delete [] tempArray;
2502 }       
2503
2504 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2505         if(TNVec->size() == 0){
2506                 return -1; //not found in an empty vec
2507         }
2508         //binary search to find 
2509         int left=0;
2510         int right = TNVec->size();
2511         int mid = (left +right)/2;
2512         while(searchTN != (*TNVec)[mid] && left < right){
2513                 if((*TNVec)[mid] > searchTN){
2514                         right = mid-1;
2515                 }else{
2516                         left = mid+1;
2517                 }
2518                 mid = (left + right)/2;
2519         }
2520         if(left < right){
2521                 //mid is the element to be returned
2522                 return mid;
2523         }else{
2524                 if(mid < TNVec->size() && mid >=0){
2525                         if((*TNVec)[mid] == searchTN){
2526                                 return mid;
2527                         }else{
2528                                 return -1;
2529                         }
2530                 }else{
2531                         return -1;
2532                 }
2533         }
2534 };
2535
2536
2537 /*
2538         Method to do parallel restart. Distribute some of the array elements to other processors.
2539         The problem is that we cant use to charm entry methods to do migration as it will get
2540         stuck in the protocol that is going to restart
2541 */
2542
2543 class ElementDistributor: public CkLocIterator{
2544         CkLocMgr *locMgr;
2545         int *targetPE;
2546         void pupLocation(CkLocation &loc,PUP::er &p){
2547                 CkArrayIndexMax idx=loc.getIndex();
2548                 CkGroupID gID = locMgr->ckGetGroupID();
2549                 p|gID;      // store loc mgr's GID as well for easier restore
2550                 p|idx;
2551                 p|loc;
2552         };
2553         public:
2554                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2555                 void addLocation(CkLocation &loc){
2556                         if(*targetPE == CkMyPe()){
2557                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2558                                 return;
2559                         }
2560                         
2561                         CkArrayIndexMax idx=loc.getIndex();
2562                         CkLocRec_local *rec = loc.getLocalRecord();
2563                         
2564                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2565                         idx.print();
2566                         
2567
2568                         //TODO: an element that is being moved should leave some trace behind so that
2569                         // the arraybroadcaster can forward messages to it
2570                         
2571                         //pack up this location and send it across
2572                         PUP::sizer psizer;
2573                         pupLocation(loc,psizer);
2574                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2575                         char *msg = (char *)CmiAlloc(totalSize);
2576                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2577                         PUP::toMem pmem(buf);
2578                         pmem.becomeDeleting();
2579                         pupLocation(loc,pmem);
2580                         
2581                         locMgr->setDuringMigration(CmiTrue);                    
2582                         delete rec;
2583                         locMgr->setDuringMigration(CmiFalse);                   
2584                         locMgr->inform(idx,*targetPE);
2585
2586                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
2587                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
2588
2589                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2590                         //decide on the target processor for the next object
2591                         *targetPE = (*targetPE +1)%CkNumPes();
2592                 }
2593                 
2594 };
2595
2596 void distributeRestartedObjects(){
2597         int numGroups = CkpvAccess(_groupIDTable)->size();      
2598         int i;
2599         int targetPE=CkMyPe();
2600         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2601 };
2602
2603 void _distributedLocationHandler(char *receivedMsg){
2604         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2605         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2606         PUP::fromMem pmem(buf);
2607         CkGroupID gID;
2608         CkArrayIndexMax idx;
2609         pmem |gID;
2610         pmem |idx;
2611         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2612         donotCountMigration=1;
2613         mgr->resume(idx,pmem);
2614         donotCountMigration=0;
2615         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2616         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2617         idx.print();
2618
2619         CkLocRec *rec = mgr->elementRec(idx);
2620         CmiAssert(rec->type() == CkLocRec::local);
2621         
2622         CkVec<CkMigratable *> eltList;
2623         mgr->migratableList((CkLocRec_local *)rec,eltList);
2624         for(int i=0;i<eltList.size();i++){
2625                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2626                         CpvAccess(_currentObj) = eltList[i];
2627                         eltList[i]->ResumeFromSync();
2628                 }
2629         }
2630         
2631         
2632 }
2633
2634
2635 /** this method is used to send messages to a restarted processor to tell
2636  * it that a particular expected object is not going to get to it */
2637 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2638         DummyMigrationMsg buf;
2639         buf.flag = MLOG_OBJECT;
2640         buf.lbID = lbID;
2641         buf.mgrID = locMgrID;
2642         buf.idx = idx;
2643         buf.locationPE = locationPE;
2644         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2645         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2646 };
2647
2648
2649 /**this method is used by a restarted processor to tell other processors
2650  * that they are not going to receive these many objects.. just the count
2651  * not the objects themselves ***/
2652
2653 void sendDummyMigrationCounts(int *dummyCounts){
2654         DummyMigrationMsg buf;
2655         buf.flag = MLOG_COUNT;
2656         buf.lbID = globalLBID;
2657         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2658         for(int i=0;i<CmiNumPes();i++){
2659                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2660                         buf.count = dummyCounts[i];
2661                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2662                 }
2663         }
2664 }
2665
2666
2667 /** this handler is used to process a dummy migration msg.
2668  * it looks up the load balancer and calls migrated for it */
2669
2670 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2671         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2672         if(msg->flag == MLOG_OBJECT){
2673                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2674                 LDObjHandle h;
2675                 lb->Migrated(h,1);
2676         }
2677         if(msg->flag == MLOG_COUNT){
2678                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2679                 msg->count -= verifyAckedRequests;
2680                 for(int i=0;i<msg->count;i++){
2681                         LDObjHandle h;
2682                         lb->Migrated(h,1);
2683                 }
2684         }
2685         verifyAckedRequests=0;
2686         CmiFree(msg);
2687 };
2688
2689
2690
2691
2692
2693
2694
2695 /*****************************************************
2696         Implementation of a method that can be used to call
2697         any method on the ChareMlogData of all the chares on
2698         a processor currently
2699 ******************************************************/
2700
2701
2702 class ElementCaller :  public CkLocIterator {
2703 private:
2704         CkLocMgr *locMgr;
2705         MlogFn fnPointer;
2706         void *data;
2707 public:
2708         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2709                 locMgr = _locMgr;
2710                 fnPointer = _fnPointer;
2711                 data = _data;
2712         };
2713         void addLocation(CkLocation &loc){
2714                 CkVec<CkMigratable *> list;
2715                 CkLocRec_local *local = loc.getLocalRecord();
2716                 locMgr->migratableList (local,list);
2717                 for(int i=0;i<list.size();i++){
2718                         CkMigratable *migratableElement = list[i];
2719                         fnPointer(data,migratableElement->mlogData);
2720                 }
2721         }
2722 };
2723
2724 void forAllCharesDo(MlogFn fnPointer,void *data){
2725         int numGroups = CkpvAccess(_groupIDTable)->size();
2726         for(int i=0;i<numGroups;i++){
2727                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2728                 fnPointer(data,obj->mlogData);
2729         }
2730         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2731         for(int i=0;i<numNodeGroups;i++){
2732                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
2733                 fnPointer(data,obj->mlogData);
2734         }
2735         int i;
2736         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
2737         
2738         
2739 };
2740
2741
2742 /******************************************************************
2743  Load Balancing
2744 ******************************************************************/
2745
2746 void initMlogLBStep(CkGroupID gid){
2747         countLBMigratedAway = 0;
2748         countLBToMigrate=0;
2749         onGoingLoadBalancing=1;
2750         migrationDoneCalled=0;
2751         checkpointBarrierCount=0;
2752         if(globalLBID.idx != 0){
2753                 CmiAssert(globalLBID.idx == gid.idx);
2754         }
2755         globalLBID = gid;
2756 }
2757
2758 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
2759         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
2760         
2761         resumeLbFnPtr = _fnPtr;
2762         centralLb = _centralLb;
2763         migrationDoneCalled = 1;
2764         if(countLBToMigrate == countLBMigratedAway){
2765                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2766                 startMlogCheckpoint(NULL,CmiWallTimer());       
2767         }
2768 };
2769
2770 void finishedCheckpointLoadBalancing(){
2771         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
2772         CheckpointBarrierMsg msg;
2773         msg.fromPE = CmiMyPe();
2774         msg.checkpointCount = checkpointCount;
2775
2776         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
2777         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
2778         
2779 };
2780
2781
2782 void sendMlogLocation(int targetPE,envelope *env){
2783         void *_msg = EnvToUsr(env);
2784         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2785
2786
2787         int existing = 0;
2788         //if this object is already in the retainedobjectlust destined for this
2789         //processor it should not be sent
2790         
2791         for(int i=0;i<retainedObjectList.size();i++){
2792                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
2793                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
2794                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
2795                         existing = 1;
2796                         break;
2797                 }
2798         }
2799
2800         if(existing){
2801                 return;
2802         }
2803         
2804         
2805         countLBToMigrate++;
2806         
2807         MigrationNotice migMsg;
2808         migMsg.migRecord.gID = msg->gid;
2809         migMsg.migRecord.idx = msg->idx;
2810         migMsg.migRecord.fromPE = CkMyPe();
2811         migMsg.migRecord.toPE =  targetPE;
2812         
2813         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
2814         
2815         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
2816         retainedObject->migRecord = migMsg.migRecord;
2817         retainedObject->acked  = 0;
2818         
2819         CkPackMessage(&env);
2820         
2821         migMsg.record = retainedObject;
2822         retainedObject->msg = env;
2823         int size = retainedObject->size = env->getTotalsize();
2824         
2825         retainedObjectList.push_back(retainedObject);
2826         
2827         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2828         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2829         
2830         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
2831
2832 }
2833
2834 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
2835         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
2836         migratedNoticeList.push_back(msg->migRecord);
2837
2838         MigrationNoticeAck buf;
2839         buf.record = msg->record;
2840         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
2841         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
2842 }
2843
2844 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
2845         
2846         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
2847         retainedObject->acked = 1;
2848
2849         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
2850         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
2851
2852         //inform home about the new location of this object
2853         CkGroupID gID = retainedObject->migRecord.gID ;
2854         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2855         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
2856         
2857         countLBMigratedAway++;
2858         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
2859                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2860                 startMlogCheckpoint(NULL,CmiWallTimer());
2861         }
2862 };
2863
2864 void _receiveMlogLocationHandler(void *buf){
2865         envelope *env = (envelope *)buf;
2866         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
2867         CkUnpackMessage(&env);
2868         void *_msg = EnvToUsr(env);
2869         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2870         CkGroupID gID= msg->gid;
2871         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
2872         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2873         CpvAccess(_currentObj)=mgr;
2874         mgr->immigrate(msg);
2875 };
2876
2877
2878 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
2879 /*      if(mlogData->objID.type == TypeArray){
2880                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
2881         //      TODO: make sure later that atSync has been called and it needs 
2882         //      to be resumed from sync
2883         //
2884                 CpvAccess(_currentObj) = elt;
2885                 elt->ResumeFromSync();
2886         }*/
2887 }
2888
2889 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
2890         if(checkpointBarrierCount == CmiNumPes()){
2891                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
2892                 for(int i=0;i<CmiNumPes();i++){
2893                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
2894                 }
2895         }
2896 }
2897
2898 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
2899         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
2900         if(msg->checkpointCount == checkpointCount){
2901                 checkpointBarrierCount++;
2902                 checkAndSendCheckpointBarrierAcks(msg);
2903         }else{
2904                 if(msg->checkpointCount-1 == checkpointCount){
2905                         checkpointBarrierCount++;
2906                         checkAndSendCheckpointBarrierAcks(msg);
2907                 }else{
2908                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
2909                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
2910                 }
2911         }
2912         CmiFree(msg);
2913 }
2914
2915 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
2916         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
2917         sendRemoveLogRequests();
2918         (*resumeLbFnPtr)(centralLb);
2919         CmiFree(msg);
2920 }
2921
2922 /**
2923         method that informs an array elements home processor of its current location
2924         It is a converse method to bypass the charm++ message logging framework
2925 */
2926
2927 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
2928         double _startTime = CmiWallTimer();
2929         CurrentLocationMsg msg;
2930         msg.mgrID = locMgrID;
2931         msg.idx = idx;
2932         msg.locationPE = currentPE;
2933         msg.fromPE = CkMyPe();
2934
2935         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
2936         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
2937         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
2938         traceUserBracketEvent(37,_startTime,CmiWallTimer());
2939 }
2940
2941
2942 void _receiveLocationHandler(CurrentLocationMsg *data){
2943         double _startTime = CmiWallTimer();
2944         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
2945         if(mgr == NULL){
2946                 CmiFree(data);
2947                 return;
2948         }
2949         CkLocRec *rec = mgr->elementNrec(data->idx);
2950         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
2951         if(rec != NULL){
2952                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
2953                         if(data->fromPE == data->locationPE){
2954                                 CmiAbort("Another processor has the same object");
2955                         }
2956                 }
2957         }
2958         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
2959                 int targetPE = data->fromPE;
2960                 data->fromPE = CmiMyPe();
2961                 data->locationPE = CmiMyPe();
2962                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
2963                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
2964         }else{
2965                 mgr->inform(data->idx,data->locationPE);
2966         }
2967         CmiFree(data);
2968         traceUserBracketEvent(38,_startTime,CmiWallTimer());
2969 }
2970
2971
2972
2973 void getGlobalStep(CkGroupID gID){
2974         LBStepMsg msg;
2975         int destPE = 0;
2976         msg.lbID = gID;
2977         msg.fromPE = CmiMyPe();
2978         msg.step = -1;
2979         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
2980         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
2981 };
2982
2983 void _getGlobalStepHandler(LBStepMsg *msg){
2984         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2985         msg->step = lb->step();
2986         CmiAssert(msg->fromPE != CmiMyPe());
2987         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
2988         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
2989         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
2990 };
2991
2992 void _recvGlobalStepHandler(LBStepMsg *msg){
2993         
2994         restartDecisionNumber=msg->step;
2995         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
2996         _updateHomeAckHandler(dummyAck);
2997 };
2998
2999
3000
3001
3002
3003
3004
3005
3006 void _messageLoggingExit(){
3007 /*      if(CkMyPe() == 0){
3008                 if(countBuffered != 0){
3009                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3010                 }
3011
3012 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3013         }
3014         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3015         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3016 }
3017 /**********************************
3018         * The methods of the message logging
3019         * data structure stored in each chare
3020         ********************************/
3021
3022 MCount ChareMlogData::nextSN(const CkObjID &recver){
3023 /*      MCount SN = snTable.get(recver);
3024         snTable.put(recver) = SN+1;
3025         return SN+1;*/
3026         double _startTime = CmiWallTimer();
3027         MCount *SN = snTable.getPointer(recver);
3028         if(SN==NULL){
3029                 snTable.put(recver) = 1;
3030                 return 1;
3031         }else{
3032                 (*SN)++;
3033                 return *SN;
3034         }
3035 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3036 };
3037
3038
3039 MCount ChareMlogData::newTN(){
3040         MCount TN;
3041         if(currentHoles > 0){
3042                 int holeidx = numberHoles-currentHoles;
3043                 TN = ticketHoles[holeidx];
3044                 currentHoles--;
3045                 if(currentHoles == 0){
3046                         delete []ticketHoles;
3047                         numberHoles = 0;
3048                 }
3049         }else{
3050                 TN = ++tCount;
3051         }       
3052         return TN;
3053 };
3054
3055 Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
3056         char senderName[100];
3057         char recverName[100];
3058         double _startTime =CmiWallTimer();
3059         Ticket ticket;
3060         if(restartFlag){
3061                 ticket.TN = 0;
3062                 return ticket;
3063         }
3064 /*      SNToTicket &ticketRow = ticketTable.put(sender);
3065         Ticket earlierTicket = ticketRow.get(SN);
3066         if(earlierTicket.TN == 0){
3067                 //This SN has not been ever alloted a ticket
3068                 ticket.TN = newTN();
3069                 ticketRow.put(SN)=ticket;
3070         }else{
3071                 ticket.TN = earlierTicket.TN;
3072         }*/
3073         
3074
3075         SNToTicket *ticketRow = ticketTable.get(sender);
3076         if(ticketRow != NULL){
3077                 Ticket earlierTicket = ticketRow->get(SN);
3078                 if(earlierTicket.TN == 0){
3079                         ticket.TN = newTN();
3080                         ticketRow->put(SN) = ticket;
3081                         DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
3082                 }else{
3083                         ticket.TN = earlierTicket.TN;
3084                         if(ticket.TN > tCount){
3085                                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3086                         }
3087                                 CmiAssert(ticket.TN <= tCount);
3088                 }
3089                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3090         }else{
3091                 SNToTicket *newRow = new SNToTicket;            
3092                 ticket.TN = newTN();
3093                 newRow->put(SN) = ticket;
3094                 ticketTable.put(sender) = newRow;
3095                 DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
3096         }
3097 /*TODO: check if the message for this SN has already been received
3098         in the table of received SNs 
3099         If it was received before the last checkpoint mark it as old
3100         other wise received
3101         */
3102         ticket.state = NEW_TICKET;
3103 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3104         return ticket;  
3105 };
3106
3107 void ChareMlogData::addLogEntry(MlogEntry *entry){
3108         char nameString[100];
3109         DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
3110                 DEBUG(CmiMemoryCheck());
3111         mlog.enq(entry);
3112 };
3113
3114 double totalSearchRestoredTime=0;
3115 double totalSearchRestoredCount=0;
3116
3117
3118 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
3119         double start= CkWallTimer();
3120         MCount TN=0;    
3121         if(mapTable.numObjects() > 0){
3122                 RestoredLocalMap *map = mapTable.get(sender);
3123                 if(map){
3124                         int index = SN - map->minSN;
3125                         if(index < map->count){
3126                                 TN = map->TNArray[index];
3127                         }
3128                 }
3129         /*      if(i > 0){
3130                         printf("i %d restoredLocalMsgLog.size() %d\n",i,restoredLocalMsgLog.size());
3131                 }*/
3132         }
3133         
3134         char senderName[100],recverName[100];
3135         
3136         if(TN != 0){
3137                 DEBUG(CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN));
3138         }
3139         totalSearchRestoredTime += CkWallTimer()-start;
3140         totalSearchRestoredCount ++;
3141         return TN;
3142 }
3143
3144 void ChareMlogData::addToRestoredLocalQ(LocalMessageLog *logEntry){
3145         restoredLocalMsgLog.push_back(*logEntry);
3146 }
3147
3148 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData){
3149         mlogData->sortRestoredLocalMsgLog();
3150 }
3151
3152 void ChareMlogData::sortRestoredLocalMsgLog(){
3153         //sort it ->its bloddy bubble sort
3154         
3155         for(int i=0;i<restoredLocalMsgLog.size();i++){
3156                 LocalMessageLog &logEntry = restoredLocalMsgLog[i];
3157                 RestoredLocalMap *map = mapTable.get(logEntry.sender);
3158                 if(map == NULL){
3159                         map = new RestoredLocalMap;
3160                         mapTable.put(logEntry.sender)=map;
3161                 }
3162                 map->count++;
3163                 if(map->minSN == 0){
3164                         map->minSN = logEntry.SN;
3165                 }else{
3166                         if(logEntry.SN < map->minSN){
3167                                 map->minSN = logEntry.SN;
3168                         }
3169                 }
3170                 if(logEntry.SN > map->maxSN){
3171                         map->maxSN = logEntry.SN;
3172                 }
3173
3174         }
3175         for(int i=0;i< restoredLocalMsgLog.size();i++){
3176                 LocalMessageLog &logEntry = restoredLocalMsgLog[i];
3177                 RestoredLocalMap *map = mapTable.get(logEntry.sender);
3178                 CkAssert(map != NULL);
3179                 if(map->TNArray == NULL){
3180                         map->TNArray = new MCount[map->maxSN-map->minSN+1];                     
3181                         //HERE: erase from here
3182                         printf("map->count:%d\n",map->count);
3183                         printf("map->maxSN:%d\n",map->maxSN);
3184                         printf("map->minSN:%d\n",map->minSN);
3185                         //HERE: to here
3186
3187                         CkAssert(map->count == map->maxSN-map->minSN+1);
3188                         map->count = 0;
3189                 }
3190                 map->TNArray[map->count] = logEntry.TN;
3191                 map->count++;
3192         }
3193         restoredLocalMsgLog.free();
3194 }
3195
3196
3197
3198 void ChareMlogData::pup(PUP::er &p){
3199         int startSize=0;
3200         char nameStr[100];
3201         if(p.isSizing()){
3202                 PUP::sizer *sizep = (PUP::sizer *)&p;
3203                 startSize = sizep->size();
3204         }
3205         double _startTime = CkWallTimer();
3206         
3207         p | objID;
3208         p | tCount;
3209         p | tProcessed;
3210         if(p.isUnpacking()){
3211                 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
3212         }
3213         p | toResumeOrNot;
3214         p | resumeCount;
3215         DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
3216         
3217
3218         /*pack the receivedTN vector*/
3219         int lengthReceivedTNs;
3220         if(!p.isUnpacking()){
3221                 if(receivedTNs == NULL){
3222                         lengthReceivedTNs = -1;
3223                 }else{
3224                         lengthReceivedTNs = receivedTNs->size();                
3225                 }
3226         }
3227         p | lengthReceivedTNs;
3228         if(p.isUnpacking()){
3229                 if(lengthReceivedTNs == -1){
3230                         receivedTNs = NULL;
3231                 }else{
3232                         receivedTNs = new CkVec<MCount>;
3233                         for(int i=0;i<lengthReceivedTNs;i++){
3234                                 MCount tempTicket;
3235                                 p | tempTicket;
3236                                 CkAssert(tempTicket > 0);
3237                                 receivedTNs->push_back(tempTicket);
3238                         }
3239                 }
3240         }else{
3241                 for(int i=0;i<lengthReceivedTNs;i++){
3242                         p | (*receivedTNs)[i];
3243                 }
3244         }
3245         
3246         
3247         p | currentHoles;
3248         p | numberHoles;
3249         if(p.isUnpacking()){
3250                 if(numberHoles > 0){
3251                         ticketHoles = new MCount[numberHoles];                  
3252                 }else{
3253                         ticketHoles = NULL;
3254                 }
3255         }
3256         if(numberHoles > 0){
3257                 p(ticketHoles,numberHoles);
3258         }
3259         
3260         snTable.pup(p);
3261         //pup the message log as well
3262         int length;
3263         if(!p.isUnpacking()){           
3264                 length = mlog.length(); 
3265                 if(length > 0)
3266                         DEBUG(printf("[%d] Mlog length %d \n",CkMyPe(),length));
3267         /*      for(int i=0;i<length;i++){
3268                                 if(!fault_aware(mlog[i]->env->sender)){
3269                                         CkAbort("unknown sender in message log");
3270                                 }
3271                         }*/
3272         }
3273         p | length;
3274         for(int i=0;i<length;i++){
3275                 MlogEntry *entry;
3276                 if(p.isUnpacking()){
3277                         entry = new MlogEntry();
3278                         mlog.enq(entry);
3279                 }else{
3280                         entry = mlog[i];
3281                 }
3282 //              DEBUG(printf("[%d] entry at %d pointer %p about to be pupped\n",CkMyPe(),i,mlog[i]));
3283                 entry->pup(p);
3284         }
3285         
3286         p| restoredLocalMsgLog;
3287         p | resendReplyRecvd;
3288         p | restartFlag;
3289
3290         // pup the mapTable
3291         int tableSize;
3292         if(!p.isUnpacking()){
3293                 tableSize = mapTable.numObjects();
3294         }
3295         p | tableSize;
3296         if(!p.isUnpacking()){
3297                 CkHashtableIterator *iter = mapTable.iterator();
3298                 while(iter->hasNext()){
3299                         CkObjID *objID;
3300                         RestoredLocalMap **map = (RestoredLocalMap **) iter->next((void **)&objID);
3301                         p | (*objID);
3302                         (*map)->pup(p);
3303                 }
3304         }else{
3305                 for(int i=0;i<tableSize;i++){
3306                         CkObjID objID;
3307                         p | objID;
3308                         RestoredLocalMap *map = new RestoredLocalMap;
3309                         map->pup(p);
3310                         mapTable.put(objID) = map;
3311                 }
3312         }
3313
3314         //pup the ticketTable
3315         {
3316                 int ticketTableSize;
3317                 if(!p.isUnpacking()){
3318                         ticketTableSize = ticketTable.numObjects();
3319                 }
3320                 p | ticketTableSize;
3321                 if(!p.isUnpacking()){
3322                         CkHashtableIterator *iter = ticketTable.iterator();
3323                         while(iter->hasNext()){
3324                                 CkObjID *objID;
3325                                 SNToTicket **ticketRow = (SNToTicket **)iter->next((void **)&objID);
3326                                 p | (*objID);
3327                                 (*ticketRow)->pup(p);
3328                         }
3329                 }else{
3330                         for(int i=0;i<ticketTableSize;i++){
3331                                 CkObjID objID;
3332                                 p | objID;
3333                                 SNToTicket *ticketRow = new SNToTicket;
3334                                 ticketRow->pup(p);
3335                                 ticketTable.put(objID) = ticketRow;
3336                         }
3337                 }
3338         }
3339
3340
3341         
3342         
3343         if(p.isSizing()){
3344                 char name[40];
3345                 PUP::sizer *sizep = (PUP::sizer *)&p;
3346                 int pupSize = sizep->size()-startSize;
3347                 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
3348         //      CkAssert(pupSize <100000000);
3349         }
3350         
3351         double _finTime = CkWallTimer();
3352         DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
3353 };
3354
3355
3356 /**
3357         The method for returning the actual object pointed to by an id
3358         If the object doesnot exist on the processor it returns NULL
3359 **/
3360
3361 void* CkObjID::getObject(){
3362         
3363                 switch(type){
3364                         case TypeChare:
3365         
3366                                 return CkLocalChare(&data.chare.id);
3367                         case TypeGroup:
3368         
3369                                 CkAssert(data.group.onPE == CkMyPe());
3370                                 return CkLocalBranch(data.group.id);
3371                         case TypeNodeGroup:
3372                                 CkAssert(data.group.onPE == CkMyNode());
3373                                 //CkLocalNodeBranch(data.group.id);
3374                                 {
3375                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3376                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3377                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3378         
3379                                         return retval;
3380                                 }       
3381                         case TypeArray:
3382                                 {
3383         
3384         
3385                                         CkArrayID aid(data.array.id);
3386         
3387                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3388         
3389                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
3390         
3391                                         return aProxy.ckLocal();
3392                                 }
3393                         default:
3394                                 CkAssert(0);
3395                 }
3396 }
3397
3398
3399 int CkObjID::guessPE(){
3400                 switch(type){
3401                         case TypeChare:
3402                                 return data.chare.id.onPE;
3403                         case TypeGroup:
3404                         case TypeNodeGroup:
3405                                 return data.group.onPE;
3406                         case TypeArray:
3407                                 {
3408                                         CkArrayID aid(data.array.id);
3409                                         if(aid.ckLocalBranch() == NULL){
3410                                                 return -1;
3411                                         }
3412                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
3413                                 }
3414                         default:
3415                                 CkAssert(0);
3416                 }
3417 };
3418
3419 char *CkObjID::toString(char *buf) const {
3420         
3421         switch(type){
3422                 case TypeChare:
3423                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3424                         break;
3425                 case TypeGroup:
3426                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
3427                         break;
3428                 case TypeNodeGroup:
3429                         sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
3430                         break;
3431                 case TypeArray:
3432                         {
3433                                 const CkArrayIndexMax &idx = data.array.idx.asMax();
3434                                 const int *indexData = idx.data();
3435                                 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
3436                                 break;
3437                         }
3438                 default:
3439                         CkAssert(0);
3440         }
3441         
3442         return buf;
3443 };
3444
3445 void CkObjID::updatePosition(int PE){
3446         if(guessPE() == PE){
3447                 return;
3448         }
3449         switch(type){
3450                 case TypeArray:
3451                         {
3452                                         CkArrayID aid(data.array.id);
3453                                         if(aid.ckLocalBranch() == NULL){
3454                                                 
3455                                         }else{
3456                                                 char str[100];
3457                                                 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
3458 //                                              CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
3459                                                 CkLocRec *rec = mgr->elementNrec(data.array.idx.asMax());
3460                                                 if(rec != NULL){
3461                                                         if(rec->type() == CkLocRec::local){
3462                                                                 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
3463                                                                 return;
3464                                                         }
3465                                                 }
3466                                                 mgr->inform(data.array.idx.asMax(),PE);
3467                                         }       
3468                                 }
3469
3470                         break;
3471                 case TypeChare:
3472                         CkAssert(data.chare.id.onPE == PE);
3473                         break;
3474                 case TypeGroup:
3475                 case TypeNodeGroup:
3476                         CkAssert(data.group.onPE == PE);
3477                         break;
3478                 default:
3479                         CkAssert(0);
3480         }
3481 }
3482
3483
3484
3485 void MlogEntry::pup(PUP::er &p){
3486         p | destPE;
3487         p | _infoIdx;
3488         p | unackedLocal;
3489         int size;
3490         if(!p.isUnpacking()){
3491 /*              CkAssert(env);
3492                 if(!env->isPacked()){
3493                         CkPackMessage(&env);
3494                 }*/
3495                 if(env == NULL){
3496                         //message was probably local and has been removed from logs
3497                         size = 0;
3498                 }else{
3499                         size = env->getTotalsize();
3500                 }       
3501         }
3502         p | size;
3503         if(p.isUnpacking()){
3504                 if(size > 0){
3505                         env = (envelope *)_allocEnv(ForChareMsg,size);