Included basic support for group message logging.
[charm.git] / src / ck-core / ckmessagelogging.C
1 #include "charm.h"
2 #include "ck.h"
3 #include "ckmessagelogging.h"
4 #include "queueing.h"
5 #include <sys/types.h>
6 #include <signal.h>
7 #include "CentralLB.h"
8
9 #ifdef _FAULT_MLOG_
10
11 //#define DEBUG(x)  if(_restartFlag) {x;}
12 #define DEBUG(x)  //x
13 #define DEBUGRESTART(x)  //x
14 #define DEBUGLB(x) // x
15
16 #define BUFFERED_LOCAL
17 #define BUFFERED_REMOTE
18
19 extern const char *idx2str(const CkArrayIndex &ind);
20 extern const char *idx2str(const ArrayElement *el);
21 const char *idx2str(const CkArrayIndexMax &ind){
22         return idx2str((const CkArrayIndex &)ind);
23 };
24
25 void getGlobalStep(CkGroupID gID);
26
27 bool fault_aware(CkObjID &recver);
28
29 int _restartFlag=0;
30 int restarted=0;
31
32 //GML: variables for measuring savings with groups in message logging
33 float MLOGFT_totalLogSize = 0.0;
34 float MLOGFT_totalMessages = 0.0;
35 float MLOGFT_totalObjects = 0.0;
36
37 //TODO: remove for perf runs
38 int countHashRefs=0; //count the number of gets
39 int countHashCollisions=0;
40
41
42 //#define CHECKPOINT_DISK
43 char *checkpointDirectory=".";
44 int unAckedCheckpoint=0;
45
46 int countLocal=0,countBuffered=0;
47 int countPiggy=0;
48 int countClearBufferedLocalCalls=0;
49
50 int countUpdateHomeAcks=0;
51
52 extern int chkptPeriod;
53 extern bool parallelRestart;
54
55 char *killFile;
56 int killFlag=0;
57 int restartingMlogFlag=0;
58 void readKillFile();
59 double killTime=0.0;
60 int checkpointCount=0;
61
62
// Per-processor (Cpv) state of the message logging protocol. Everything
// except _numBufferedTicketRequests is set up with CpvInitialize in
// _messageLoggingInit().

// Object used as the sender of outgoing messages (see generateCommonTicketRequest).
CpvDeclare(Chare *,_currentObj);
CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
// Ticket requests to be retried later (see retryTicketRequest).
CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
// Local messages for which ticketLogLocalMessage could not obtain a ticket yet.
CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
CpvDeclare(Queue, _outOfOrderMessageQueue);
// Local message logs waiting to be sent in one batch by sendBufferedLocalMessageCopy.
CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
//CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
// One raw buffer per destination PE: a BufferedTicketRequestHeader followed
// by TicketRequest records (filled in by sendTicketRequest).
CpvDeclare(char **,_bufferedTicketRequests);
// Number of TicketRequest records currently held in each per-PE buffer.
CpvDeclare(int *,_numBufferedTicketRequests);
// Buffer sized for a header plus _maxBufferedTicketRequests TicketReply records.
CpvDeclare(char *,_bufferTicketReply);



static double adjustChkptPeriod=0.0; //in ms
static double nextCheckpointTime=0.0;//in seconds

// Time (CkWallTimer) at which sendLocalMessageCopy last started a buffering period.
double lastBufferedLocalMessageCopyTime;

// Batch sizes at which the local-log / ticket-request buffers are flushed.
int _maxBufferedMessages;
int _maxBufferedTicketRequests;
int BUFFER_TIME=2; // in ms


// Handler indices assigned by CkRegisterHandler in _messageLoggingInit().
int _ticketRequestHandlerIdx;
int _ticketHandlerIdx;
int _localMessageCopyHandlerIdx;
int _localMessageAckHandlerIdx;
int _pingHandlerIdx;
int _bufferedLocalMessageCopyHandlerIdx;
int _bufferedLocalMessageAckHandlerIdx;
int _bufferedTicketRequestHandlerIdx;
int _bufferedTicketHandlerIdx;


char objString[100];
// Handler indices for checkpointing and restart (assigned in _messageLoggingInit()).
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int     _getGlobalStepHandlerIdx;
int     _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _resendReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _distributedLocationHandlerIdx;


int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;



int _falseRestart =0; /**
        For testing on clusters we might carry out restarts on
        a processor without actually starting it
        1 -> false restart
        0 -> restart after an actual crash
*/

//lock for the ticketRequestHandler and ticketLogLocalMessage methods;
int _lockNewTicket=0;


//Load balancing globals
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
// Handler indices for load balancing (assigned in _messageLoggingInit()).
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;


// Both are set to CmiWallTimer() at the end of _messageLoggingInit().
double lastCompletedAlarm=0;
double lastRestart=0;


//update location globals
int _receiveLocationHandlerIdx;
171
172
173 // initialize message logging datastructures and register handlers
174 void _messageLoggingInit(){
175         //current object
176         CpvInitialize(Chare *,_currentObj);
177         
178         //registering handlers for message logging
179         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
180         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
181         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
182         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
183         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
184         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
185         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
186   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
187         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
188
189                 
190         //handlers for checkpointing
191         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
192         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
193         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
194         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
195
196
197         //handlers for restart
198         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
199         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
200         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
201         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
202         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
203         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
204         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
205         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
206         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
207         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
208         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
209
210         
211         //handlers for load balancing
212         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
213         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
214         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
215         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
216         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
217         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
218         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
219         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
220         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
221
222         
223         //handlers for updating locations
224         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
225         
226         //Cpv variables for message logging
227         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
228         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
229         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
230         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
231         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
232         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
233         CpvInitialize(Queue, _outOfOrderMessageQueue);
234         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
235         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
236         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
237         
238         CpvInitialize(char **,_bufferedTicketRequests);
239         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
240         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
241         for(int i=0;i<CkNumPes();i++){
242                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
243                 CpvAccess(_numBufferedTicketRequests)[i]=0;
244         }
245   CpvInitialize(char *,_bufferTicketReply);
246         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
247         
248 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
249         CcdCallFnAfter(retryTicketRequest,NULL,100);    
250         
251         
252         //Cpv variables for checkpoint
253         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
254         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
255         
256 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
257 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
258 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
259         if(CkMyPe() == 0){
260 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
261 #ifdef  BUFFERED_LOCAL
262                 if(CmiMyPe() == 0){
263                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
264                 }
265 #endif
266         }
267 #ifdef  BUFFERED_REMOTE
268         if(CmiMyPe() == 0){
269                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
270         }
271 #endif
272
273         traceRegisterUserEvent("Remove Logs", 20);
274         traceRegisterUserEvent("Ticket Request Handler", 21);
275         traceRegisterUserEvent("Ticket Handler", 22);
276         traceRegisterUserEvent("Local Message Copy Handler", 23);
277         traceRegisterUserEvent("Local Message Ack Handler", 24);        
278         traceRegisterUserEvent("Preprocess current message",25);
279         traceRegisterUserEvent("Preprocess past message",26);
280         traceRegisterUserEvent("Preprocess future message",27);
281         traceRegisterUserEvent("Checkpoint",28);
282         traceRegisterUserEvent("Checkpoint Store",29);
283         traceRegisterUserEvent("Checkpoint Ack",30);
284         
285         traceRegisterUserEvent("Send Ticket Request",31);
286         traceRegisterUserEvent("Generalticketrequest1",32);
287         traceRegisterUserEvent("TicketLogLocal",33);
288         traceRegisterUserEvent("next_ticket and SN",34);
289         traceRegisterUserEvent("Timeout for buffered remote messages",35);
290         traceRegisterUserEvent("Timeout for buffered local messages",36);
291         traceRegisterUserEvent("Inform Location Home",37);
292         traceRegisterUserEvent("Receive Location Handler",38);
293         
294         lastCompletedAlarm=CmiWallTimer();
295         lastRestart = CmiWallTimer();
296 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
297         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
298 //      printf("[%d] killFlag %d\n",CkMyPe(),killFlag);
299 //      if(killFlag){
300 //              readKillFile();
301 //      }
302 }
303
304 void killLocal(void *_dummy,double curWallTime);        
305
306 void readKillFile(){
307         FILE *fp=fopen(killFile,"r");
308         if(!fp){
309                 return;
310         }
311         int proc;
312         double sec;
313         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
314                 if(proc == CkMyPe()){
315                         killTime = CmiWallTimer()+sec;
316                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
317                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
318                 }
319         }
320         fclose(fp);
321 }
322
323 void killLocal(void *_dummy,double curWallTime){
324         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
325         if(CmiWallTimer()<killTime-1){
326                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
327         }else{  
328                 kill(getpid(),SIGKILL);
329         }
330 }
331
332
333
334 /************************ Message logging methods ****************/
335
336 // send a ticket request to a group on a processor
337 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
338         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
339                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
340                 void *origMsg = EnvToUsr(env);
341                 for(int i=0;i<CmiNumPes();i++){
342                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
343                                 void *copyMsg = CkCopyMsg(&origMsg);
344                                 envelope *copyEnv = UsrToEnv(copyMsg);
345                                 copyEnv->SN=0;
346                                 copyEnv->TN=0;
347                                 copyEnv->sender.type = TypeInvalid;
348                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
349                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
350                         }
351                 }
352                 return;
353         }
354         CkObjID recver;
355         recver.type = TypeGroup;
356         recver.data.group.id = env->getGroupNum();
357         recver.data.group.onPE = destPE;
358 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
359                 CmiPrintStackTrace(0);
360         }*/
361         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
362 }
363
364 //send a ticket request to a nodegroup
365 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
366         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
367                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
368                 void *origMsg = EnvToUsr(env);
369                 for(int i=0;i<CmiNumNodes();i++){
370                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
371                                 void *copyMsg = CkCopyMsg(&origMsg);
372                                 envelope *copyEnv = UsrToEnv(copyMsg);
373                                 copyEnv->SN=0;
374                                 copyEnv->TN=0;
375                                 copyEnv->sender.type = TypeInvalid;
376                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
377                         }
378                 }
379                 return;
380         }
381         CkObjID recver;
382         recver.type = TypeNodeGroup;
383         recver.data.group.id = env->getGroupNum();
384         recver.data.group.onPE = destNode;
385         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
386 }
387
388 //send a ticket request to an array element
389 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
390         CkObjID recver;
391         recver.type = TypeArray;
392         recver.data.array.id = env->getsetArrayMgr();
393         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
394
395         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
396                 char recverString[100],senderString[100];
397                 
398                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
399         }
400
401         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
402 };
403
404 //a method to generate the actual ticket requests for groups,nodegroups or arrays
405 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
406         envelope *env = _env;
407         int resend=0; //is it a resend
408         char recverName[100];
409         double _startTime=CkWallTimer();
410         
411         if(CpvAccess(_currentObj) == NULL){
412 //              CkAssert(0);
413                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
414                 generalCldEnqueue(destPE,env,_infoIdx);
415                 return;
416         }
417         
418         if(env->sender.type == TypeInvalid){
419                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
420                 //Set message logging data in the envelope
421         }else{
422                 envelope *copyEnv = copyEnvelope(env);
423                 env = copyEnv;
424                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
425                 env->SN = 0;
426         }
427         
428         
429         CkObjID &sender = env->sender;
430         env->recver = recver;
431
432         Chare *obj = (Chare *)env->sender.getObject();
433           
434         if(env->SN == 0){
435                 env->SN = obj->mlogData->nextSN(recver);
436         }else{
437                 resend = 1;
438         }
439         
440         
441         char senderString[100];
442 //      if(env->SN != 1){
443                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
444         //      CmiPrintStackTrace(0);
445 /*      }else{
446                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
447         }*/
448                 
449         MlogEntry * mEntry = new MlogEntry(env,destPE,_infoIdx);
450 //      CkPackMessage(&(mEntry->env));
451 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
452         
453         
454
455         _startTime = CkWallTimer();
456         if(destPE == CkMyPe()){
457                 ticketLogLocalMessage(mEntry);
458         }else{
459                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
460 //              traceUserBracketEvent(31,_startTime,CkWallTimer());                     
461         }
462 }
463
464 //method that does the actual send by creating a ticketrequest filling it up and sending it
465 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
466         char recverString[100],senderString[100];
467         envelope *env = entry->env;
468         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
469 /*      envelope *env = entry->env;
470         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
471
472         Chare *obj = (Chare *)entry->env->sender.getObject();
473         if(!resend){
474                 //GML: only stores message if either it goes to this processor or to a processor in a different group
475                 if(CkMyPe() != destPE && CkMyPe()/GROUP_SIZE_MLOG != destPE/GROUP_SIZE_MLOG){
476                         obj->mlogData->addLogEntry(entry);
477                         MLOGFT_totalMessages += 1.0;
478                         MLOGFT_totalLogSize += entry->env->getTotalsize();
479                 }
480         }
481         
482
483 #ifdef BUFFERED_REMOTE
484         //buffer the ticket request 
485         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
486                 //first message to this processor, buffer needs to be created
487                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
488                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
489                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
490         }
491         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
492         //Buffer the ticketrequests
493         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
494         ticketRequest->sender = sender;
495         ticketRequest->recver = recver;
496         ticketRequest->logEntry = entry;
497         ticketRequest->SN = SN;
498         ticketRequest->senderPE = CkMyPe();
499
500         CpvAccess(_numBufferedTicketRequests)[destPE]++;
501         
502         
503         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
504                 sendBufferedTicketRequests(destPE);
505         }else{
506                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
507                         int *checkPE = new int;
508                         *checkPE = destPE;
509                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
510                 }
511         }
512 #else
513
514         TicketRequest ticketRequest;
515         ticketRequest.sender = sender;
516         ticketRequest.recver = recver;
517         ticketRequest.logEntry = entry;
518         ticketRequest.SN = SN;
519         ticketRequest.senderPE = CkMyPe();
520         
521         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
522 //      CmiBecomeImmediate(&ticketRequest);
523         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
524 #endif
525         DEBUG(CmiMemoryCheck());
526 };
527
528 /**
529  * Send the ticket requests buffered for processor PE
530  **/
531 void sendBufferedTicketRequests(int destPE){
532         DEBUG(CmiMemoryCheck());
533         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
534         if(numberRequests == 0){
535                 return;
536         }
537         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
538         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
539         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
540         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
541         header->numberLogs = numberRequests;
542         
543         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
544         CmiSyncSend(destPE,totalSize,(char *)buf);
545         
546         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
547         DEBUG(CmiMemoryCheck());
548 };
549
550 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
551         int destPE = *(int *)_destPE;
552   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
553                 sendBufferedTicketRequests(destPE);
554 //              traceUserEvent(35);
555         }
556         delete (int *)_destPE;
557         DEBUG(CmiMemoryCheck());
558 };
559
560
561
562
563 //get a ticket for a local message and then send a copy to the buddy
564 void ticketLogLocalMessage(MlogEntry *entry){
565         //this method is always in the main thread(not interrupt).. so it should 
566         //never find itself locked out of a newTicket
567 //      CkAssert(_lockNewTicket==0);
568         double _startTime=CkWallTimer();
569         
570 //      _lockNewTicket=1;
571         
572                 DEBUG(CmiMemoryCheck());
573         
574
575         Chare *recverObj = (Chare *)entry->env->recver.getObject();
576         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
577         if(recverObj){
578                 //Consider the case, after a restart when this message has already been allotted a ticket number
579                 // and should get the same one as the old one.
580                 Ticket ticket;
581                 if(recverObj->mlogData->mapTable.numObjects() > 0){
582                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
583                 }else{
584                         ticket.TN = 0;
585                 }
586                 
587                 char senderString[100], recverString[100] ;
588                 
589                 if(ticket.TN == 0){
590                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
591         
592                         if(ticket.TN == 0){
593                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
594                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
595                                 
596         //              _lockNewTicket = 0;
597 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
598                                 return;
599                         }
600                 }       
601                 //TODO: check for the case when an invalid ticket is returned
602                 //TODO: check for OLD or RECEIVED TICKETS
603                 entry->env->TN = ticket.TN;
604                 CkAssert(entry->env->TN > 0);
605                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
606                 
607
608                 sendLocalMessageCopy(entry);
609                 
610                 
611                 DEBUG(CmiMemoryCheck());
612                 entry->unackedLocal = 1;
613                 DEBUG(CmiMemoryCheck());
614                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
615                 DEBUG(CmiMemoryCheck());
616         }else{
617                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
618         }
619         _lockNewTicket=0;
620 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
621 };
622
623
624 void sendLocalMessageCopy(MlogEntry *entry){
625         LocalMessageLog msgLog;
626         msgLog.sender = entry->env->sender;
627         msgLog.recver = entry->env->recver;
628         msgLog.SN = entry->env->SN;
629         msgLog.TN = entry->env->TN;
630         msgLog.entry = entry;
631         msgLog.PE = CkMyPe();
632         
633         char recvString[100];
634         char senderString[100];
635         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
636
637 #ifdef BUFFERED_LOCAL
638         countLocal++;
639         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
640         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
641                 sendBufferedLocalMessageCopy();
642         }else{
643                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
644                         lastBufferedLocalMessageCopyTime = CkWallTimer();
645                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
646                         countClearBufferedLocalCalls++;
647                 }       
648         }
649 #else   
650         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
651         
652         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
653 #endif
654         DEBUG(CmiMemoryCheck());
655 };
656
657
658 void sendBufferedLocalMessageCopy(){
659         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
660         if(numberLogs == 0){
661                 return;
662         }
663         countBuffered++;
664         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
665         void *buf=CmiAlloc(totalSize);
666         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
667         header->numberLogs=numberLogs;
668
669         DEBUG(CmiMemoryCheck());
670         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
671         
672         char *ptr = (char *)buf;
673         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
674         
675         for(int i=0;i<numberLogs;i++){
676                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
677                 memcpy(ptr,&log,sizeof(LocalMessageLog));
678                 ptr = &ptr[sizeof(LocalMessageLog)];
679         }
680
681         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
682
683         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
684         DEBUG(CmiMemoryCheck());
685 };
686
687 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
688         countClearBufferedLocalCalls--;
689         if(countClearBufferedLocalCalls > 10){
690                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
691         }
692         DEBUG(CmiMemoryCheck());
693         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
694         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
695                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
696                         sendBufferedLocalMessageCopy();
697 //                      traceUserEvent(36);
698                 }
699         }
700         DEBUG(CmiMemoryCheck());
701 }
702
703 /****
704         The handler functions
705 *****/
706
707
708 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
709 /**
710  *  If there are any delayed requests, process them first before 
711  *  processing this request
712  * */
713 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
714         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
715         double  _startTime = CkWallTimer();
716         if(CpvAccess(_delayedTicketRequests)->length() > 0){
717                 retryTicketRequest(NULL,_startTime);
718         }
719         _processTicketRequest(ticketRequest);
720         CmiFree(ticketRequest);
721 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
722 }
723 /** Handler used for dealing with a bunch of ticket requests
724  * from one processor. The replies are also bunched together
725  * Does not use _ticketRequestHandler
726  * */
727 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
728         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
729         DEBUG(CmiMemoryCheck());
730         double _startTime = CkWallTimer();
731         if(CpvAccess(_delayedTicketRequests)->length() > 0){
732                 retryTicketRequest(NULL,_startTime);
733         }
734         DEBUG(CmiMemoryCheck());
735   int numRequests = recvdHeader->numberLogs;
736         char *msg = (char *)recvdHeader;
737         msg = &msg[sizeof(BufferedTicketRequestHeader)];
738         int senderPE=((TicketRequest *)msg)->senderPE;
739
740         
741         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
742         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
743         
744         char *ptr = (char *)buf;
745         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
746         header->numberLogs = 0;
747
748         DEBUG(CmiMemoryCheck());
749         
750         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
751         
752         for(int i=0;i<numRequests;i++){
753                 TicketRequest *request = (TicketRequest *)msg;
754                 msg = &msg[sizeof(TicketRequest)];
755                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
756
757                 if(replied){
758                         //the ticket request has been processed and 
759                         //the reply will be stored in the ptr
760                         header->numberLogs++;
761                         ptr = &ptr[sizeof(TicketReply)];
762                 }
763         }
764 /*      if(header->numberLogs == 0){
765                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
766         }*/
767
768         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
769         CmiSyncSend(senderPE,totalSize,(char *)buf);
770         CmiFree(recvdHeader);
771 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
772         DEBUG(CmiMemoryCheck());
773 };
774
775 /**Process the ticket request. 
776  * If it is processed and a reply is being sent 
777  * by this processor return true
778  * else return false.
779  * If a reply buffer is specified put the reply into that
780  * else send the reply
781  * */
782 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
783
784 /*      if(_lockNewTicket){
785                 printf("ddeded %d\n",CkMyPe());
786                 if(CmiIsImmediate(ticketRequest)){
787                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
788                 }
789                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
790                 
791         }else{
792                 _lockNewTicket = 1;
793         }*/
794
795         DEBUG(CmiMemoryCheck());
796         CkObjID sender = ticketRequest->sender;
797         CkObjID recver = ticketRequest->recver;
798         MCount SN = ticketRequest->SN;
799         
800         
801         Chare *recverObj = (Chare *)recver.getObject();
802         
803         
804         char recverName[100];
805         DEBUG(recver.toString(recverName);)
806         if(recverObj == NULL){
807                 int estPE = recver.guessPE();
808                 if(estPE == CkMyPe() || estPE == -1){           
809                         //try to fulfill the request after some time
810                         char senderString[100];
811                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
812                         if(estPE == CkMyPe() && recver.type == TypeArray){
813                                 CkArrayID aid(recver.data.array.id);            
814                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
815                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
816
817                         }
818                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
819                         *delayed = *ticketRequest;
820                         CpvAccess(_delayedTicketRequests)->enq(delayed);
821                         
822                 }else{
823                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
824                         TicketRequest forward = *ticketRequest;
825                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
826                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
827                         
828                         
829                 }
830         DEBUG(CmiMemoryCheck());
831                 return false; // if the receverObj does not exist the ticket request cannot have been 
832                               // processed successfully
833         }else{
834                 char senderString[100];
835                 Ticket ticket;
836                 //check if a ticket for this has been already handed out to an object that used to be local but 
837                 // is no longer so.. need for parallel restart
838                 
839                 if(recverObj->mlogData->mapTable.numObjects() > 0){
840                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
841                 }
842                 
843                 if(ticket.TN == 0){
844                         ticket = recverObj->mlogData->next_ticket(sender,SN);
845                 }
846                 if(ticket.TN > recverObj->mlogData->tProcessed){
847                         ticket.state = NEW_TICKET;
848                 }else{
849                         ticket.state = OLD_TICKET;
850                 }
851                 //TODO: check for the case when an invalid ticket is returned
852                 if(ticket.TN == 0){
853                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
854                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
855                         *delayed = *ticketRequest;
856                         CpvAccess(_delayedTicketRequests)->enq(delayed);
857                         return false;
858                 }
859 /*              if(ticket.TN < SN){ //error state this really should not happen
860                         recver.toString(recverName);
861                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
862                 }*/
863 //              CkAssert(ticket.TN >= SN);
864                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
865
866 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
867     if(reply == NULL){ 
868                         //There is no reply buffer and the ticketreply is going to be 
869                         //sent immediately
870                         TicketReply ticketReply;
871                         ticketReply.request = *ticketRequest;
872                         ticketReply.ticket = ticket;
873                         ticketReply.recverPE = CkMyPe();
874                 
875                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
876 //              CmiBecomeImmediate(&ticketReply);
877                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
878          }else{ // Store ticket reply in the buffer provided
879                  reply->request = *ticketRequest;
880                  reply->ticket = ticket;
881                  reply->recverPE = CkMyPe();
882                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
883                                                          // in case the ticket needs to be forwarded or something
884
885          }
886                 DEBUG(CmiMemoryCheck());
887                 return true;
888         }
889 //      _lockNewTicket=0;
890 };
891
892 inline void _ticketHandler(TicketReply *ticketReply){
893
894         double _startTime = CkWallTimer();
895         DEBUG(CmiMemoryCheck());
896         
897         
898         char senderString[100];
899         CkObjID sender = ticketReply->request.sender;
900         CkObjID recver = ticketReply->request.recver;
901         
902         if(sender.guessPE() != CkMyPe()){               
903                 DEBUG(CkAssert(sender.guessPE()>= 0));
904                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
905         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
906                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
907                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
908 #ifdef BUFFERED_REMOTE
909                 //will be freed by the buffered ticket handler most of the time
910                 //this might lead to a leak just after migration
911                 //when the ticketHandler is directly used without going through the buffered handler
912                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
913 #else
914                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
915 #endif  
916         }else{
917                 char recverName[100];
918                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
919                 MlogEntry *logEntry=NULL;
920                 if(ticketReply->ticket.state & FORWARDED_TICKET){
921                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
922                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
923                         Chare *senderObj = (Chare *)sender.getObject();
924                         if(senderObj){
925                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
926                                 for(int i=0;i<mlog->length();i++){
927                                         MlogEntry *tempEntry = (*mlog)[i];
928                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
929                                                 logEntry = tempEntry;
930                                                 break;
931                                         }
932                                 }
933                                 if(logEntry == NULL){
934 #ifdef BUFFERED_REMOTE
935 #else
936                                         CmiFree(ticketReply);
937 #endif                                  
938                                         return;
939                                 }
940                         }else{
941                                 CmiAbort("This processor thinks it should have the sender\n");
942                         }
943                         ticketReply->ticket.state ^= FORWARDED_TICKET;
944                 }else{
945                         logEntry = ticketReply->request.logEntry;
946                 }
947                 if(logEntry->env->TN <= 0){
948                         //This logEntry has not received a TN earlier
949                         char recverString[100];
950                         logEntry->env->TN = ticketReply->ticket.TN;
951                         logEntry->env->setSrcPe(CkMyPe());
952                         if(ticketReply->ticket.state == NEW_TICKET){
953                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
954                                 if(ticketReply->recverPE != CkMyPe()){
955                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
956                                 }else{
957                                         //It is now a local message use the local message protocol
958                                         sendLocalMessageCopy(logEntry);
959                                 }               
960                         }
961                 }else{
962                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
963                 }
964                 recver.updatePosition(ticketReply->recverPE);
965 #ifdef BUFFERED_REMOTE
966 #else
967                 CmiFree(ticketReply);
968 #endif
969         }
970         CmiMemoryCheck();
971 #ifdef BUFFERED_REMOTE
972 #else
973 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
974 #endif
975         
976 };
977
978 /**
979  * Message to handle the bunch of tickets 
980  * that we get from one processor. We send 
981  * the tickets to be handled one at a time
982  * */
983
984 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
985         double _startTime=CmiWallTimer();
986         int numTickets = recvdHeader->numberLogs;
987         char *msg = (char *)recvdHeader;
988         msg = &msg[sizeof(BufferedTicketRequestHeader)];
989         DEBUG(CmiMemoryCheck());
990         
991         TicketReply *_reply = (TicketReply *)msg;
992
993         
994         for(int i=0;i<numTickets;i++){
995                 TicketReply *reply = (TicketReply *)msg;
996                 _ticketHandler(reply);
997                 
998                 msg = &msg[sizeof(TicketReply)];
999         }
1000         
1001         CmiFree(recvdHeader);
1002 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1003         DEBUG(CmiMemoryCheck());
1004 };
1005
1006
1007 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1008         double _startTime = CkWallTimer();
1009         
1010         char senderString[100],recverString[100];
1011         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1012 /*      if(!fault_aware(msgLog->recver)){
1013                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1014         }*/
1015         CpvAccess(_localMessageLog)->enq(*msgLog);
1016         
1017         LocalMessageLogAck ack;
1018         ack.entry = msgLog->entry;
1019         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1020         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1021         CmiSyncSend(msgLog->PE,sizeof(LocalMessageLogAck),(char *)&ack);
1022         
1023 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1024 };
1025
1026 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1027         double _startTime = CkWallTimer();
1028         DEBUG(CmiMemoryCheck());
1029         
1030         int numLogs = recvdHeader->numberLogs;
1031         char *msg = (char *)recvdHeader;
1032
1033         //piggy back the logs already stored on this processor
1034         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1035         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1036 /*      if(numPiggyLogs > 0){
1037                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1038                         CmiAssert(0);
1039                 }
1040         }*/
1041         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1042         
1043         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1044         void *buf = CmiAlloc(totalSize);
1045         char *ptr = (char *)buf;
1046         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1047         
1048         msg = &msg[sizeof(BufferedLocalLogHeader)];
1049         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1050
1051         DEBUG(CmiMemoryCheck());
1052         int PE;
1053         for(int i=0;i<numLogs;i++){
1054                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1055                 CpvAccess(_localMessageLog)->enq(*msgLog);
1056                 PE = msgLog->PE;
1057                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1058
1059                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1060                 ack->entry = msgLog->entry;
1061                 
1062                 msg = &msg[sizeof(LocalMessageLog)];
1063                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1064         }
1065         DEBUG(CmiMemoryCheck());
1066
1067         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1068         piggyHeader->numberLogs = numPiggyLogs;
1069         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1070         if(numPiggyLogs > 0){
1071                 countPiggy++;
1072         }
1073
1074         for(int i=0;i<numPiggyLogs;i++){
1075                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1076                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1077                 ptr = &ptr[sizeof(LocalMessageLog)];
1078         }
1079         DEBUG(CmiMemoryCheck());
1080         
1081         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1082         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1083                 
1084 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1085                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1086                         if(!fault_aware(localLogEntry.recver)){
1087                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1088                         }
1089         }*/
1090         if(freeHeader){
1091                 CmiFree(recvdHeader);
1092         }
1093         DEBUG(CmiMemoryCheck());
1094 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1095 }
1096
1097
1098 void _localMessageAckHandler(LocalMessageLogAck *ack){
1099         
1100         double _startTime = CkWallTimer();
1101         
1102         MlogEntry *entry = ack->entry;
1103         if(entry == NULL){
1104                 CkExit();
1105         }
1106         entry->unackedLocal = 0;
1107         envelope *env = entry->env;
1108         char recverName[100];
1109         char senderString[100];
1110         DEBUG(CmiMemoryCheck());
1111         
1112         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1113         if(env == NULL)
1114                 return;
1115         CkAssert(env->SN > 0);
1116         CkAssert(env->TN > 0);
1117         env->sender.toString(senderString);
1118         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1119         env->recver.toString(recverName);
1120
1121         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1122         
1123 /*      
1124         void *origMsg = EnvToUsr(env);
1125         void *copyMsg = CkCopyMsg(&origMsg);
1126         envelope *copyEnv = UsrToEnv(copyMsg);
1127         entry->env = UsrToEnv(origMsg);*/
1128
1129 //      envelope *copyEnv = copyEnvelope(env);
1130
1131         envelope *copyEnv = env;
1132         copyEnv->localMlogEntry = entry;
1133
1134         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1135         if(CmiMyPe() != entry->destPE){
1136                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1137         }
1138 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1139         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1140         
1141         
1142 #ifdef BUFFERED_LOCAL
1143 #else
1144         CmiFree(ack);
1145 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1146 #endif
1147         
1148         DEBUG(CmiMemoryCheck());
1149         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1150 }
1151
1152
1153 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1154
1155         double _startTime=CkWallTimer();
1156         DEBUG(CmiMemoryCheck());
1157
1158         int numLogs = recvdHeader->numberLogs;
1159         char *msg = (char *)recvdHeader;
1160         msg = &msg[sizeof(BufferedLocalLogHeader)];
1161
1162         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1163         
1164         for(int i=0;i<numLogs;i++){
1165                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1166                 _localMessageAckHandler(ack);
1167                 
1168                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1169         }
1170
1171         //deal with piggy backed local logs
1172         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1173         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1174         if(piggyHeader->numberLogs > 0){
1175                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1176         }
1177         
1178         CmiFree(recvdHeader);
1179         DEBUG(CmiMemoryCheck());
1180 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1181 }
1182
1183
1184
1185
1186
1187
1188 bool fault_aware(CkObjID &recver){
1189         switch(recver.type){
1190                 case TypeChare:
1191                         return false;
1192                 case TypeGroup:
1193                 case TypeNodeGroup:
1194                 case TypeArray:
1195                         return true;
1196                 default:
1197                         return false;
1198         }
1199 };
1200
1201 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1202         char recverString[100];
1203         char senderString[100];
1204         
1205         DEBUG(CmiMemoryCheck());
1206         CkObjID recver = env->recver;
1207         if(!fault_aware(recver))
1208                 return 1;
1209
1210
1211         Chare *obj = (Chare *)recver.getObject();
1212         *objPointer = obj;
1213         if(obj == NULL){
1214                 int possiblePE = recver.guessPE();
1215                 if(possiblePE != CkMyPe()){
1216                         int totalSize = env->getTotalsize();                    
1217                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1218                 }
1219                 return 0;
1220         }
1221
1222
1223         double _startTime = CkWallTimer();
1224 //env->sender.updatePosition(env->getSrcPe());
1225         if(env->TN == obj->mlogData->tProcessed+1){
1226                 //the message that needs to be processed now
1227                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1228                 // once we find a message that we can process we put back all the messages in the out of order queue
1229                 // back into the main scheduler queue. 
1230                 if(env->sender.guessPE() == CkMyPe()){
1231                         *logEntryPointer = env->localMlogEntry;
1232                 }
1233         DEBUG(CmiMemoryCheck());
1234                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1235                         void *qMsgPtr;
1236                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1237                         envelope *qEnv = (envelope *)qMsgPtr;
1238                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1239         DEBUG(CmiMemoryCheck());
1240                 }
1241 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1242                 //TODO: this might be a problem.. change made for leanMD
1243 //              CpvAccess(_currentObj) = obj;
1244         DEBUG(CmiMemoryCheck());
1245                 return 1;
1246         }
1247         if(env->TN <= obj->mlogData->tProcessed){
1248                 //message already processed
1249                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1250 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1251         DEBUG(CmiMemoryCheck());
1252                 return 0;
1253         }
1254         //message that needs to be processed in the future
1255
1256 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1257         //the message cant be processed now put it back in the out of order message Q, 
1258         //It will be transferred to the main queue later
1259         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1260 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1261         DEBUG(CmiMemoryCheck());
1262         
1263         return 0;
1264 }
1265
1266 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1267         char senderString[100];
1268         if(obj){
1269                 if(sender.guessPE() == CkMyPe()){
1270                         if(entry != NULL){
1271                                 entry->env = NULL;
1272                         }
1273                 }
1274                 obj->mlogData->tProcessed++;
1275 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1276                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1277 //              CpvAccess(_currentObj)= NULL;
1278         }
1279         DEBUG(CmiMemoryCheck());
1280 }
1281
1282 /***
1283         Helpers for the handlers and message logging methods
1284 ***/
1285
1286 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1287 //      double _startTime = CkWallTimer();
1288         if(env->recver.type != TypeNodeGroup){
1289         //This repeats a step performed in skipCldEnq for messages sent to
1290         //other processors. I do this here so that messages to local processors
1291         //undergo the same transformation.. It lets the resend be uniform for 
1292         //all messages
1293 //              CmiSetXHandler(env,CmiGetHandler(env));
1294                 _skipCldEnqueue(destPE,env,_infoIdx);
1295         }else{
1296                 _noCldNodeEnqueue(destPE,env);
1297         }
1298 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1299 }
1300 //extern "C" int CmiGetNonLocalLength();
1301 /** This method is used to retry the ticket requests
1302  * that had been queued up earlier
1303  * */
1304
1305 int calledRetryTicketRequest=0;
1306
1307 void retryTicketRequestTimer(void *_dummy,double _time){
1308                 calledRetryTicketRequest=0;
1309                 retryTicketRequest(_dummy,_time);
1310 }
1311
1312 void retryTicketRequest(void *_dummy,double curWallTime){       
1313         double start = CkWallTimer();
1314         DEBUG(CmiMemoryCheck());
1315         int length = CpvAccess(_delayedTicketRequests)->length();
1316         for(int i=0;i<length;i++){
1317                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1318                 if(ticketRequest){
1319                         char senderString[100],recverString[100];
1320                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1321                         DEBUG(CmiMemoryCheck());
1322                         _processTicketRequest(ticketRequest);
1323                   CmiFree(ticketRequest);
1324                         DEBUG(CmiMemoryCheck());
1325                 }       
1326         }       
1327         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1328                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1329                 ticketLogLocalMessage(entry);
1330         }
1331         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1332 //      int converse_qLength = CmiGetNonLocalLength();
1333         
1334 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1335
1336 /*      PingMsg pingMsg;
1337         pingMsg.PE = CkMyPe();
1338         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1339         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1340                 for(int i=0;i<CkNumPes();i++){
1341                         if(i != CkMyPe()){
1342                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1343                         }
1344                 }
1345         }*/     
1346         //TODO: change this back to 100
1347         if(calledRetryTicketRequest == 0){
1348                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1349                 calledRetryTicketRequest =1;
1350         }
1351         DEBUG(CmiMemoryCheck());
1352 }
1353
1354 void _pingHandler(CkPingMsg *msg){
1355         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1356         CmiFree(msg);
1357 }
1358
1359
1360 /*****************************************************************************
1361         Checkpointing methods..
1362                 Pack all the data on a processor and send it to the buddy periodically
1363                 Also used to throw away message logs
1364 *****************************************************************************/
1365 CkVec<TProcessedLog> processedTicketLog;
1366 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1367 void clearUpMigratedRetainedLists(int PE);
1368
1369 void checkpointAlarm(void *_dummy,double curWallTime){
1370         double diff = curWallTime-lastCompletedAlarm;
1371         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1372 /*      if(CkWallTimer()-lastRestart < 50){
1373                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1374                 return;
1375         }*/
1376         if(diff < ((chkptPeriod) - 2)){
1377                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1378                 return;
1379         }
1380         CheckpointRequest request;
1381         request.PE = CkMyPe();
1382         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1383         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1384 };
1385
1386 void _checkpointRequestHandler(CheckpointRequest *request){
1387         startMlogCheckpoint(NULL,CmiWallTimer());       
1388 }
1389
1390 void startMlogCheckpoint(void *_dummy,double curWallTime){
1391         double _startTime = CkWallTimer();
1392         checkpointCount++;
1393 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1394                 kill(getpid(),SIGKILL);
1395         }*/
1396         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1397                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1398         }
1399         PUP::sizer psizer;
1400         DEBUG(CmiMemoryCheck());
1401
1402         psizer | checkpointCount;
1403         
1404         CkPupROData(psizer);
1405         DEBUG(CmiMemoryCheck());
1406         CkPupGroupData(psizer);
1407         DEBUG(CmiMemoryCheck());
1408         CkPupNodeGroupData(psizer);
1409         DEBUG(CmiMemoryCheck());
1410         pupArrayElementsSkip(psizer,NULL);
1411         DEBUG(CmiMemoryCheck());
1412
1413         int dataSize = psizer.size();
1414         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1415         char *msg = (char *)CmiAlloc(totalSize);
1416         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1417         chkMsg->PE = CkMyPe();
1418         chkMsg->dataSize = dataSize;
1419
1420         
1421         char *buf = &msg[sizeof(CheckPointDataMsg)];
1422         PUP::toMem pBuf(buf);   
1423
1424         pBuf | checkpointCount;
1425         
1426         CkPupROData(pBuf);
1427         CkPupGroupData(pBuf);
1428         CkPupNodeGroupData(pBuf);
1429         pupArrayElementsSkip(pBuf,NULL);
1430
1431         unAckedCheckpoint=1;
1432         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1433         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1434         
1435         /*
1436                 Store the highest Ticket number processed for each chare on this processor
1437         */
1438         processedTicketLog.removeAll();
1439         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1440         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1441                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1442         }
1443
1444         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1445                 lastCompletedAlarm = curWallTime;
1446                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1447         }
1448         traceUserBracketEvent(28,_startTime,CkWallTimer());
1449 };
1450
1451 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1452         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1453         TProcessedLog logEntry;
1454         logEntry.recver = mlogData->objID;
1455         logEntry.tProcessed = mlogData->tProcessed;
1456         log->push_back(logEntry);
1457         char objString[100];
1458         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1459 }
1460
1461 class ElementPacker : public CkLocIterator {
1462 private:
1463         CkLocMgr *locMgr;
1464         PUP::er &p;
1465 public:
1466                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1467                 void addLocation(CkLocation &loc) {
1468                         CkArrayIndexMax idx=loc.getIndex();
1469                         CkGroupID gID = locMgr->ckGetGroupID();
1470                         p|gID;      // store loc mgr's GID as well for easier restore
1471                         p|idx;
1472                         p|loc;
1473     }
1474 };
1475
1476
1477 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1478         int numElements,i;
1479   int numGroups = CkpvAccess(_groupIDTable)->size();    
1480         if(!p.isUnpacking()){
1481                 numElements = CkCountArrayElements();
1482         }       
1483         p | numElements;
1484         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1485         if(!p.isUnpacking()){
1486                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1487         }else{
1488                 //Flush all recs of all LocMgrs before putting in new elements
1489 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1490                 for(int j=0;j<listsize;j++){
1491                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1492                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1493                                 listToSkip[j].idx.print();
1494                         }
1495                 }
1496                 
1497  printf("numElements = %d\n",numElements);
1498         
1499           for (int i=0; i<numElements; i++) {
1500                         CkGroupID gID;
1501                         CkArrayIndexMax idx;
1502                         p|gID;
1503             p|idx;
1504                         int flag=0;
1505                         int matchedIdx=0;
1506                         for(int j=0;j<listsize;j++){
1507                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1508                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1509                                                 matchedIdx = j;
1510                                                 flag = 1;
1511                                                 break;
1512                                         }
1513                                 }
1514                         }
1515                         if(flag == 1){
1516                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1517                         }else{
1518                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1519                         }
1520                                 
1521                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1522                         mgr->resume(idx,p,flag);
1523                         if(flag == 1){
1524                                 int homePE = mgr->homePe(idx);
1525                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1526                         }
1527           }
1528         }
1529 };
1530
1531
1532 void writeCheckpointToDisk(int size,char *chkpt){
1533         char fNameTemp[100];
1534         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1535         int fd = creat(fNameTemp,S_IRWXU);
1536         int ret = write(fd,chkpt,size);
1537         CkAssert(ret == size);
1538         close(fd);
1539         
1540         char fName[100];
1541         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1542         unlink(fName);
1543
1544         rename(fNameTemp,fName);
1545         
1546 }
1547
1548 //handler that receives the checkpoint from a processor
1549 //it stores it and acks it
1550 void _storeCheckpointHandler(char *msg){
1551         
1552         double _startTime=CkWallTimer();
1553                 
1554         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1555         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1556         
1557         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1558         
1559         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1560         if(oldChkpt != NULL){
1561                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1562                 CmiFree(oldmsg);
1563         }
1564         //turning off storing checkpoints
1565         
1566         int sendingPE = chkMsg->PE;
1567         
1568         CpvAccess(_storedCheckpointData)->buf = chkpt;
1569         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1570         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1571
1572 #ifdef CHECKPOINT_DISK
1573         //store the checkpoint on disk
1574         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1575         CpvAccess(_storedCheckpointData)->buf = NULL;
1576         CmiFree(msg);
1577 #endif
1578
1579         int count=0;
1580         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1581                 if(migratedNoticeList[j].fromPE == sendingPE){
1582                         migratedNoticeList[j].ackFrom = 1;
1583                 }else{
1584                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1585                 }
1586                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1587                         migratedNoticeList.remove(j);
1588                         count++;
1589                 }
1590                 
1591         }
1592         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1593         
1594         CheckPointAck ackMsg;
1595         ackMsg.PE = CkMyPe();
1596         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1597         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1598         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1599         
1600         
1601         
1602         traceUserBracketEvent(29,_startTime,CkWallTimer());
1603 };
1604
1605
1606 void sendRemoveLogRequests(){
1607         double _startTime = CkWallTimer();      
1608         //send out the messages asking senders to throw away message logs below a certain ticket number
1609         /*
1610                 The remove log request message looks like
1611                 |RemoveLogRequest||List of TProcessedLog|
1612         */
1613         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1614         char *requestMsg = (char *)CmiAlloc(totalSize);
1615         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1616         request->PE = CkMyPe();
1617         request->numberObjects = processedTicketLog.size();
1618         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1619         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1620         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1621         
1622         DEBUG(CmiMemoryCheck());
1623         for(int i=0;i<CkNumPes();i++){
1624                 CmiSyncSend(i,totalSize,requestMsg);
1625         }
1626         CmiFree(requestMsg);
1627
1628         clearUpMigratedRetainedLists(CmiMyPe());
1629         //TODO: clear ticketTable
1630         
1631         traceUserBracketEvent(30,_startTime,CkWallTimer());
1632         DEBUG(CmiMemoryCheck());
1633 }
1634
1635
1636 void _checkpointAckHandler(CheckPointAck *ackMsg){
1637         DEBUG(CmiMemoryCheck());
1638         unAckedCheckpoint=0;
1639         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1640         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1641         if(onGoingLoadBalancing){
1642                 onGoingLoadBalancing = 0;
1643                 finishedCheckpointLoadBalancing();
1644         }else{
1645                 sendRemoveLogRequests();
1646         }
1647         CmiFree(ackMsg);
1648         
1649 };
1650
1651 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1652         DEBUG(CmiMemoryCheck());
1653         CmiMemoryCheck();
1654         char *data = (char *)_data;
1655         RemoveLogRequest *request = (RemoveLogRequest *)data;
1656         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1657         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1658
1659         int count=0;
1660         for(int i=0;i<mlog->length();i++){
1661                 MlogEntry *logEntry = mlog->deq();
1662                 int match=0;
1663                 for(int j=0;j<request->numberObjects;j++){
1664                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1665                                 //this log Entry should be removed
1666                                 match = 1;
1667                                 break;
1668                         }
1669                 }
1670                 char senderString[100],recverString[100];
1671 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1672                 if(match){
1673                         count++;
1674                         delete logEntry;
1675                 }else{
1676                         mlog->enq(logEntry);
1677                 }
1678         }
1679         if(count > 0){
1680                 char nameString[100];
1681                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1682         }
1683         DEBUG(CmiMemoryCheck());
1684         CmiMemoryCheck();
1685 };
1686
1687 void _removeProcessedLogHandler(char *requestMsg){
1688         double start = CkWallTimer();
1689         forAllCharesDo(removeProcessedLogs,requestMsg);
1690         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1691         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1692         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1693         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1694         if(request->PE == getCheckPointPE()){
1695                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1696                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1697                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1698                 int count=0;
1699 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1700                 DEBUG(char nameString[100];)
1701                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1702                 DEBUG(})*/
1703                 for(int i=0;i<localQ->length();i++){
1704                         LocalMessageLog localLogEntry = (*localQ)[i];
1705                         if(!fault_aware(localLogEntry.recver)){
1706                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1707                         }
1708                         bool keep = true;
1709                         for(int j=0;j<request->numberObjects;j++){                              
1710                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1711                                         keep = false;
1712                                         break;
1713                                 }
1714                         }       
1715                         if(keep){
1716                                 tempQ->enq(localLogEntry);
1717                         }else{
1718                                 count++;
1719                         }
1720                 }
1721                 delete localQ;
1722                 CpvAccess(_localMessageLog) = tempQ;
1723                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1724         }
1725
1726         /*
1727                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1728         */
1729         CmiMemoryCheck();
1730         clearUpMigratedRetainedLists(request->PE);
1731         
1732         traceUserBracketEvent(20,start,CkWallTimer());
1733         CmiFree(requestMsg);    
1734 };
1735
1736
1737 void clearUpMigratedRetainedLists(int PE){
1738         int count=0;
1739         CmiMemoryCheck();
1740         
1741         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1742                 if(migratedNoticeList[j].toPE == PE){
1743                         migratedNoticeList[j].ackTo = 1;
1744                 }
1745                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1746                         migratedNoticeList.remove(j);
1747                         count++;
1748                 }
1749         }
1750         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1751         
1752         for(int j=retainedObjectList.size()-1;j>=0;j--){
1753                 if(retainedObjectList[j]->migRecord.toPE == PE){
1754                         RetainedMigratedObject *obj = retainedObjectList[j];
1755                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1756                         retainedObjectList.remove(j);
1757                         if(obj->msg != NULL){
1758                                 CmiMemoryCheck();
1759                                 CmiFree(obj->msg);
1760                         }
1761                         delete obj;
1762                 }
1763         }
1764 }
1765
1766 /***************************************************************
1767         Restart Methods and handlers
1768 ***************************************************************/        
1769
1770 /**
1771  * Function for restarting an object with message logging
1772  */
1773 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1774         printf("[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1775         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1776         _restartFlag = 1;
1777         RestartRequest msg;
1778         msg.PE = CkMyPe();
1779         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1780         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1781 };
1782
1783 void CkMlogRestartDouble(void *,double){
1784         CkMlogRestart(NULL,NULL);
1785 };
1786
1787 //GML: restarting from local (group) failure
1788 void CkMlogRestartLocal(){
1789     CkMlogRestart(NULL,NULL);
1790 };
1791
1792
1793 void readCheckpointFromDisk(int size,char *buf){
1794         char fName[100];
1795         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1796
1797         int fd = open(fName,O_RDONLY);
1798         int count=0;
1799         while(count < size){
1800                 count += read(fd,&buf[count],size-count);
1801         }
1802         close(fd);
1803         
1804 };
1805
1806
1807 void sendCheckpointData();
1808
1809 void _getCheckpointHandler(RestartRequest *restartMsg){
1810         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1811         CkAssert(restartMsg->PE == storedChkpt->PE);
1812         storedRequest = restartMsg;
1813         
1814         verifyAckTotal = 0;
1815         for(int i=0;i<migratedNoticeList.size();i++){
1816                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1817 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1818                         if(migratedNoticeList[i].ackFrom == 0){
1819                                 //need to verify if the object actually exists .. it might not
1820                                 //have been acked but it might exist on it
1821                                 VerifyAckMsg msg;
1822                                 msg.migRecord = migratedNoticeList[i];
1823                                 msg.index = i;
1824                                 msg.fromPE = CmiMyPe();
1825                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1826                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1827                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1828                                 verifyAckTotal++;
1829                         }
1830                 }
1831         }
1832         
1833         if(verifyAckTotal == 0){
1834                 sendCheckpointData();
1835         }
1836         verifyAckCount = 0;
1837 }
1838
1839
1840 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1841         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1842         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1843         if(rec != NULL && rec->type() == CkLocRec::local){
1844                         //this location exists on this processor
1845                         //and needs to be removed       
1846                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1847                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1848                         
1849                         int localIdx = localRec->getLocalIndex();
1850                         LBDatabase *lbdb = localRec->getLBDB();
1851                         LDObjHandle ldHandle = localRec->getLdHandle();
1852                                 
1853                         locMgr->setDuringMigration(true);
1854                         
1855                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1856                         lbdb->UnregisterObj(ldHandle);
1857                         
1858                         locMgr->setDuringMigration(false);
1859                         
1860                         verifyAckedRequests++;
1861
1862         }
1863         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1864         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1865 };
1866
1867
1868 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1869         int index =     verifyReply->index;
1870         migratedNoticeList[index] = verifyReply->migRecord;
1871         verifyAckCount++;
1872         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1873         if(verifyAckCount == verifyAckTotal){
1874                 sendCheckpointData();
1875         }
1876 }
1877
1878
1879
1880 void sendCheckpointData(){      
1881         RestartRequest *restartMsg = storedRequest;
1882         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
1883         int numMigratedAwayElements = migratedNoticeList.size();
1884         if(migratedNoticeList.size() != 0){
1885                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1886 //                      CkAssert(migratedNoticeList.size() == 0);
1887         }
1888         
1889         
1890         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1891         
1892         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1893         
1894         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
1895         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
1896         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1897         
1898         char *msg = (char *)CmiAlloc(totalSize);
1899         
1900         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1901         dataMsg->PE = CkMyPe();
1902         dataMsg->restartWallTime = CmiTimer();
1903         dataMsg->checkPointSize = storedChkpt->bufSize;
1904         
1905         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1906 //      dataMsg->numMigratedAwayElements = 0;
1907         
1908         dataMsg->numMigratedInElements = 0;
1909         dataMsg->migratedElementSize = 0;
1910         dataMsg->lbGroupID = globalLBID;
1911         /*msg layout 
1912                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1913                 Local MessageLog|
1914         */
1915         //store checkpoint data
1916         char *buf = &msg[sizeof(RestartProcessorData)];
1917
1918         if(dataMsg->numMigratedAwayElements != 0){
1919                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1920                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1921         }
1922         
1923
1924 #ifdef CHECKPOINT_DISK
1925         readCheckpointFromDisk(storedChkpt->bufSize,buf);
1926 #else   
1927         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1928 #endif
1929         buf = &buf[storedChkpt->bufSize];
1930
1931
1932         //store localmessage Log
1933         dataMsg->numLocalMessages = localMsgQ->length();
1934         for(int i=0;i<localMsgQ->length();i++){
1935                 if(!fault_aware(((*localMsgQ)[i]).recver )){
1936                         CmiAbort("Non fault aware localMsgQ");
1937                 }
1938                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
1939                 buf = &buf[sizeof(LocalMessageLog)];
1940         }
1941         
1942         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1943         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1944         CmiFree(restartMsg);
1945 };
1946
1947
1948 // this list is used to create a vector of the object ids of all
1949 //the chares on this processor currently and the highest TN processed by them 
1950 //the first argument is actually a CkVec<TProcessedLog> *
1951 void createObjIDList(void *data,ChareMlogData *mlogData){
1952         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
1953         TProcessedLog entry;
1954         entry.recver = mlogData->objID;
1955         entry.tProcessed = mlogData->tProcessed;
1956         list->push_back(entry);
1957         char objString[100];
1958         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
1959 }
1960
1961
1962
1963 void _recvCheckpointHandler(char *_restartData){
1964         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1965         MigrationRecord *migratedAwayElements;
1966
1967         globalLBID = restartData->lbGroupID;
1968         
1969         restartData->restartWallTime *= 1000;
1970         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1971         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1972         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1973
1974         
1975         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1976         char *buf = &_restartData[sizeof(RestartProcessorData)];
1977         
1978         if(restartData->numMigratedAwayElements != 0){
1979                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1980                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1981                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1982                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1983         }
1984         
1985         PUP::fromMem pBuf(buf);
1986
1987         pBuf | checkpointCount;
1988
1989         CkPupROData(pBuf);
1990         CkPupGroupData(pBuf);
1991         CkPupNodeGroupData(pBuf);
1992 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1993         pupArrayElementsSkip(pBuf,NULL);
1994         CkAssert(pBuf.size() == restartData->checkPointSize);
1995         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1996         
1997         forAllCharesDo(initializeRestart,NULL);
1998         
1999         //store the restored local message log in a vector
2000         buf = &buf[restartData->checkPointSize];        
2001         for(int i=0;i<restartData->numLocalMessages;i++){
2002                 LocalMessageLog logEntry;
2003                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2004                 
2005                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2006                 if(recverObj!=NULL){
2007                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2008                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2009                         char senderString[100];
2010                         char recverString[100];
2011                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2012                 }else{
2013 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2014                 }
2015                 buf = &buf[sizeof(LocalMessageLog)];
2016         }
2017
2018         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2019
2020         CmiFree(_restartData);
2021         
2022         
2023         _initDone();
2024
2025         getGlobalStep(globalLBID);
2026         
2027         countUpdateHomeAcks = 0;
2028         RestartRequest updateHomeRequest;
2029         updateHomeRequest.PE = CmiMyPe();
2030         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2031         for(int i=0;i<CmiNumPes();i++){
2032                 if(i != CmiMyPe()){
2033                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2034                 }
2035         }
2036
2037 }
2038
2039 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2040         countUpdateHomeAcks++;
2041         CmiFree(updateHomeAck);
2042         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2043         if(countUpdateHomeAcks != CmiNumPes()){
2044                 return;
2045         }
2046
2047         // Send out the request to resend logged messages to all other processors
2048         CkVec<TProcessedLog> objectVec;
2049         forAllCharesDo(createObjIDList, (void *)&objectVec);
2050         int numberObjects = objectVec.size();
2051         
2052         /*
2053                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2054         */
2055         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2056         char *resendMsg = (char *)CmiAlloc(totalSize);
2057         
2058
2059         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2060         resendReq->PE =CkMyPe(); 
2061         resendReq->numberObjects = numberObjects;
2062         char *objList = &resendMsg[sizeof(ResendRequest)];
2063         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2064         
2065
2066         
2067
2068         /* test for parallel restart migrate away object**/
2069         if(parallelRestart){
2070                 distributeRestartedObjects();
2071                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2072         }
2073         
2074         /*      To make restart work for load balancing.. should only
2075         be used when checkpoint happens along with load balancing
2076         **/
2077 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2078
2079         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2080         CpvAccess(_currentObj) = lb;
2081         lb->ReceiveDummyMigration(restartDecisionNumber);
2082
2083         sleep(10);
2084         
2085         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2086         for(int i=0;i<CkNumPes();i++){
2087                 if(i != CkMyPe()){
2088                         CmiSyncSend(i,totalSize,resendMsg);
2089                 }       
2090         }
2091         _resendMessagesHandler(resendMsg);
2092         CmiFree(resendMsg);
2093 };
2094
2095 void initializeRestart(void *data,ChareMlogData *mlogData){
2096         mlogData->resendReplyRecvd = 0;
2097         mlogData->receivedTNs = new CkVec<MCount>;
2098         mlogData->restartFlag = 1;
2099         mlogData->restoredLocalMsgLog.removeAll();
2100         mlogData->mapTable.empty();
2101 };
2102
2103
2104 void updateHomePE(void *data,ChareMlogData *mlogData){
2105         RestartRequest *updateRequest = (RestartRequest *)data;
2106         int PE = updateRequest->PE; //restarted PE
2107         //if this object is an array Element and its home is the restarted processor
2108         // the home processor needs to know its current location
2109         if(mlogData->objID.type == TypeArray){
2110                 //it is an array element
2111                 CkGroupID myGID = mlogData->objID.data.array.id;
2112                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2113                 CkArrayID aid(mlogData->objID.data.array.id);           
2114                 //check if the restarted processor is the home processor for this object
2115                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2116                 if(locMgr->homePe(myIdx) == PE){
2117                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2118                         DEBUGRESTART(myIdx.print());
2119                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2120                 }
2121         }
2122 };
2123
2124
2125 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2126         
2127         int sender = updateRequest->PE;
2128         
2129         forAllCharesDo(updateHomePE,updateRequest);
2130         
2131         updateRequest->PE = CmiMyPe();
2132         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2133         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2134         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2135                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2136                 checkpointCount--;
2137                 startMlogCheckpoint(NULL,0);
2138         }
2139         if(sender == getCheckPointPE()){
2140                 for(int i=0;i<retainedObjectList.size();i++){
2141                         if(retainedObjectList[i]->acked == 0){
2142                                 MigrationNotice migMsg;
2143                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2144                                 migMsg.record = retainedObjectList[i];
2145                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2146                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2147                         }
2148                 }
2149
2150         }
2151 }
2152
2153
2154 //the data argument is of type ResendData which contains the 
2155 //array of objects on  the restartedProcessor
2156 //this method resends the messages stored in this chare's message log 
2157 //to the restarted processor. It also accumulates the maximum TN
2158 //for all the objects on the restarted processor
2159 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2160         char nameString[100];
2161         ResendData *resendData = (ResendData *)data;
2162         int PE = resendData->PE; //restarted PE
2163         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2164         int count=0;
2165         int ticketRequests=0;
2166         CkQ<MlogEntry *> *log = mlogData->getMlog();
2167
2168         
2169         
2170         
2171         for(int i=0;i<log->length();i++){
2172                 MlogEntry *logEntry = (*log)[i];
2173                 
2174                 // if we sent out the logs of a local message to buddy and he crashed
2175                 //before acking
2176                 envelope *env = logEntry->env;
2177                 if(env == NULL){
2178                         continue;
2179                 }
2180                 if(logEntry->unackedLocal){
2181                         char recverString[100];
2182                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2183                         sendLocalMessageCopy(logEntry);
2184                 }
2185                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2186                 if(env->TN <= 0){
2187                         //ticket not yet replied send it out again
2188                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
2189                 }
2190                 
2191                 if(env->recver.type != TypeInvalid){
2192                         int flag = 0;//marks if any of the restarted objects matched this log entry
2193                         for(int j=0;j<resendData->numberObjects;j++){
2194                                 if(env->recver == (resendData->listObjects)[j].recver){
2195                                         flag = 1;
2196                                         //message has a valid TN
2197                                         if(env->TN > 0){
2198                                                 //store maxTicket
2199                                                 if(env->TN > resendData->maxTickets[j]){
2200                                                         resendData->maxTickets[j] = env->TN;
2201                                                 }
2202                                                 //if the TN for this entry is more than the TN processed, send the message out
2203                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2204                                                         //store the TNs that have been since the recver last checkpointed
2205                                                         resendData->ticketVecs[j].push_back(env->TN);
2206                                                         
2207                                                         if(PE != CkMyPe()){
2208                                                                 if(env->recver.type == TypeNodeGroup){
2209                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2210                                                                 }else{
2211                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2212                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2213                                                                 }
2214                                                         }else{
2215                                                                 envelope *copyEnv = copyEnvelope(env);
2216                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2217                                                         }
2218                                                         char senderString[100];
2219                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2220                                                         count++;
2221                                                 }       
2222                                         }else{
2223 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2224                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2225                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2226                                                 CkAssert(logEntry->destPE != CkMyPe());
2227                                                 
2228                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2229                                                 
2230                                                 ticketRequests++;*/
2231                                         }
2232                                 }
2233                         }//end of for loop of objects
2234                         
2235                 }       
2236         }
2237         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2238 }
2239
2240 void _resendMessagesHandler(char *msg){
2241         ResendRequest *resendReq = (ResendRequest *)msg;
2242
2243         //GML: examines the origin processor to determine if it belongs to the same group
2244         if(resendReq->PE != CkMyPe() && resendReq->PE/GROUP_SIZE_MLOG == CkMyPe()/GROUP_SIZE_MLOG){
2245                 //TODO: change the function call from restart to group-restart to avoid cyclic calls,
2246                 // for now it will work only with 1 failure
2247                 if(_restartFlag)
2248                         return;
2249                 CmiMemoryCheck();
2250                 CkPrintf("[%d] RESTART: same group\n",CkMyPe());
2251                 //HERE _resetNodeBocInitVec();
2252 /*      int numGroups = CkpvAccess(_groupIDTable)->size();
2253                 int i;
2254                 CKLOCMGR_LOOP(mgr->startInserting(););
2255                 CKLOCMGR_LOOP(mgr->flushAllRecs(););
2256 */
2257                 // rolls back to the previous checkpoint and sends a broadcast to resend messages to this processor
2258                 CkMlogRestartLocal();
2259                 return;
2260         }
2261
2262
2263         char *listObjects = &msg[sizeof(ResendRequest)];
2264         ResendData d;
2265         d.numberObjects = resendReq->numberObjects;
2266         d.PE = resendReq->PE;
2267         d.listObjects = (TProcessedLog *)listObjects;
2268         d.maxTickets = new MCount[d.numberObjects];
2269         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2270         for(int i=0;i<d.numberObjects;i++){
2271                 d.maxTickets[i] = 0;
2272         }
2273
2274         //Check if any of the retained objects need to be recreated
2275         //If they have not been recreated on the restarted processor
2276         //they need to be recreated on this processor
2277         int count=0;
2278         for(int i=0;i<retainedObjectList.size();i++){
2279                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2280                         count++;
2281                         int recreate=1;
2282                         for(int j=0;j<d.numberObjects;j++){
2283                                 if(d.listObjects[j].recver.type != TypeArray ){
2284                                         continue;
2285                                 }
2286                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2287                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2288                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2289                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2290                                                 recreate = 0;
2291                                                 break;
2292                                         }
2293                                 }
2294                         }
2295                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2296                         if(recreate){
2297                                 donotCountMigration=1;
2298                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2299                                 donotCountMigration=0;
2300                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2301                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2302                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2303
2304                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2305                                 
2306                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2307                                 CmiAssert(rec->type() == CkLocRec::local);
2308                                 CkVec<CkMigratable *> eltList;
2309                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2310                                 for(int j=0;j<eltList.size();j++){
2311                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2312                                                 CpvAccess(_currentObj) = eltList[j];
2313                                                 eltList[j]->ResumeFromSync();
2314                                         }
2315                                 }
2316
2317
2318                                 
2319                                 retainedObjectList[i]->msg=NULL;
2320                                 
2321                                         
2322                         }
2323                 }
2324         }
2325         
2326         if(count > 0){
2327 //              CmiAbort("retainedObjectList for restarted processor not empty");
2328         }
2329
2330
2331         
2332         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2333         forAllCharesDo(resendMessageForChare,&d);
2334
2335         //send back the maximum ticket number for a message sent to each object on the 
2336         //restarted processor
2337         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2338         
2339         int totalTNStored=0;
2340         for(int i=0;i<d.numberObjects;i++){
2341                 totalTNStored += d.ticketVecs[i].size();
2342         }
2343         
2344         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2345         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2346         
2347         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2348         resendReply->PE = CkMyPe();
2349         resendReply->numberObjects = d.numberObjects;
2350         
2351         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2352         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2353         for(int i=0;i<d.numberObjects;i++){
2354                 replyObjects[i] = d.listObjects[i].recver;
2355         }
2356         
2357         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2358         for(int i=0;i<d.numberObjects;i++){
2359                 int vecsize = d.ticketVecs[i].size();
2360                 memcpy(ticketList,&vecsize,sizeof(int));
2361                 ticketList = &ticketList[sizeof(int)];
2362                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2363                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2364         }       
2365
2366         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2367         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2368         
2369 /*      
2370         if(verifyAckRequestsUnacked){
2371                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2372                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2373                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2374                         LDObjHandle h;
2375                         lb->Migrated(h,1);
2376                 }
2377         }
2378         
2379         verifyAckRequestsUnacked=0;*/
2380
2381
2382         
2383         delete [] d.maxTickets;
2384         delete [] d.ticketVecs;
2385         if(resendReq->PE != CkMyPe()){
2386                 CmiFree(msg);
2387         }       
2388 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2389         lastRestart = CmiWallTimer();
2390 }
2391
2392 void sortVec(CkVec<MCount> *TNvec);
2393 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2394
2395 void _resendReplyHandler(char *msg){    
2396         /**
2397                 need to rewrite this method to deal with parallel restart
2398         */
2399         ResendRequest *resendReply = (ResendRequest *)msg;
2400         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2401
2402         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2403         
2404         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2405         for(int i =0; i< resendReply->numberObjects;i++){       
2406                 Chare *obj = (Chare *)listObjects[i].getObject();
2407                 
2408                 int vecsize;
2409                 memcpy(&vecsize,listTickets,sizeof(int));
2410                 listTickets = &listTickets[sizeof(int)];
2411                 MCount *listTNs = (MCount *)listTickets;
2412                 
2413                 
2414                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2415                 
2416                 if(obj != NULL){
2417                         //the object was restarted on the processor on which it existed
2418                         processReceivedTN(obj,vecsize,listTNs);
2419                 }else{
2420                 //pack up objID vecsize and listTNs and send it to the correct processor
2421                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2422                         char *TNMsg = (char *)CmiAlloc(totalSize);
2423                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2424                         receivedTNData->recver = listObjects[i];
2425                         receivedTNData->numTNs = vecsize;
2426                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2427                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2428
2429                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2430                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2431                 }
2432                 
2433         }
2434 };
2435
2436 void _receivedTNDataHandler(ReceivedTNData *msg){
2437         char objName[100];
2438         Chare *obj = (Chare *) msg->recver.getObject();
2439         if(obj){                
2440                 char *_msg = (char *)msg;
2441                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2442                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2443                 processReceivedTN(obj,msg->numTNs,listTNs);
2444         }else{
2445                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2446                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2447         }
2448 };
2449
2450
2451 void processReceivedTN(Chare *obj,int listSize,MCount *listTNs){
2452         obj->mlogData->resendReplyRecvd++;
2453
2454         
2455         for(int j=0;j<listSize;j++){
2456                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2457         }
2458         
2459         //if this object has received all the replies find the ticket numbers
2460         //that senders know about. Those less than the ticket number processed 
2461         //by the receiver can be thrown away. The rest need not be consecutive
2462         // ie there can be holes in the list of ticket numbers seen by senders
2463         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2464                 obj->mlogData->resendReplyRecvd = 0;
2465                 //sort the received TNS
2466                 sortVec(obj->mlogData->receivedTNs);
2467         
2468                 //after all the received tickets are in we need to sort them and then 
2469                 // calculate the holes
2470                 
2471                 if(obj->mlogData->receivedTNs->size() > 0){
2472                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2473                         int vecsize = obj->mlogData->receivedTNs->size();
2474                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2475                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2476                         if(numberHoles == 0){
2477                         }else{
2478                                 char objName[100];                                      
2479                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2480                                 obj->mlogData->numberHoles = numberHoles;
2481                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2482                                 int countHoles=0;
2483                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2484                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2485                                                 //the TNs are not consecutive at this point
2486                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2487                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2488                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2489                                                         countHoles++;
2490                                                 }       
2491                                         }
2492                                 }
2493                                 //Holes have been given new TN
2494                                 if(countHoles != numberHoles){
2495                                         char str[100];
2496                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2497                                 }
2498                                 CkAssert(countHoles == numberHoles);                                    
2499                                 obj->mlogData->currentHoles = numberHoles;
2500                         }
2501                 }       
2502                 
2503                 delete obj->mlogData->receivedTNs;
2504                 obj->mlogData->receivedTNs = NULL;
2505                 obj->mlogData->restartFlag = 0;
2506                 char objString[100];
2507                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2508         }
2509
2510 }
2511
2512
2513 void sortVec(CkVec<MCount> *TNvec){
2514         //sort it ->its bloddy bubble sort
2515         //TODO: use quicksort
2516         for(int i=0;i<TNvec->size();i++){
2517                 for(int j=i+1;j<TNvec->size();j++){
2518                         if((*TNvec)[j] < (*TNvec)[i]){
2519                                 MCount temp;
2520                                 temp = (*TNvec)[i];
2521                                 (*TNvec)[i] = (*TNvec)[j];
2522                                 (*TNvec)[j] = temp;
2523                         }
2524                 }
2525         }
2526         //make it unique .. since its sorted all equal units will be consecutive
2527         MCount *tempArray = new MCount[TNvec->size()];
2528         int     uniqueCount=-1;
2529         for(int i=0;i<TNvec->size();i++){
2530                 tempArray[i] = 0;
2531                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2532                         uniqueCount++;
2533                         tempArray[uniqueCount] = (*TNvec)[i];
2534                 }
2535         }
2536         uniqueCount++;
2537         TNvec->removeAll();
2538         for(int i=0;i<uniqueCount;i++){
2539                 TNvec->push_back(tempArray[i]);
2540         }
2541         delete [] tempArray;
2542 }       
2543
2544 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2545         if(TNVec->size() == 0){
2546                 return -1; //not found in an empty vec
2547         }
2548         //binary search to find 
2549         int left=0;
2550         int right = TNVec->size();
2551         int mid = (left +right)/2;
2552         while(searchTN != (*TNVec)[mid] && left < right){
2553                 if((*TNVec)[mid] > searchTN){
2554                         right = mid-1;
2555                 }else{
2556                         left = mid+1;
2557                 }
2558                 mid = (left + right)/2;
2559         }
2560         if(left < right){
2561                 //mid is the element to be returned
2562                 return mid;
2563         }else{
2564                 if(mid < TNVec->size() && mid >=0){
2565                         if((*TNVec)[mid] == searchTN){
2566                                 return mid;
2567                         }else{
2568                                 return -1;
2569                         }
2570                 }else{
2571                         return -1;
2572                 }
2573         }
2574 };
2575
2576
/*
	Method to do parallel restart: distribute some of the array elements to other processors.
	The problem is that we cannot use Charm++ entry methods to do the migration, because they
	would get stuck in the protocol that is about to restart.
*/
2582
2583 class ElementDistributor: public CkLocIterator{
2584         CkLocMgr *locMgr;
2585         int *targetPE;
2586         void pupLocation(CkLocation &loc,PUP::er &p){
2587                 CkArrayIndexMax idx=loc.getIndex();
2588                 CkGroupID gID = locMgr->ckGetGroupID();
2589                 p|gID;      // store loc mgr's GID as well for easier restore
2590                 p|idx;
2591                 p|loc;
2592         };
2593         public:
2594                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2595                 void addLocation(CkLocation &loc){
2596                         if(*targetPE == CkMyPe()){
2597                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2598                                 return;
2599                         }
2600                         
2601                         CkArrayIndexMax idx=loc.getIndex();
2602                         CkLocRec_local *rec = loc.getLocalRecord();
2603                         
2604                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2605                         idx.print();
2606                         
2607
2608                         //TODO: an element that is being moved should leave some trace behind so that
2609                         // the arraybroadcaster can forward messages to it
2610                         
2611                         //pack up this location and send it across
2612                         PUP::sizer psizer;
2613                         pupLocation(loc,psizer);
2614                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2615                         char *msg = (char *)CmiAlloc(totalSize);
2616                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2617                         PUP::toMem pmem(buf);
2618                         pmem.becomeDeleting();
2619                         pupLocation(loc,pmem);
2620                         
2621                         locMgr->setDuringMigration(CmiTrue);                    
2622                         delete rec;
2623                         locMgr->setDuringMigration(CmiFalse);                   
2624                         locMgr->inform(idx,*targetPE);
2625
2626                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
2627                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
2628
2629                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2630                         //decide on the target processor for the next object
2631                         *targetPE = (*targetPE +1)%CkNumPes();
2632                 }
2633                 
2634 };
2635
2636 void distributeRestartedObjects(){
2637         int numGroups = CkpvAccess(_groupIDTable)->size();      
2638         int i;
2639         int targetPE=CkMyPe();
2640         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2641 };
2642
2643 void _distributedLocationHandler(char *receivedMsg){
2644         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2645         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2646         PUP::fromMem pmem(buf);
2647         CkGroupID gID;
2648         CkArrayIndexMax idx;
2649         pmem |gID;
2650         pmem |idx;
2651         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2652         donotCountMigration=1;
2653         mgr->resume(idx,pmem);
2654         donotCountMigration=0;
2655         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2656         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2657         idx.print();
2658
2659         CkLocRec *rec = mgr->elementRec(idx);
2660         CmiAssert(rec->type() == CkLocRec::local);
2661         
2662         CkVec<CkMigratable *> eltList;
2663         mgr->migratableList((CkLocRec_local *)rec,eltList);
2664         for(int i=0;i<eltList.size();i++){
2665                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2666                         CpvAccess(_currentObj) = eltList[i];
2667                         eltList[i]->ResumeFromSync();
2668                 }
2669         }
2670         
2671         
2672 }
2673
2674
2675 /** this method is used to send messages to a restarted processor to tell
2676  * it that a particular expected object is not going to get to it */
2677 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2678         DummyMigrationMsg buf;
2679         buf.flag = MLOG_OBJECT;
2680         buf.lbID = lbID;
2681         buf.mgrID = locMgrID;
2682         buf.idx = idx;
2683         buf.locationPE = locationPE;
2684         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2685         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2686 };
2687
2688
2689 /**this method is used by a restarted processor to tell other processors
2690  * that they are not going to receive these many objects.. just the count
2691  * not the objects themselves ***/
2692
2693 void sendDummyMigrationCounts(int *dummyCounts){
2694         DummyMigrationMsg buf;
2695         buf.flag = MLOG_COUNT;
2696         buf.lbID = globalLBID;
2697         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2698         for(int i=0;i<CmiNumPes();i++){
2699                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2700                         buf.count = dummyCounts[i];
2701                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2702                 }
2703         }
2704 }
2705
2706
2707 /** this handler is used to process a dummy migration msg.
2708  * it looks up the load balancer and calls migrated for it */
2709
2710 void _dummyMigrationHandler(DummyMigrationMsg *msg){
2711         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2712         if(msg->flag == MLOG_OBJECT){
2713                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
2714                 LDObjHandle h;
2715                 lb->Migrated(h,1);
2716         }
2717         if(msg->flag == MLOG_COUNT){
2718                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
2719                 msg->count -= verifyAckedRequests;
2720                 for(int i=0;i<msg->count;i++){
2721                         LDObjHandle h;
2722                         lb->Migrated(h,1);
2723                 }
2724         }
2725         verifyAckedRequests=0;
2726         CmiFree(msg);
2727 };
2728
2729
2730
2731
2732
2733
2734
/*****************************************************
	Helper that applies a given function to the
	ChareMlogData of every chare that currently lives
	on this processor
******************************************************/
2740
2741
2742 class ElementCaller :  public CkLocIterator {
2743 private:
2744         CkLocMgr *locMgr;
2745         MlogFn fnPointer;
2746         void *data;
2747 public:
2748         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
2749                 locMgr = _locMgr;
2750                 fnPointer = _fnPointer;
2751                 data = _data;
2752         };
2753         void addLocation(CkLocation &loc){
2754                 CkVec<CkMigratable *> list;
2755                 CkLocRec_local *local = loc.getLocalRecord();
2756                 locMgr->migratableList (local,list);
2757                 for(int i=0;i<list.size();i++){
2758                         CkMigratable *migratableElement = list[i];
2759                         fnPointer(data,migratableElement->mlogData);
2760                 }
2761         }
2762 };
2763
2764 void forAllCharesDo(MlogFn fnPointer,void *data){
2765         int numGroups = CkpvAccess(_groupIDTable)->size();
2766         for(int i=0;i<numGroups;i++){
2767                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
2768                 fnPointer(data,obj->mlogData);
2769         }
2770         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
2771         for(int i=0;i<numNodeGroups;i++){
2772                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
2773                 fnPointer(data,obj->mlogData);
2774         }
2775         int i;
2776         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
2777         
2778         
2779 };
2780
2781
2782 /******************************************************************
2783  Load Balancing
2784 ******************************************************************/
2785
2786 void initMlogLBStep(CkGroupID gid){
2787         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
2788         countLBMigratedAway = 0;
2789         countLBToMigrate=0;
2790         onGoingLoadBalancing=1;
2791         migrationDoneCalled=0;
2792         checkpointBarrierCount=0;
2793         if(globalLBID.idx != 0){
2794                 CmiAssert(globalLBID.idx == gid.idx);
2795         }
2796         globalLBID = gid;
2797 }
2798
2799 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
2800         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
2801         
2802         resumeLbFnPtr = _fnPtr;
2803         centralLb = _centralLb;
2804         migrationDoneCalled = 1;
2805         if(countLBToMigrate == countLBMigratedAway){
2806                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2807                 startMlogCheckpoint(NULL,CmiWallTimer());       
2808         }
2809 };
2810
2811 void finishedCheckpointLoadBalancing(){
2812         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
2813         CheckpointBarrierMsg msg;
2814         msg.fromPE = CmiMyPe();
2815         msg.checkpointCount = checkpointCount;
2816
2817         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
2818         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
2819         
2820 };
2821
2822
2823 void sendMlogLocation(int targetPE,envelope *env){
2824         void *_msg = EnvToUsr(env);
2825         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2826
2827
2828         int existing = 0;
2829         //if this object is already in the retainedobjectlust destined for this
2830         //processor it should not be sent
2831         
2832         for(int i=0;i<retainedObjectList.size();i++){
2833                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
2834                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
2835                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
2836                         existing = 1;
2837                         break;
2838                 }
2839         }
2840
2841         if(existing){
2842                 return;
2843         }
2844         
2845         
2846         countLBToMigrate++;
2847         
2848         MigrationNotice migMsg;
2849         migMsg.migRecord.gID = msg->gid;
2850         migMsg.migRecord.idx = msg->idx;
2851         migMsg.migRecord.fromPE = CkMyPe();
2852         migMsg.migRecord.toPE =  targetPE;
2853         
2854         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
2855         
2856         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
2857         retainedObject->migRecord = migMsg.migRecord;
2858         retainedObject->acked  = 0;
2859         
2860         CkPackMessage(&env);
2861         
2862         migMsg.record = retainedObject;
2863         retainedObject->msg = env;
2864         int size = retainedObject->size = env->getTotalsize();
2865         
2866         retainedObjectList.push_back(retainedObject);
2867         
2868         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2869         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2870         
2871         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
2872
2873 }
2874
2875 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
2876         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
2877         migratedNoticeList.push_back(msg->migRecord);
2878
2879         MigrationNoticeAck buf;
2880         buf.record = msg->record;
2881         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
2882         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
2883 }
2884
2885 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
2886         
2887         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
2888         retainedObject->acked = 1;
2889
2890         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
2891         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
2892
2893         //inform home about the new location of this object
2894         CkGroupID gID = retainedObject->migRecord.gID ;
2895         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2896         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
2897         
2898         countLBMigratedAway++;
2899         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
2900                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
2901                 startMlogCheckpoint(NULL,CmiWallTimer());
2902         }
2903 };
2904
2905 void _receiveMlogLocationHandler(void *buf){
2906         envelope *env = (envelope *)buf;
2907         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
2908         CkUnpackMessage(&env);
2909         void *_msg = EnvToUsr(env);
2910         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
2911         CkGroupID gID= msg->gid;
2912         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
2913         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2914         CpvAccess(_currentObj)=mgr;
2915         mgr->immigrate(msg);
2916 };
2917
2918
2919 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
2920 /*      if(mlogData->objID.type == TypeArray){
2921                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
2922         //      TODO: make sure later that atSync has been called and it needs 
2923         //      to be resumed from sync
2924         //
2925                 CpvAccess(_currentObj) = elt;
2926                 elt->ResumeFromSync();
2927         }*/
2928 }
2929
2930 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
2931         if(checkpointBarrierCount == CmiNumPes()){
2932                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
2933                 for(int i=0;i<CmiNumPes();i++){
2934                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
2935                 }
2936         }
2937 }
2938
2939 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
2940         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
2941         if(msg->checkpointCount == checkpointCount){
2942                 checkpointBarrierCount++;
2943                 checkAndSendCheckpointBarrierAcks(msg);
2944         }else{
2945                 if(msg->checkpointCount-1 == checkpointCount){
2946                         checkpointBarrierCount++;
2947                         checkAndSendCheckpointBarrierAcks(msg);
2948                 }else{
2949                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
2950                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
2951                 }
2952         }
2953         CmiFree(msg);
2954 }
2955
2956 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
2957         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
2958         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
2959         sendRemoveLogRequests();
2960         (*resumeLbFnPtr)(centralLb);
2961         CmiFree(msg);
2962 }
2963
2964 /**
2965         method that informs an array elements home processor of its current location
2966         It is a converse method to bypass the charm++ message logging framework
2967 */
2968
2969 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
2970         double _startTime = CmiWallTimer();
2971         CurrentLocationMsg msg;
2972         msg.mgrID = locMgrID;
2973         msg.idx = idx;
2974         msg.locationPE = currentPE;
2975         msg.fromPE = CkMyPe();
2976
2977         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
2978         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
2979         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
2980         traceUserBracketEvent(37,_startTime,CmiWallTimer());
2981 }
2982
2983
2984 void _receiveLocationHandler(CurrentLocationMsg *data){
2985         double _startTime = CmiWallTimer();
2986         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
2987         if(mgr == NULL){
2988                 CmiFree(data);
2989                 return;
2990         }
2991         CkLocRec *rec = mgr->elementNrec(data->idx);
2992         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
2993         if(rec != NULL){
2994                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
2995                         if(data->fromPE == data->locationPE){
2996                                 CmiAbort("Another processor has the same object");
2997                         }
2998                 }
2999         }
3000         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3001                 int targetPE = data->fromPE;
3002                 data->fromPE = CmiMyPe();
3003                 data->locationPE = CmiMyPe();
3004                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3005                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3006         }else{
3007                 mgr->inform(data->idx,data->locationPE);
3008         }
3009         CmiFree(data);
3010         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3011 }
3012
3013
3014
3015 void getGlobalStep(CkGroupID gID){
3016         LBStepMsg msg;
3017         int destPE = 0;
3018         msg.lbID = gID;
3019         msg.fromPE = CmiMyPe();
3020         msg.step = -1;
3021         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3022         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3023 };
3024
3025 void _getGlobalStepHandler(LBStepMsg *msg){
3026         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3027         msg->step = lb->step();
3028         CmiAssert(msg->fromPE != CmiMyPe());
3029         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3030         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3031         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3032 };
3033
3034 void _recvGlobalStepHandler(LBStepMsg *msg){
3035         
3036         restartDecisionNumber=msg->step;
3037         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3038         _updateHomeAckHandler(dummyAck);
3039 };
3040
3041
3042
3043
3044
3045
3046
3047
3048 void _messageLoggingExit(){
3049 /*      if(CkMyPe() == 0){
3050                 if(countBuffered != 0){
3051                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3052                 }
3053
3054 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3055         }
3056         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3057         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3058
3059         //GML: printing some statistics for group approach
3060         if(GROUP_SIZE_MLOG > 1)
3061                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3062
3063 }
3064 /**********************************
3065         * The methods of the message logging
3066         * data structure stored in each chare
3067         ********************************/
3068
3069 MCount ChareMlogData::nextSN(const CkObjID &recver){
3070 /*      MCount SN = snTable.get(recver);
3071         snTable.put(recver) = SN+1;
3072         return SN+1;*/
3073         double _startTime = CmiWallTimer();
3074         MCount *SN = snTable.getPointer(recver);
3075         if(SN==NULL){
3076                 snTable.put(recver) = 1;
3077                 return 1;
3078         }else{
3079                 (*SN)++;
3080                 return *SN;
3081         }
3082 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3083 };
3084
3085
3086 MCount ChareMlogData::newTN(){
3087         MCount TN;
3088         if(currentHoles > 0){
3089                 int holeidx = numberHoles-currentHoles;
3090                 TN = ticketHoles[holeidx];
3091                 currentHoles--;
3092                 if(currentHoles == 0){
3093                         delete []ticketHoles;
3094                         numberHoles = 0;
3095                 }
3096         }else{
3097                 TN = ++tCount;
3098         }       
3099         return TN;
3100 };
3101
3102 Ticket ChareMlogData::next_ticket(CkObjID &sender,MCount SN){
3103         char senderName[100];
3104         char recverName[100];
3105         double _startTime =CmiWallTimer();
3106         Ticket ticket;
3107         if(restartFlag){
3108                 ticket.TN = 0;
3109                 return ticket;
3110         }
3111 /*      SNToTicket &ticketRow = ticketTable.put(sender);
3112         Ticket earlierTicket = ticketRow.get(SN);
3113         if(earlierTicket.TN == 0){
3114                 //This SN has not been ever alloted a ticket
3115                 ticket.TN = newTN();
3116                 ticketRow.put(SN)=ticket;
3117         }else{
3118                 ticket.TN = earlierTicket.TN;
3119         }*/
3120         
3121
3122         SNToTicket *ticketRow = ticketTable.get(sender);
3123         if(ticketRow != NULL){
3124                 Ticket earlierTicket = ticketRow->get(SN);
3125                 if(earlierTicket.TN == 0){
3126                         ticket.TN = newTN();
3127                         ticketRow->put(SN) = ticket;
3128                         DEBUG(CkAssert((ticketRow->get(SN)).TN == ticket.TN));
3129                 }else{
3130                         ticket.TN = earlierTicket.TN;
3131                         if(ticket.TN > tCount){
3132                                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3133                         }
3134                                 CmiAssert(ticket.TN <= tCount);
3135                 }
3136                 DEBUG(CmiPrintf("[%d] next_ticket old row ticket sender %s recver %s SN %d TN %d tCount %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN,tCount));
3137         }else{
3138                 SNToTicket *newRow = new SNToTicket;            
3139                 ticket.TN = newTN();
3140                 newRow->put(SN) = ticket;
3141                 ticketTable.put(sender) = newRow;
3142                 DEBUG(printf("[%d] next_ticket new row ticket sender %s recver %s SN %d TN %d\n",CkMyPe(),sender.toString(senderName),objID.toString(recverName),SN,ticket.TN));
3143         }
3144 /*TODO: check if the message for this SN has already been received
3145         in the table of received SNs 
3146         If it was received before the last checkpoint mark it as old
3147         other wise received
3148         */
3149         ticket.state = NEW_TICKET;
3150 //      traceUserBracketEvent(34,_startTime,CkWallTimer());
3151         return ticket;  
3152 };
3153
3154 void ChareMlogData::addLogEntry(MlogEntry *entry){
3155         char nameString[100];
3156         DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
3157                 DEBUG(CmiMemoryCheck());
3158         mlog.enq(entry);
3159 };
3160
3161 double totalSearchRestoredTime=0;
3162 double totalSearchRestoredCount=0;
3163
3164
3165 MCount ChareMlogData::searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN){
3166         double start= CkWallTimer();
3167         MCount TN=0;    
3168         if(mapTable.numObjects() > 0){
3169                 RestoredLocalMap *map = mapTable.get(sender);
3170                 if(map){
3171                         int index = SN - map->minSN;
3172                         if(index < map->count){
3173                                 TN = map->TNArray[index];
3174                         }
3175                 }
3176         /*      if(i > 0){
3177                         printf("i %d restoredLocalMsgLog.size() %d\n",i,restoredLocalMsgLog.size());
3178                 }*/
3179         }
3180         
3181         char senderName[100],recverName[100];
3182         
3183         if(TN != 0){
3184                 DEBUG(CmiPrintf("[%d] searchRestoredLocalQ found match sender %s recver %s SN %d TN %d\n",CmiMyPe(),sender.toString(senderName),recver.toString(recverName),SN,TN));
3185         }
3186         totalSearchRestoredTime += CkWallTimer()-start;
3187         totalSearchRestoredCount ++;
3188         return TN;
3189 }
3190
3191 void ChareMlogData::addToRestoredLocalQ(LocalMessageLog *logEntry){
3192         restoredLocalMsgLog.push_back(*logEntry);
3193 }
3194
3195 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData){
3196         mlogData->sortRestoredLocalMsgLog();
3197 }
3198
3199 void ChareMlogData::sortRestoredLocalMsgLog(){
3200         //sort it ->its bloddy bubble sort
3201         
3202         for(int i=0;i<restoredLocalMsgLog.size();i++){
3203                 LocalMessageLog &logEntry = restoredLocalMsgLog[i];
3204                 RestoredLocalMap *map = mapTable.get(logEntry.sender);
3205                 if(map == NULL){
3206                         map = new RestoredLocalMap;
3207                         mapTable.put(logEntry.sender)=map;
3208                 }
3209                 map->count++;
3210                 if(map->minSN == 0){
3211                         map->minSN = logEntry.SN;
3212                 }else{
3213                         if(logEntry.SN < map->minSN){
3214                                 map->minSN = logEntry.SN;
3215                         }
3216                 }
3217                 if(logEntry.SN > map->maxSN){
3218                         map->maxSN = logEntry.SN;
3219                 }
3220
3221         }
3222         for(int i=0;i< restoredLocalMsgLog.size();i++){
3223                 LocalMessageLog &logEntry = restoredLocalMsgLog[i];
3224                 RestoredLocalMap *map = mapTable.get(logEntry.sender);
3225                 CkAssert(map != NULL);
3226                 if(map->TNArray == NULL){
3227                         map->TNArray = new MCount[map->maxSN-map->minSN+1];                     
3228                         //HERE: erase from here
3229                         printf("map->count:%d\n",map->count);
3230                         printf("map->maxSN:%d\n",map->maxSN);
3231                         printf("map->minSN:%d\n",map->minSN);
3232                         //HERE: to here
3233
3234                         CkAssert(map->count == map->maxSN-map->minSN+1);
3235                         map->count = 0;
3236                 }
3237                 map->TNArray[map->count] = logEntry.TN;
3238                 map->count++;
3239         }
3240         restoredLocalMsgLog.free();
3241 }
3242
3243
3244 /**
3245  * Pup method for the metadata.
3246  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
3247  * Then, we only support one failure at a time. Read Sayantan's thesis, sections 4.2 and 4.3 for more details.
3248  */
3249 void ChareMlogData::pup(PUP::er &p){
3250         int startSize=0;
3251         char nameStr[100];
3252         if(p.isSizing()){
3253                 PUP::sizer *sizep = (PUP::sizer *)&p;
3254                 startSize = sizep->size();
3255         }
3256         double _startTime = CkWallTimer();
3257         
3258         p | objID;
3259         p | tCount;
3260         p | tProcessed;
3261         if(p.isUnpacking()){
3262                 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
3263         }
3264         p | toResumeOrNot;
3265         p | resumeCount;
3266         DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
3267         
3268
3269         /*pack the receivedTN vector*/
3270         int lengthReceivedTNs;
3271         if(!p.isUnpacking()){
3272                 if(receivedTNs == NULL){
3273                         lengthReceivedTNs = -1;
3274                 }else{
3275                         lengthReceivedTNs = receivedTNs->size();                
3276                 }
3277         }
3278         p | lengthReceivedTNs;
3279         if(p.isUnpacking()){
3280                 if(lengthReceivedTNs == -1){
3281                         receivedTNs = NULL;
3282                 }else{
3283                         receivedTNs = new CkVec<MCount>;
3284                         for(int i=0;i<lengthReceivedTNs;i++){
3285                                 MCount tempTicket;
3286                                 p | tempTicket;
3287                                 CkAssert(tempTicket > 0);
3288                                 receivedTNs->push_back(tempTicket);
3289                         }
3290                 }
3291         }else{
3292                 for(int i=0;i<lengthReceivedTNs;i++){
3293                         p | (*receivedTNs)[i];
3294                 }
3295         }
3296         
3297         
3298         p | currentHoles;
3299         p | numberHoles;
3300         if(p.isUnpacking()){
3301                 if(numberHoles > 0){
3302                         ticketHoles = new MCount[numberHoles];                  
3303                 }else{
3304                         ticketHoles = NULL;
3305                 }
3306         }
3307         if(numberHoles > 0){
3308                 p(ticketHoles,numberHoles);
3309         }
3310         
3311         snTable.pup(p);
3312
3313         // pupping only the unacked local messages in the message log
3314         int length = 0;
3315         MlogEntry *entry;
3316         if(!p.isUnpacking()){
3317                 for(int i=0; i<mlog.length(); i++){
3318                         entry = mlog[i];
3319                         if(entry->unackedLocal)
3320                                 length++;
3321                 }
3322         }
3323         p | length;
3324         if(p.isUnpacking()){
3325                 for(int i=0; i<length; i++){
3326                         entry = new MlogEntry();
3327                         mlog.enq(entry);
3328                         entry->pup(p);
3329                 }
3330         }else{
3331                 for(int i=0; i<mlog.length(); i++){
3332                         entry = mlog[i];
3333                         if(entry->unackedLocal){
3334                                 entry->pup(p);
3335                         }
3336                 }
3337         }
3338
3339 /*      int length;
3340         if(!p.isUnpacking()){           
3341                 length = mlog.length(); 
3342                 if(length > 0)
3343                         DEBUG(printf("[%d] Mlog length %d \n",CkMyPe(),length));
3344         }
3345         p | length;
3346         for(int i=0;i<length;i++){
3347                 MlogEntry *entry;
3348                 if(p.isUnpacking()){
3349                         entry = new MlogEntry();
3350                         mlog.enq(entry);
3351                 }else{
3352                         entry = mlog[i];
3353                 }
3354                 entry->pup(p);
3355         }*/
3356         
3357         p | restoredLocalMsgLog;
3358         p | resendReplyRecvd;
3359         p | restartFlag;
3360
3361         // pup the mapTable
3362         int tableSize;
3363         if(!p.isUnpacking()){
3364                 tableSize = mapTable.numObjects();
3365         }
3366         p | tableSize;
3367         if(!p.isUnpacking()){
3368                 CkHashtableIterator *iter = mapTable.iterator();
3369                 while(iter->hasNext()){
3370                         CkObjID *objID;
3371                         RestoredLocalMap **map = (RestoredLocalMap **) iter->next((void **)&objID);
3372                         p | (*objID);
3373                         (*map)->pup(p);
3374                 }
3375         }else{
3376                 for(int i=0;i<tableSize;i++){
3377                         CkObjID objID;
3378                         p | objID;
3379                         RestoredLocalMap *map = new RestoredLocalMap;
3380                         map->pup(p);
3381                         mapTable.put(objID) = map;
3382                 }
3383         }
3384
3385         //pup the ticketTable
3386         {
3387                 int ticketTableSize;
3388                 if(!p.isUnpacking()){
3389                         ticketTableSize = ticketTable.numObjects();
3390                 }
3391                 p | ticketTableSize;
3392                 if(!p.isUnpacking()){
3393                         CkHashtableIterator *iter = ticketTable.iterator();
3394                         while(iter->hasNext()){
3395                                 CkObjID *objID;
3396                                 SNToTicket **ticketRow = (SNToTicket **)iter->next((void **)&objID);
3397                                 p | (*objID);
3398                                 (*ticketRow)->pup(p);
3399                         }
3400                 }else{
3401                         for(int i=0;i<ticketTableSize;i++){
3402                                 CkObjID objID;
3403                                 p | objID;
3404                                 SNToTicket *ticketRow = new SNToTicket;
3405                                 ticketRow->pup(p);
3406                                 ticketTable.put(objID) = ticketRow;
3407                         }
3408                 }
3409         }
3410
3411
3412         
3413         
3414         if(p.isSizing()){
3415                 char name[40];
3416                 PUP::sizer *sizep = (PUP::sizer *)&p;
3417                 int pupSize = sizep->size()-startSize;
3418                 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
3419         //      CkAssert(pupSize <100000000);
3420         }
3421         
3422         double _finTime = CkWallTimer();
3423         DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
3424 };
3425
3426
3427 /**
3428         The method for returning the actual object pointed to by an id
3429         If the object doesnot exist on the processor it returns NULL
3430 **/
3431
3432 void* CkObjID::getObject(){
3433         
3434                 switch(type){
3435                         case TypeChare:
3436         
3437                                 return CkLocalChare(&data.chare.id);
3438                         case TypeGroup:
3439         
3440                                 CkAssert(data.group.onPE == CkMyPe());
3441                                 return CkLocalBranch(data.group.id);
3442                         case TypeNodeGroup:
3443                                 CkAssert(data.group.onPE == CkMyNode());
3444                                 //CkLocalNodeBranch(data.group.id);
3445                                 {
3446                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3447                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3448                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3449         
3450                                         return retval;
3451                                 }       
3452                         case TypeArray:
3453                                 {
3454         
3455         
3456                                         CkArrayID aid(data.array.id);
3457         
3458                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3459         
3460                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
3461         
3462                                         return aProxy.ckLocal();
3463                                 }
3464                         default:
3465                                 CkAssert(0);
3466                 }
3467 }
3468
3469
3470 int CkObjID::guessPE(){
3471                 switch(type){
3472                         case TypeChare:
3473                                 return data.chare.id.onPE;
3474                         case TypeGroup:
3475                         case TypeNodeGroup:
3476                                 return data.group.onPE;
3477                         case TypeArray:
3478                                 {
3479                                         CkArrayID aid(data.array.id);
3480                                         if(aid.ckLocalBranch() == NULL){
3481                                                 return -1;
3482                                         }
3483                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
3484                                 }
3485                         default:
3486                                 CkAssert(0);
3487                 }
3488 };
3489
3490 char *CkObjID::toString(char *buf) const {
3491         
3492         switch(type){
3493                 case TypeChare:
3494                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3495                         break;
3496                 case TypeGroup:
3497                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
3498                         break;
3499                 case TypeNodeGroup:
3500                         sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
3501                         break;
3502                 case TypeArray:
3503                         {
3504                                 const CkArrayIndexMax &idx = data.array.idx.asMax();
3505                                 const int *indexData = idx.data();
3506                                 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
3507                                 break;
3508                         }
3509                 default:
3510                         CkAssert(0);
3511         }
3512         
3513         return buf;
3514 };
3515
3516 void CkObjID::updatePosition(int PE){
3517         if(guessPE() == PE){
3518                 return;
3519