Merge remote branch 'origin/charmrun' into charmrun
[charm.git] / src / ck-core / ckmessagelogging.C
1 /**
2  * Message Logging Fault Tolerance Protocol
3  * It includes the main functions for the basic and team-based schemes.
4  */
5
6 #include "charm.h"
7 #include "ck.h"
8 #include "ckmessagelogging.h"
9 #include "queueing.h"
10 #include <sys/types.h>
11 #include <signal.h>
12 #include "CentralLB.h"
13
14 #ifdef _FAULT_MLOG_
15
// Trace switches. Each DEBUG* macro below has a "//x" body; that "//" starts a
// comment which is stripped before the macro is defined, so every macro
// expands to nothing and the wrapped trace statements are compiled out.
// To enable one category, replace "//x" by "x" (the commented-out DEBUG
// variant on the next line would trace only while _restartFlag is set).
16 //#define DEBUG(x)  if(_restartFlag) {x;}
17 #define DEBUG_MEM(x) //x
18 #define DEBUG(x)  //x
19 #define DEBUGRESTART(x)  //x
20 #define DEBUGLB(x) // x
21 #define DEBUG_TEAM(x)  // x
22
// When defined, local-message metadata (sendLocalMessageCopy) and remote
// ticket requests (sendTicketRequest) are batched per destination instead of
// being sent one at a time; see _maxBufferedMessages,
// _maxBufferedTicketRequests and BUFFER_TIME.
23 #define BUFFERED_LOCAL
24 #define BUFFERED_REMOTE 
25
// idx2str overloads defined in another translation unit; the CkArrayIndexMax
// overload right below forwards to the CkArrayIndex one.
26 extern const char *idx2str(const CkArrayIndex &ind);
27 extern const char *idx2str(const ArrayElement *el);
28 const char *idx2str(const CkArrayIndexMax &ind){
29         return idx2str((const CkArrayIndex &)ind);
30 };
31
// Forward declarations for functions used before (or outside) their definitions.
32 void getGlobalStep(CkGroupID gID);
33
34 bool fault_aware(CkObjID &recver);
35 void sendCheckpointData(int mode);
36 void createObjIDList(void *data,ChareMlogData *mlogData);
// Routing predicates used by generateCommonTicketRequest; defined further below.
37 inline bool isLocal(int destPE);
38 inline bool isTeamLocal(int destPE);
39
40 int _restartFlag=0;
41 //ERASE int restarted=0; // it's not being used anywhere
42
43 //TML: variables for measuring savings with teams in message logging
// sendTicketRequest adds 1 to MLOGFT_totalMessages and the envelope's total
// size to MLOGFT_totalLogSize for every remote message it stores in a log.
44 float MLOGFT_totalLogSize = 0.0;
45 float MLOGFT_totalMessages = 0.0;
46 float MLOGFT_totalObjects = 0.0;
47
48 //TODO: remove for perf runs
49 int countHashRefs=0; //count the number of gets
50 int countHashCollisions=0;
51
52 //#define CHECKPOINT_DISK
// NOTE(review): a string literal is bound to a non-const char* here, which is
// deprecated in C++03 and ill-formed in C++11. Switch to const char* once every
// declaration and user of checkpointDirectory agrees.
53 char *checkpointDirectory=".";
54 int unAckedCheckpoint=0;
55
// Statistics:
// - countLocal: local-message records queued by sendLocalMessageCopy.
// - countBuffered: batched flushes done by sendBufferedLocalMessageCopy.
// - countClearBufferedLocalCalls: flush timers armed by sendLocalMessageCopy,
//   which arms at most 10 of them.
56 int countLocal=0,countBuffered=0;
57 int countPiggy=0;
58 int countClearBufferedLocalCalls=0;
59
60 int countUpdateHomeAcks=0;
61
// Defined elsewhere. teamSize is the number of consecutive PEs per team, as
// used by isTeamLocal().
62 extern int teamSize;
63 extern int chkptPeriod;
64 extern bool parallelRestart;
65
// Fault injection: readKillFile() parses the file named by killFile and stores
// in killTime the wall-clock time at which killLocal() SIGKILLs this process.
66 char *killFile;
67 int killFlag=0;
68 int restartingMlogFlag=0;
69 void readKillFile();
70 double killTime=0.0;
71 int checkpointCount=0;
72
73
// Per-PE (Cpv) state of the message-logging protocol, set up in _messageLoggingInit().
// _currentObj: the object on whose behalf messages are being sent.
// generateCommonTicketRequest uses its mlogData->objID as the sender and skips
// logging entirely when it is NULL.
74 CpvDeclare(Chare *,_currentObj);
75 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
76 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
77 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
// Local messages that could not obtain a ticket yet (see ticketLogLocalMessage).
78 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
79 CpvDeclare(Queue, _outOfOrderMessageQueue);
// Local-message metadata waiting to be sent to the buddy PE (BUFFERED_LOCAL).
80 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
81 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
// BUFFERED_REMOTE: one raw buffer per destination PE (NULL until first used).
// Each holds a BufferedTicketRequestHeader followed by up to
// _maxBufferedTicketRequests TicketRequest records. The matching per-PE fill
// counts are in _numBufferedTicketRequests.
82 CpvDeclare(char **,_bufferedTicketRequests);
83 CpvDeclare(int *,_numBufferedTicketRequests);
// Allocated with room for a BufferedTicketRequestHeader plus
// _maxBufferedTicketRequests TicketReply records.
84 CpvDeclare(char *,_bufferTicketReply);
85
86
87
88 static double adjustChkptPeriod=0.0; //in ms
89 static double nextCheckpointTime=0.0;//in seconds
90
// Wall time at which sendLocalMessageCopy last armed a buffered-local flush timer.
91 double lastBufferedLocalMessageCopyTime;
92
// Batch sizes for BUFFERED_LOCAL / BUFFERED_REMOTE. They are not set here
// (zero-initialized); presumably assigned before _messageLoggingInit() — TODO confirm.
93 int _maxBufferedMessages;
94 int _maxBufferedTicketRequests;
// Delay of the timers that flush partially filled buffers.
95 int BUFFER_TIME=2; // in ms
96
97
// Converse handler indices; the ones registered in this file are assigned by
// CkRegisterHandler in _messageLoggingInit().
98 int _ticketRequestHandlerIdx;
99 int _ticketHandlerIdx;
100 int _localMessageCopyHandlerIdx;
101 int _localMessageAckHandlerIdx;
102 int _pingHandlerIdx;
103 int _bufferedLocalMessageCopyHandlerIdx;
104 int _bufferedLocalMessageAckHandlerIdx;
105 int _bufferedTicketRequestHandlerIdx;
106 int _bufferedTicketHandlerIdx;
107
108
109 char objString[100];
110 int _checkpointRequestHandlerIdx;
111 int _storeCheckpointHandlerIdx;
112 int _checkpointAckHandlerIdx;
113 int _getCheckpointHandlerIdx;
114 int _recvCheckpointHandlerIdx;
115 int _removeProcessedLogHandlerIdx;
116
117 int _verifyAckRequestHandlerIdx;
118 int _verifyAckHandlerIdx;
119 int _dummyMigrationHandlerIdx;
120
121
122 int     _getGlobalStepHandlerIdx;
123 int     _recvGlobalStepHandlerIdx;
124
125 int _updateHomeRequestHandlerIdx;
126 int _updateHomeAckHandlerIdx;
127 int _resendMessagesHandlerIdx;
128 int _resendReplyHandlerIdx;
129 int _receivedTNDataHandlerIdx;
130 int _distributedLocationHandlerIdx;
131
132 //TML: handler indices (assigned at startup, not constants) for team-based message logging
133 int _restartHandlerIdx;
134 int _getRestartCheckpointHandlerIdx;
135 int _recvRestartCheckpointHandlerIdx;
136 void setTeamRecovery(void *data, ChareMlogData *mlogData);
137 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
138
139 int verifyAckTotal;
140 int verifyAckCount;
141
142 int verifyAckedRequests=0;
143
144 RestartRequest *storedRequest;
145
146
147
148 int _falseRestart =0; /**
	For testing on clusters, a restart may be carried out on a processor
	without actually starting it.
	1 -> false restart
	0 -> restart after an actual crash
*/
154
155 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
// ticketLogLocalMessage resets it to 0 on every path except the one that
// defers the request to _delayedLocalTicketRequests.
156 int _lockNewTicket=0;
157
158
159 //Load balancing globals
160 int onGoingLoadBalancing=0;
161 void *centralLb;
162 void (*resumeLbFnPtr)(void *);
163 int _receiveMlogLocationHandlerIdx;
164 int _receiveMigrationNoticeHandlerIdx;
165 int _receiveMigrationNoticeAckHandlerIdx;
166 int _checkpointBarrierHandlerIdx;
167 int _checkpointBarrierAckHandlerIdx;
168
169 CkVec<MigrationRecord> migratedNoticeList;
170 CkVec<RetainedMigratedObject *> retainedObjectList;
171 int donotCountMigration=0;
172 int countLBMigratedAway=0;
173 int countLBToMigrate=0;
174 int migrationDoneCalled=0;
175 int checkpointBarrierCount=0;
176 int globalResumeCount=0;
177 CkGroupID globalLBID;
178 int restartDecisionNumber=-1;
179
180
// Both are set to CmiWallTimer() by _messageLoggingInit().
181 double lastCompletedAlarm=0;
182 double lastRestart=0;
183
184
185 //update location globals
186 int _receiveLocationHandlerIdx;
187
188
189
190 // initialize message logging datastructures and register handlers
191 void _messageLoggingInit(){
192         //current object
193         CpvInitialize(Chare *,_currentObj);
194         
195         //registering handlers for message logging
196         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
197         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
198         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
199         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
200         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
201         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
202         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
203         _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
204         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
205
206                 
207         //handlers for checkpointing
208         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
209         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
210         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
211         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
212
213
214         //handlers for restart
215         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
216         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
217         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
218         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
219         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
220         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
221         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
222         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
223         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
224         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
225         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
226
227         //TML: handlers for team-based message logging
228         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
229         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
230         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
231
232         
233         //handlers for load balancing
234         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
235         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
236         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
237         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
238         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
239         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
240         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
241         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
242         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
243
244         
245         //handlers for updating locations
246         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
247         
248         //Cpv variables for message logging
249         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
250         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
251         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
252         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
253         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
254         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
255         CpvInitialize(Queue, _outOfOrderMessageQueue);
256         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
257         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
258         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
259         
260         CpvInitialize(char **,_bufferedTicketRequests);
261         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
262         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
263         for(int i=0;i<CkNumPes();i++){
264                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
265                 CpvAccess(_numBufferedTicketRequests)[i]=0;
266         }
267   CpvInitialize(char *,_bufferTicketReply);
268         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
269         
270 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
271         CcdCallFnAfter(retryTicketRequest,NULL,100);    
272         
273         
274         //Cpv variables for checkpoint
275         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
276         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
277         
278 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
279 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
280 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
281         if(CkMyPe() == 0){
282 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
283 #ifdef  BUFFERED_LOCAL
284                 if(CmiMyPe() == 0){
285                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
286                 }
287 #endif
288         }
289 #ifdef  BUFFERED_REMOTE
290         if(CmiMyPe() == 0){
291                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
292         }
293 #endif
294
295         traceRegisterUserEvent("Remove Logs", 20);
296         traceRegisterUserEvent("Ticket Request Handler", 21);
297         traceRegisterUserEvent("Ticket Handler", 22);
298         traceRegisterUserEvent("Local Message Copy Handler", 23);
299         traceRegisterUserEvent("Local Message Ack Handler", 24);        
300         traceRegisterUserEvent("Preprocess current message",25);
301         traceRegisterUserEvent("Preprocess past message",26);
302         traceRegisterUserEvent("Preprocess future message",27);
303         traceRegisterUserEvent("Checkpoint",28);
304         traceRegisterUserEvent("Checkpoint Store",29);
305         traceRegisterUserEvent("Checkpoint Ack",30);
306         
307         traceRegisterUserEvent("Send Ticket Request",31);
308         traceRegisterUserEvent("Generalticketrequest1",32);
309         traceRegisterUserEvent("TicketLogLocal",33);
310         traceRegisterUserEvent("next_ticket and SN",34);
311         traceRegisterUserEvent("Timeout for buffered remote messages",35);
312         traceRegisterUserEvent("Timeout for buffered local messages",36);
313         traceRegisterUserEvent("Inform Location Home",37);
314         traceRegisterUserEvent("Receive Location Handler",38);
315         
316         lastCompletedAlarm=CmiWallTimer();
317         lastRestart = CmiWallTimer();
318 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
319         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
320 }
321
322 void killLocal(void *_dummy,double curWallTime);        
323
324 void readKillFile(){
325         FILE *fp=fopen(killFile,"r");
326         if(!fp){
327                 return;
328         }
329         int proc;
330         double sec;
331         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
332                 if(proc == CkMyPe()){
333                         killTime = CmiWallTimer()+sec;
334                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
335                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
336                 }
337         }
338         fclose(fp);
339 }
340
341 void killLocal(void *_dummy,double curWallTime){
342         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
343         if(CmiWallTimer()<killTime-1){
344                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
345         }else{  
346                 kill(getpid(),SIGKILL);
347         }
348 }
349
350
351
352 /************************ Message logging methods ****************/
353
354 // send a ticket request to a group on a processor
355 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
356         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
357                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
358                 void *origMsg = EnvToUsr(env);
359                 for(int i=0;i<CmiNumPes();i++){
360                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
361                                 void *copyMsg = CkCopyMsg(&origMsg);
362                                 envelope *copyEnv = UsrToEnv(copyMsg);
363                                 copyEnv->SN=0;
364                                 copyEnv->TN=0;
365                                 copyEnv->sender.type = TypeInvalid;
366                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
367                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
368                         }
369                 }
370                 return;
371         }
372         CkObjID recver;
373         recver.type = TypeGroup;
374         recver.data.group.id = env->getGroupNum();
375         recver.data.group.onPE = destPE;
376 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
377                 CmiPrintStackTrace(0);
378         }*/
379         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
380 }
381
382 //send a ticket request to a nodegroup
383 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
384         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
385                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
386                 void *origMsg = EnvToUsr(env);
387                 for(int i=0;i<CmiNumNodes();i++){
388                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
389                                 void *copyMsg = CkCopyMsg(&origMsg);
390                                 envelope *copyEnv = UsrToEnv(copyMsg);
391                                 copyEnv->SN=0;
392                                 copyEnv->TN=0;
393                                 copyEnv->sender.type = TypeInvalid;
394                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
395                         }
396                 }
397                 return;
398         }
399         CkObjID recver;
400         recver.type = TypeNodeGroup;
401         recver.data.group.id = env->getGroupNum();
402         recver.data.group.onPE = destNode;
403         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
404 }
405
406 //send a ticket request to an array element
407 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
408         CkObjID recver;
409         recver.type = TypeArray;
410         recver.data.array.id = env->getsetArrayMgr();
411         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
412
413         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
414                 char recverString[100],senderString[100];
415                 
416                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
417         }
418
419         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
420 };
421
422 /**
423  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
424  */
425 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
426         envelope *env = _env;
427         MCount ticketNumber = 0;
428         int resend=0; //is it a resend
429         char recverName[100];
430         double _startTime=CkWallTimer();
431         
432         if(CpvAccess(_currentObj) == NULL){
433 //              CkAssert(0);
434                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
435                 generalCldEnqueue(destPE,env,_infoIdx);
436                 return;
437         }
438         
439         if(env->sender.type == TypeInvalid){
440                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
441                 //Set message logging data in the envelope
442         }else{
443                 envelope *copyEnv = copyEnvelope(env);
444                 env = copyEnv;
445                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
446                 env->SN = 0;
447         }
448         
449         CkObjID &sender = env->sender;
450         env->recver = recver;
451
452         Chare *obj = (Chare *)env->sender.getObject();
453           
454         if(env->SN == 0){
455                 env->SN = obj->mlogData->nextSN(recver);
456         }else{
457                 resend = 1;
458         }
459         
460         char senderString[100];
461 //      if(env->SN != 1){
462                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
463         //      CmiPrintStackTrace(0);
464 /*      }else{
465                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
466         }*/
467                 
468         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
469 //      CkPackMessage(&(mEntry->env));
470 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
471         
472         _startTime = CkWallTimer();
473
474         // uses the proper ticketing mechanism for local, group and general messages
475         if(isLocal(destPE)){
476                 ticketLogLocalMessage(mEntry);
477         }else{
478                 if((teamSize > 1) && isTeamLocal(destPE)){
479
480                         // look to see if this message already has a ticket in the team-table
481                         Chare *senderObj = (Chare *)sender.getObject();
482                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
483                         if(ticketRow != NULL){
484                                 Ticket ticket = ticketRow->get(env->SN);
485                                 if(ticket.TN != 0){
486                                         ticketNumber = ticket.TN;
487                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
488                                 }
489                         }
490                 }
491                 
492                 // sending the ticket request
493                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
494                 
495         }
496 }
497
498 /**
499  * Determines if the message is local or not. A message is local if:
500  * 1) Both the destination and origin are the same PE.
501  */
502 inline bool isLocal(int destPE){
503         // both the destination and the origin are the same PE
504         if(destPE == CkMyPe())
505                 return true;
506
507         return false;
508 }
509
510 /**
511  * Determines if the message is group local or not. A message is group local if:
512  * 1) They belong to the same group in the group-based message logging.
513  */
514 inline bool isTeamLocal(int destPE){
515
516         // they belong to the same group
517         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
518                 return true;
519
520         return false;
521 }
522
523
524
525 /**
526  * Method that does the actual send by creating a ticket request filling it up and sending it.
527  */
528 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
529         char recverString[100],senderString[100];
530         envelope *env = entry->env;
531         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
532 /*      envelope *env = entry->env;
533         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
534
535         Chare *obj = (Chare *)entry->env->sender.getObject();
536         if(!resend){
537                 //TML: only stores message if either it goes to this processor or to a processor in a different group
538                 if(!isTeamLocal(entry->destPE)){
539                         obj->mlogData->addLogEntry(entry);
540                         MLOGFT_totalMessages += 1.0;
541                         MLOGFT_totalLogSize += entry->env->getTotalsize();
542                 }else{
543                         // the message has to be deleted after it has been sent
544                         entry->env->freeMsg = true;
545                 }
546         }
547
548 #ifdef BUFFERED_REMOTE
549         //buffer the ticket request 
550         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
551                 //first message to this processor, buffer needs to be created
552                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
553                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
554                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
555         }
556         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
557         //Buffer the ticketrequests
558         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
559         ticketRequest->sender = sender;
560         ticketRequest->recver = recver;
561         ticketRequest->logEntry = entry;
562         ticketRequest->SN = SN;
563         ticketRequest->TN = TN;
564         ticketRequest->senderPE = CkMyPe();
565
566         CpvAccess(_numBufferedTicketRequests)[destPE]++;
567         
568         
569         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
570                 sendBufferedTicketRequests(destPE);
571         }else{
572                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
573                         int *checkPE = new int;
574                         *checkPE = destPE;
575                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
576                 }
577         }
578 #else
579
580         TicketRequest ticketRequest;
581         ticketRequest.sender = sender;
582         ticketRequest.recver = recver;
583         ticketRequest.logEntry = entry;
584         ticketRequest.SN = SN;
585         ticketRequest.TN = TN;
586         ticketRequest.senderPE = CkMyPe();
587         
588         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
589 //      CmiBecomeImmediate(&ticketRequest);
590         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
591 #endif
592         DEBUG_MEM(CmiMemoryCheck());
593 };
594
595 /**
596  * Send the ticket requests buffered for processor PE
597  **/
598 void sendBufferedTicketRequests(int destPE){
599         DEBUG_MEM(CmiMemoryCheck());
600         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
601         if(numberRequests == 0){
602                 return;
603         }
604         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
605         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
606         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
607         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
608         header->numberLogs = numberRequests;
609         
610         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
611         CmiSyncSend(destPE,totalSize,(char *)buf);
612         
613         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
614         DEBUG_MEM(CmiMemoryCheck());
615 };
616
617 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
618         int destPE = *(int *)_destPE;
619   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
620                 sendBufferedTicketRequests(destPE);
621 //              traceUserEvent(35);
622         }
623         delete (int *)_destPE;
624         DEBUG_MEM(CmiMemoryCheck());
625 };
626
627 /**
628  * Gets a ticket for a local message and then sends a copy to the buddy.
629  * This method is always in the main thread(not interrupt).. so it should 
630  * never find itself locked out of a newTicket.
631  */
632 void ticketLogLocalMessage(MlogEntry *entry){
633         double _startTime=CkWallTimer();
634         DEBUG_MEM(CmiMemoryCheck());
635
636         Chare *recverObj = (Chare *)entry->env->recver.getObject();
637         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
638         if(recverObj){
639                 //Consider the case, after a restart when this message has already been allotted a ticket number
640                 // and should get the same one as the old one.
641                 Ticket ticket;
642                 if(recverObj->mlogData->mapTable.numObjects() > 0){
643                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
644                 }else{
645                         ticket.TN = 0;
646                 }
647                 
648                 char senderString[100], recverString[100] ;
649                 
650                 if(ticket.TN == 0){
651                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
652         
653                         if(ticket.TN == 0){
654                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
655                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
656                                 
657         //              _lockNewTicket = 0;
658 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
659                                 return;
660                         }
661                 }       
662                 //TODO: check for the case when an invalid ticket is returned
663                 //TODO: check for OLD or RECEIVED TICKETS
664                 entry->env->TN = ticket.TN;
665                 CkAssert(entry->env->TN > 0);
666                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
667         
668                 // sends a copy of the metadata to the buddy    
669                 sendLocalMessageCopy(entry);
670                 
671                 DEBUG_MEM(CmiMemoryCheck());
672
673                 // sets the unackedLocal flag and stores the message in the log
674                 entry->unackedLocal = 1;
675                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
676
677                 DEBUG_MEM(CmiMemoryCheck());
678         }else{
679                 CkPrintf("[%d] Local message in team-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
680                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
681         }
682         _lockNewTicket=0;
683 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
684 };
685
686 /**
687  * Sends the metadata of a local message to its buddy.
688  */
689 void sendLocalMessageCopy(MlogEntry *entry){
690         LocalMessageLog msgLog;
691         msgLog.sender = entry->env->sender;
692         msgLog.recver = entry->env->recver;
693         msgLog.SN = entry->env->SN;
694         msgLog.TN = entry->env->TN;
695         msgLog.entry = entry;
696         msgLog.senderPE = CkMyPe();
697         
698         char recvString[100];
699         char senderString[100];
700         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
701
702 #ifdef BUFFERED_LOCAL
703         countLocal++;
704         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
705         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
706                 sendBufferedLocalMessageCopy();
707         }else{
708                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
709                         lastBufferedLocalMessageCopyTime = CkWallTimer();
710                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
711                         countClearBufferedLocalCalls++;
712                 }       
713         }
714 #else   
715         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
716         
717         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
718 #endif
719         DEBUG_MEM(CmiMemoryCheck());
720 };
721
722
723 void sendBufferedLocalMessageCopy(){
724         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
725         if(numberLogs == 0){
726                 return;
727         }
728         countBuffered++;
729         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
730         void *buf=CmiAlloc(totalSize);
731         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
732         header->numberLogs=numberLogs;
733
734         DEBUG_MEM(CmiMemoryCheck());
735         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
736         
737         char *ptr = (char *)buf;
738         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
739         
740         for(int i=0;i<numberLogs;i++){
741                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
742                 memcpy(ptr,&log,sizeof(LocalMessageLog));
743                 ptr = &ptr[sizeof(LocalMessageLog)];
744         }
745
746         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
747
748         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
749         DEBUG_MEM(CmiMemoryCheck());
750 };
751
752 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
753         countClearBufferedLocalCalls--;
754         if(countClearBufferedLocalCalls > 10){
755                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
756         }
757         DEBUG_MEM(CmiMemoryCheck());
758         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
759         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
760                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
761                         sendBufferedLocalMessageCopy();
762 //                      traceUserEvent(36);
763                 }
764         }
765         DEBUG_MEM(CmiMemoryCheck());
766 }
767
768 /****
769         The handler functions
770 *****/
771
772
773 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
774 /**
775  *  If there are any delayed requests, process them first before 
776  *  processing this request
777  * */
778 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
779         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
780         double  _startTime = CkWallTimer();
781         if(CpvAccess(_delayedTicketRequests)->length() > 0){
782                 retryTicketRequest(NULL,_startTime);
783         }
784         _processTicketRequest(ticketRequest);
785         CmiFree(ticketRequest);
786 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
787 }
788 /** Handler used for dealing with a bunch of ticket requests
789  * from one processor. The replies are also bunched together
790  * Does not use _ticketRequestHandler
791  * */
792 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
793         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
794         DEBUG_MEM(CmiMemoryCheck());
795         double _startTime = CkWallTimer();
796         if(CpvAccess(_delayedTicketRequests)->length() > 0){
797                 retryTicketRequest(NULL,_startTime);
798         }
799         DEBUG_MEM(CmiMemoryCheck());
800   int numRequests = recvdHeader->numberLogs;
801         char *msg = (char *)recvdHeader;
802         msg = &msg[sizeof(BufferedTicketRequestHeader)];
803         int senderPE=((TicketRequest *)msg)->senderPE;
804
805         
806         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
807         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
808         
809         char *ptr = (char *)buf;
810         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
811         header->numberLogs = 0;
812
813         DEBUG_MEM(CmiMemoryCheck());
814         
815         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
816         
817         for(int i=0;i<numRequests;i++){
818                 TicketRequest *request = (TicketRequest *)msg;
819                 msg = &msg[sizeof(TicketRequest)];
820                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
821
822                 if(replied){
823                         //the ticket request has been processed and 
824                         //the reply will be stored in the ptr
825                         header->numberLogs++;
826                         ptr = &ptr[sizeof(TicketReply)];
827                 }
828         }
829 /*      if(header->numberLogs == 0){
830                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
831         }*/
832
833         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
834         CmiSyncSend(senderPE,totalSize,(char *)buf);
835         CmiFree(recvdHeader);
836 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
837         DEBUG_MEM(CmiMemoryCheck());
838 };
839
840 /**Process the ticket request. 
841  * If it is processed and a reply is being sent 
842  * by this processor return true
843  * else return false.
844  * If a reply buffer is specified put the reply into that
845  * else send the reply
846  * */
847 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
848
849 /*      if(_lockNewTicket){
850                 printf("ddeded %d\n",CkMyPe());
851                 if(CmiIsImmediate(ticketRequest)){
852                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
853                 }
854                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
855                 
856         }else{
857                 _lockNewTicket = 1;
858         }*/
859
860         DEBUG_MEM(CmiMemoryCheck());
861
862         // getting information from request
863         CkObjID sender = ticketRequest->sender;
864         CkObjID recver = ticketRequest->recver;
865         MCount SN = ticketRequest->SN;
866         MCount TN = ticketRequest->TN;
867         Chare *recverObj = (Chare *)recver.getObject();
868         
869         DEBUG(char recverName[100]);
870         DEBUG(recver.toString(recverName);)
871
872         if(recverObj == NULL){
873                 int estPE = recver.guessPE();
874                 if(estPE == CkMyPe() || estPE == -1){           
875                         //try to fulfill the request after some time
876                         char senderString[100];
877                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
878                         if(estPE == CkMyPe() && recver.type == TypeArray){
879                                 CkArrayID aid(recver.data.array.id);            
880                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
881                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
882                         }
883                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
884                         *delayed = *ticketRequest;
885                         CpvAccess(_delayedTicketRequests)->enq(delayed);
886                         
887                 }else{
888                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
889                         TicketRequest forward = *ticketRequest;
890                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
891                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);
892                 }
893         DEBUG_MEM(CmiMemoryCheck());
894                 return false; // if the receverObj does not exist the ticket request cannot have been 
895                               // processed successfully
896         }else{
897                 char senderString[100];
898                 
899                 Ticket ticket;
900
901                 // checking if the message is team local and if it has a ticket already assigned
902                 if(teamSize > 1 && TN != 0){
903                         DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
904                         ticket.TN = TN;
905                         recverObj->mlogData->verifyTicket(sender,SN,TN);
906                 }
907
908                 //check if a ticket for this has been already handed out to an object that used to be local but 
909                 // is no longer so.. need for parallel restart
910                 if(recverObj->mlogData->mapTable.numObjects() > 0){
911                         
912                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
913                 }
914                 
915                 if(ticket.TN == 0){
916                         ticket = recverObj->mlogData->next_ticket(sender,SN);
917                 }
918                 if(ticket.TN > recverObj->mlogData->tProcessed){
919                         ticket.state = NEW_TICKET;
920                 }else{
921                         ticket.state = OLD_TICKET;
922                 }
923                 //TODO: check for the case when an invalid ticket is returned
924                 if(ticket.TN == 0){
925                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
926                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
927                         *delayed = *ticketRequest;
928                         CpvAccess(_delayedTicketRequests)->enq(delayed);
929                         return false;
930                 }
931 /*              if(ticket.TN < SN){ //error state this really should not happen
932                         recver.toString(recverName);
933                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
934                 }*/
935 //              CkAssert(ticket.TN >= SN);
936                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
937 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
938     if(reply == NULL){ 
939                         //There is no reply buffer and the ticketreply is going to be 
940                         //sent immediately
941                         TicketReply ticketReply;
942                         ticketReply.request = *ticketRequest;
943                         ticketReply.ticket = ticket;
944                         ticketReply.recverPE = CkMyPe();
945                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
946 //              CmiBecomeImmediate(&ticketReply);
947                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
948          }else{ // Store ticket reply in the buffer provided
949                  reply->request = *ticketRequest;
950                  reply->ticket = ticket;
951                  reply->recverPE = CkMyPe();
952                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
953                                                          // in case the ticket needs to be forwarded or something
954          }
955                 DEBUG_MEM(CmiMemoryCheck());
956                 return true;
957         }
958 //      _lockNewTicket=0;
959 };
960
961
962 /**
963  * @brief This function handles the ticket received after a request.
964  */
965 inline void _ticketHandler(TicketReply *ticketReply){
966
967         double _startTime = CkWallTimer();
968         DEBUG_MEM(CmiMemoryCheck());    
969         
970         char senderString[100];
971         CkObjID sender = ticketReply->request.sender;
972         CkObjID recver = ticketReply->request.recver;
973         
974         if(sender.guessPE() != CkMyPe()){
975                 DEBUG(CkAssert(sender.guessPE()>= 0));
976                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
977         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
978                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
979                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
980 #ifdef BUFFERED_REMOTE
981                 //will be freed by the buffered ticket handler most of the time
982                 //this might lead to a leak just after migration
983                 //when the ticketHandler is directly used without going through the buffered handler
984                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
985 #else
986                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
987 #endif  
988         }else{
989                 char recverName[100];
990                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
991                 MlogEntry *logEntry=NULL;
992                 if(ticketReply->ticket.state & FORWARDED_TICKET){
993                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
994                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
995                         Chare *senderObj = (Chare *)sender.getObject();
996                         if(senderObj){
997                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
998                                 for(int i=0;i<mlog->length();i++){
999                                         MlogEntry *tempEntry = (*mlog)[i];
1000                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
1001                                                 logEntry = tempEntry;
1002                                                 break;
1003                                         }
1004                                 }
1005                                 if(logEntry == NULL){
1006 #ifdef BUFFERED_REMOTE
1007 #else
1008                                         CmiFree(ticketReply);
1009 #endif                                  
1010                                         return;
1011                                 }
1012                         }else{
1013                                 CmiAbort("This processor thinks it should have the sender\n");
1014                         }
1015                         ticketReply->ticket.state ^= FORWARDED_TICKET;
1016                 }else{
1017                         logEntry = ticketReply->request.logEntry;
1018                 }
1019                 if(logEntry->env->TN <= 0){
1020                         //This logEntry has not received a TN earlier
1021                         char recverString[100];
1022                         logEntry->env->TN = ticketReply->ticket.TN;
1023                         logEntry->env->setSrcPe(CkMyPe());
1024                         if(ticketReply->ticket.state == NEW_TICKET){
1025
1026                                 // if message is group local, we store its metadata in teamTable
1027                                 if(isTeamLocal(ticketReply->recverPE)){
1028                                         //DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
1029                                         Chare *senderObj = (Chare *)sender.getObject();
1030                                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
1031                                         if(ticketRow == NULL){
1032                                                 ticketRow = new SNToTicket();
1033                                                 senderObj->mlogData->teamTable.put(recver) = ticketRow; 
1034                                         }
1035                                         ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
1036                                 }
1037
1038                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
1039                                 if(ticketReply->recverPE != CkMyPe()){
1040                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
1041                                 }else{
1042                                         //It is now a local message use the local message protocol
1043                                         sendLocalMessageCopy(logEntry);
1044                                 }       
1045                         }
1046                 }else{
1047                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
1048                 }
1049                 recver.updatePosition(ticketReply->recverPE);
1050 #ifdef BUFFERED_REMOTE
1051 #else
1052                 CmiFree(ticketReply);
1053 #endif
1054         }
1055         CmiMemoryCheck();
1056
1057 //      traceUserBracketEvent(22,_startTime,CkWallTimer());     
1058 };
1059
1060 /**
1061  * Message to handle the bunch of tickets 
1062  * that we get from one processor. We send 
1063  * the tickets to be handled one at a time
1064  * */
1065
1066 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
1067         double _startTime=CmiWallTimer();
1068         int numTickets = recvdHeader->numberLogs;
1069         char *msg = (char *)recvdHeader;
1070         msg = &msg[sizeof(BufferedTicketRequestHeader)];
1071         DEBUG_MEM(CmiMemoryCheck());
1072         
1073         TicketReply *_reply = (TicketReply *)msg;
1074
1075         
1076         for(int i=0;i<numTickets;i++){
1077                 TicketReply *reply = (TicketReply *)msg;
1078                 _ticketHandler(reply);
1079                 
1080                 msg = &msg[sizeof(TicketReply)];
1081         }
1082         
1083         CmiFree(recvdHeader);
1084 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1085         DEBUG_MEM(CmiMemoryCheck());
1086 };
1087
1088 /**
1089  * Stores the metadata of a local message from its buddy.
1090  */
1091 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1092         double _startTime = CkWallTimer();
1093         
1094         char senderString[100],recverString[100];
1095         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1096 /*      if(!fault_aware(msgLog->recver)){
1097                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1098         }*/
1099         CpvAccess(_localMessageLog)->enq(*msgLog);
1100         
1101         LocalMessageLogAck ack;
1102         ack.entry = msgLog->entry;
1103         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1104         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1105         CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
1106         
1107 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1108 };
1109
1110 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1111         double _startTime = CkWallTimer();
1112         DEBUG_MEM(CmiMemoryCheck());
1113         
1114         int numLogs = recvdHeader->numberLogs;
1115         char *msg = (char *)recvdHeader;
1116
1117         //piggy back the logs already stored on this processor
1118         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1119         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1120 /*      if(numPiggyLogs > 0){
1121                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1122                         CmiAssert(0);
1123                 }
1124         }*/
1125         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1126         
1127         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1128         void *buf = CmiAlloc(totalSize);
1129         char *ptr = (char *)buf;
1130         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1131         
1132         msg = &msg[sizeof(BufferedLocalLogHeader)];
1133         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1134
1135         DEBUG_MEM(CmiMemoryCheck());
1136         int PE;
1137         for(int i=0;i<numLogs;i++){
1138                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1139                 CpvAccess(_localMessageLog)->enq(*msgLog);
1140                 PE = msgLog->senderPE;
1141                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1142
1143                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1144                 ack->entry = msgLog->entry;
1145                 
1146                 msg = &msg[sizeof(LocalMessageLog)];
1147                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1148         }
1149         DEBUG_MEM(CmiMemoryCheck());
1150
1151         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1152         piggyHeader->numberLogs = numPiggyLogs;
1153         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1154         if(numPiggyLogs > 0){
1155                 countPiggy++;
1156         }
1157
1158         for(int i=0;i<numPiggyLogs;i++){
1159                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1160                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1161                 ptr = &ptr[sizeof(LocalMessageLog)];
1162         }
1163         DEBUG_MEM(CmiMemoryCheck());
1164         
1165         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1166         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1167                 
1168 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1169                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1170                         if(!fault_aware(localLogEntry.recver)){
1171                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1172                         }
1173         }*/
1174         if(freeHeader){
1175                 CmiFree(recvdHeader);
1176         }
1177         DEBUG_MEM(CmiMemoryCheck());
1178 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1179 }
1180
1181
1182 void _localMessageAckHandler(LocalMessageLogAck *ack){
1183         
1184         double _startTime = CkWallTimer();
1185         
1186         MlogEntry *entry = ack->entry;
1187         if(entry == NULL){
1188                 CkExit();
1189         }
1190         entry->unackedLocal = 0;
1191         envelope *env = entry->env;
1192         char recverName[100];
1193         char senderString[100];
1194         DEBUG_MEM(CmiMemoryCheck());
1195         
1196         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1197         if(env == NULL)
1198                 return;
1199         CkAssert(env->SN > 0);
1200         CkAssert(env->TN > 0);
1201         env->sender.toString(senderString);
1202         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1203         env->recver.toString(recverName);
1204
1205         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1206         
1207 /*      
1208         void *origMsg = EnvToUsr(env);
1209         void *copyMsg = CkCopyMsg(&origMsg);
1210         envelope *copyEnv = UsrToEnv(copyMsg);
1211         entry->env = UsrToEnv(origMsg);*/
1212
1213 //      envelope *copyEnv = copyEnvelope(env);
1214
1215         envelope *copyEnv = env;
1216         copyEnv->localMlogEntry = entry;
1217
1218         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1219         if(CmiMyPe() != entry->destPE){
1220                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1221         }
1222 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1223         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1224         
1225         
1226 #ifdef BUFFERED_LOCAL
1227 #else
1228         CmiFree(ack);
1229 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1230 #endif
1231         
1232         DEBUG_MEM(CmiMemoryCheck());
1233         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1234 }
1235
1236
1237 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1238
1239         double _startTime=CkWallTimer();
1240         DEBUG_MEM(CmiMemoryCheck());
1241
1242         int numLogs = recvdHeader->numberLogs;
1243         char *msg = (char *)recvdHeader;
1244         msg = &msg[sizeof(BufferedLocalLogHeader)];
1245
1246         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1247         
1248         for(int i=0;i<numLogs;i++){
1249                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1250                 _localMessageAckHandler(ack);
1251                 
1252                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1253         }
1254
1255         //deal with piggy backed local logs
1256         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1257         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1258         if(piggyHeader->numberLogs > 0){
1259                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1260         }
1261         
1262         CmiFree(recvdHeader);
1263         DEBUG_MEM(CmiMemoryCheck());
1264 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1265 }
1266
1267 bool fault_aware(CkObjID &recver){
1268         switch(recver.type){
1269                 case TypeChare:
1270                         return false;
1271                 case TypeMainChare:
1272                         return false;
1273                 case TypeGroup:
1274                 case TypeNodeGroup:
1275                 case TypeArray:
1276                         return true;
1277                 default:
1278                         return false;
1279         }
1280 };
1281
1282 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1283         char recverString[100];
1284         char senderString[100];
1285         
1286         DEBUG_MEM(CmiMemoryCheck());
1287         CkObjID recver = env->recver;
1288         if(!fault_aware(recver))
1289                 return 1;
1290
1291
1292         Chare *obj = (Chare *)recver.getObject();
1293         *objPointer = obj;
1294         if(obj == NULL){
1295                 int possiblePE = recver.guessPE();
1296                 if(possiblePE != CkMyPe()){
1297                         int totalSize = env->getTotalsize();                    
1298                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1299                 }
1300                 return 0;
1301         }
1302
1303
1304         double _startTime = CkWallTimer();
1305 //env->sender.updatePosition(env->getSrcPe());
1306         if(env->TN == obj->mlogData->tProcessed+1){
1307                 //the message that needs to be processed now
1308                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1309                 // once we find a message that we can process we put back all the messages in the out of order queue
1310                 // back into the main scheduler queue. 
1311                 if(env->sender.guessPE() == CkMyPe()){
1312                         *logEntryPointer = env->localMlogEntry;
1313                 }
1314         DEBUG_MEM(CmiMemoryCheck());
1315                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1316                         void *qMsgPtr;
1317                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1318                         envelope *qEnv = (envelope *)qMsgPtr;
1319                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1320         DEBUG_MEM(CmiMemoryCheck());
1321                 }
1322 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1323                 //TODO: this might be a problem.. change made for leanMD
1324 //              CpvAccess(_currentObj) = obj;
1325         DEBUG_MEM(CmiMemoryCheck());
1326                 return 1;
1327         }
1328         if(env->TN <= obj->mlogData->tProcessed){
1329                 //message already processed
1330                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1331 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1332         DEBUG_MEM(CmiMemoryCheck());
1333                 return 0;
1334         }
1335         //message that needs to be processed in the future
1336
1337 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1338         //the message cant be processed now put it back in the out of order message Q, 
1339         //It will be transferred to the main queue later
1340         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1341 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1342         DEBUG_MEM(CmiMemoryCheck());
1343         
1344         return 0;
1345 }
1346
1347 /**
1348  * @brief Updates a few variables once a message has been processed.
1349  */
1350 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1351         DEBUG(char senderString[100]);
1352         if(obj){
1353                 if(sender.guessPE() == CkMyPe()){
1354                         if(entry != NULL){
1355                                 entry->env = NULL;
1356                         }
1357                 }
1358                 obj->mlogData->tProcessed++;
1359 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1360                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1361 //              CpvAccess(_currentObj)= NULL;
1362         }
1363         DEBUG_MEM(CmiMemoryCheck());
1364 }
1365
1366 /***
1367         Helpers for the handlers and message logging methods
1368 ***/
1369
1370 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1371 //      double _startTime = CkWallTimer();
1372         if(env->recver.type != TypeNodeGroup){
1373         //This repeats a step performed in skipCldEnq for messages sent to
1374         //other processors. I do this here so that messages to local processors
1375         //undergo the same transformation.. It lets the resend be uniform for 
1376         //all messages
1377 //              CmiSetXHandler(env,CmiGetHandler(env));
1378                 _skipCldEnqueue(destPE,env,_infoIdx);
1379         }else{
1380                 _noCldNodeEnqueue(destPE,env);
1381         }
1382 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1383 }
1384 //extern "C" int CmiGetNonLocalLength();
1385 /** This method is used to retry the ticket requests
1386  * that had been queued up earlier
1387  * */
1388
// Non-zero while a retryTicketRequestTimer callback is scheduled:
// retryTicketRequest sets it when it arms the retry timer, and
// retryTicketRequestTimer clears it when that timer fires, so at most one
// retry timer is pending at a time.
int calledRetryTicketRequest=0;
1390
1391 void retryTicketRequestTimer(void *_dummy,double _time){
1392                 calledRetryTicketRequest=0;
1393                 retryTicketRequest(_dummy,_time);
1394 }
1395
1396 void retryTicketRequest(void *_dummy,double curWallTime){       
1397         double start = CkWallTimer();
1398         DEBUG_MEM(CmiMemoryCheck());
1399         int length = CpvAccess(_delayedTicketRequests)->length();
1400         for(int i=0;i<length;i++){
1401                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1402                 if(ticketRequest){
1403                         char senderString[100],recverString[100];
1404                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1405                         DEBUG_MEM(CmiMemoryCheck());
1406                         _processTicketRequest(ticketRequest);
1407                   CmiFree(ticketRequest);
1408                         DEBUG_MEM(CmiMemoryCheck());
1409                 }       
1410         }       
1411         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1412                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1413                 ticketLogLocalMessage(entry);
1414         }
1415         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1416 //      int converse_qLength = CmiGetNonLocalLength();
1417         
1418 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1419
1420 /*      PingMsg pingMsg;
1421         pingMsg.PE = CkMyPe();
1422         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1423         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1424                 for(int i=0;i<CkNumPes();i++){
1425                         if(i != CkMyPe()){
1426                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1427                         }
1428                 }
1429         }*/     
1430         //TODO: change this back to 100
1431         if(calledRetryTicketRequest == 0){
1432                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1433                 calledRetryTicketRequest =1;
1434         }
1435         DEBUG_MEM(CmiMemoryCheck());
1436 }
1437
1438 void _pingHandler(CkPingMsg *msg){
1439         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1440         CmiFree(msg);
1441 }
1442
1443
/*****************************************************************************
	Checkpointing methods.
		Periodically pack all the data on a processor and send it to its buddy
		processor. A completed checkpoint is also used to throw away message
		logs that are no longer needed.
*****************************************************************************/
// Per-chare (objID, tProcessed) pairs for this processor: cleared and rebuilt
// by startMlogCheckpoint at every checkpoint, and sent to all processors by
// sendRemoveLogRequests so they can discard logs that are no longer needed.
CkVec<TProcessedLog> processedTicketLog;
// Forward declarations of helpers defined later in this file.
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
void clearUpMigratedRetainedLists(int PE);
1452
1453 void checkpointAlarm(void *_dummy,double curWallTime){
1454         double diff = curWallTime-lastCompletedAlarm;
1455         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1456 /*      if(CkWallTimer()-lastRestart < 50){
1457                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1458                 return;
1459         }*/
1460         if(diff < ((chkptPeriod) - 2)){
1461                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1462                 return;
1463         }
1464         CheckpointRequest request;
1465         request.PE = CkMyPe();
1466         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1467         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1468 };
1469
1470 void _checkpointRequestHandler(CheckpointRequest *request){
1471         startMlogCheckpoint(NULL,CmiWallTimer());       
1472 }
1473
1474 void startMlogCheckpoint(void *_dummy,double curWallTime){
1475         double _startTime = CkWallTimer();
1476         checkpointCount++;
1477 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1478                 kill(getpid(),SIGKILL);
1479         }*/
1480         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1481                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1482         }
1483         PUP::sizer psizer;
1484         DEBUG_MEM(CmiMemoryCheck());
1485
1486         psizer | checkpointCount;
1487         
1488         CkPupROData(psizer);
1489         DEBUG_MEM(CmiMemoryCheck());
1490         CkPupGroupData(psizer,CmiTrue);
1491         DEBUG_MEM(CmiMemoryCheck());
1492         CkPupNodeGroupData(psizer,CmiTrue);
1493         DEBUG_MEM(CmiMemoryCheck());
1494         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1495         DEBUG_MEM(CmiMemoryCheck());
1496
1497         int dataSize = psizer.size();
1498         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1499         char *msg = (char *)CmiAlloc(totalSize);
1500         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1501         chkMsg->PE = CkMyPe();
1502         chkMsg->dataSize = dataSize;
1503
1504         
1505         char *buf = &msg[sizeof(CheckPointDataMsg)];
1506         PUP::toMem pBuf(buf);   
1507
1508         pBuf | checkpointCount;
1509         
1510         CkPupROData(pBuf);
1511         CkPupGroupData(pBuf,CmiTrue);
1512         CkPupNodeGroupData(pBuf,CmiTrue);
1513         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1514
1515         unAckedCheckpoint=1;
1516         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1517         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1518         
1519         /*
1520                 Store the highest Ticket number processed for each chare on this processor
1521         */
1522         processedTicketLog.removeAll();
1523         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1524         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1525                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1526         }
1527
1528         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1529                 lastCompletedAlarm = curWallTime;
1530                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1531         }
1532         traceUserBracketEvent(28,_startTime,CkWallTimer());
1533 };
1534
1535 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1536         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1537         TProcessedLog logEntry;
1538         logEntry.recver = mlogData->objID;
1539         logEntry.tProcessed = mlogData->tProcessed;
1540         log->push_back(logEntry);
1541         char objString[100];
1542         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1543 }
1544
1545 class ElementPacker : public CkLocIterator {
1546 private:
1547         CkLocMgr *locMgr;
1548         PUP::er &p;
1549 public:
1550                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1551                 void addLocation(CkLocation &loc) {
1552                         CkArrayIndexMax idx=loc.getIndex();
1553                         CkGroupID gID = locMgr->ckGetGroupID();
1554                         p|gID;      // store loc mgr's GID as well for easier restore
1555                         p|idx;
1556                         p|loc;
1557     }
1558 };
1559
/**
 * Pups all the array elements in this processor.
 *
 * When sizing/packing: writes the element count, then for every CkLocMgr
 * iterates its locations with ElementPacker (group ID, index, location).
 * When unpacking: reads the count and each (group ID, index) pair back, and
 * resumes the element through the owning CkLocMgr. An element matching a
 * listToSkip record with ackFrom == 0 and ackTo == 1 is resumed with the skip
 * flag set, and informLocationHome is then called with its home PE and the
 * record's toPE.
 *
 * @param p          PUP::er used for sizing, packing or unpacking
 * @param create     forwarded to CkLocMgr::resume when unpacking
 * @param listToSkip migration records consulted only when unpacking
 * @param listsize   number of entries in listToSkip
 */
void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
	// NOTE(review): i and numGroups look unused, but they may be referenced by
	// the CKLOCMGR_LOOP macro expansion below -- confirm before removing them.
	int numElements,i;
	int numGroups = CkpvAccess(_groupIDTable)->size();	
	if(!p.isUnpacking()){
		numElements = CkCountArrayElements();
	}	
	p | numElements;
	DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
	if(!p.isUnpacking()){
		CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
	}else{
		//Flush all recs of all LocMgrs before putting in new elements
//		CKLOCMGR_LOOP(mgr->flushAllRecs(););
		// Report every record that marks an element to be skipped.
		for(int j=0;j<listsize;j++){
			if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
				printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
				listToSkip[j].idx.print();
			}
		}
		
		printf("numElements = %d\n",numElements);
	
		for (int i=0; i<numElements; i++) {
			CkGroupID gID;
			CkArrayIndexMax idx;
			// Same order as ElementPacker::addLocation: group ID, then index;
			// the location itself is consumed by mgr->resume below.
			p|gID;
		p|idx;
			// Look for a pending-skip record for this (group, index).
			int flag=0;
			int matchedIdx=0;
			for(int j=0;j<listsize;j++){
				if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
					if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
						matchedIdx = j;
						flag = 1;
						break;
					}
				}
			}
			if(flag == 1){
				printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
			}else{
				printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
			}
				
			CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
			CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
			mgr->resume(idx,p,create,flag);
			if(flag == 1){
				// Skipped element: tell its home PE where it lives (the record's toPE).
				int homePE = mgr->homePe(idx);
				informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
			}
		}
	}
};
1617
1618
1619 void writeCheckpointToDisk(int size,char *chkpt){
1620         char fNameTemp[100];
1621         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1622         int fd = creat(fNameTemp,S_IRWXU);
1623         int ret = write(fd,chkpt,size);
1624         CkAssert(ret == size);
1625         close(fd);
1626         
1627         char fName[100];
1628         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1629         unlink(fName);
1630
1631         rename(fNameTemp,fName);
1632         
1633 }
1634
1635 //handler that receives the checkpoint from a processor
1636 //it stores it and acks it
1637 void _storeCheckpointHandler(char *msg){
1638         
1639         double _startTime=CkWallTimer();
1640                 
1641         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1642         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1643         
1644         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1645         
1646         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1647         if(oldChkpt != NULL){
1648                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1649                 CmiFree(oldmsg);
1650         }
1651         //turning off storing checkpoints
1652         
1653         int sendingPE = chkMsg->PE;
1654         
1655         CpvAccess(_storedCheckpointData)->buf = chkpt;
1656         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1657         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1658
1659 #ifdef CHECKPOINT_DISK
1660         //store the checkpoint on disk
1661         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1662         CpvAccess(_storedCheckpointData)->buf = NULL;
1663         CmiFree(msg);
1664 #endif
1665
1666         int count=0;
1667         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1668                 if(migratedNoticeList[j].fromPE == sendingPE){
1669                         migratedNoticeList[j].ackFrom = 1;
1670                 }else{
1671                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1672                 }
1673                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1674                         migratedNoticeList.remove(j);
1675                         count++;
1676                 }
1677                 
1678         }
1679         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1680         
1681         CheckPointAck ackMsg;
1682         ackMsg.PE = CkMyPe();
1683         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1684         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1685         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1686         
1687         
1688         
1689         traceUserBracketEvent(29,_startTime,CkWallTimer());
1690 };
1691
1692
1693 void sendRemoveLogRequests(){
1694         double _startTime = CkWallTimer();      
1695         //send out the messages asking senders to throw away message logs below a certain ticket number
1696         /*
1697                 The remove log request message looks like
1698                 |RemoveLogRequest||List of TProcessedLog|
1699         */
1700         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1701         char *requestMsg = (char *)CmiAlloc(totalSize);
1702         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1703         request->PE = CkMyPe();
1704         request->numberObjects = processedTicketLog.size();
1705         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1706         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1707         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1708         
1709         DEBUG_MEM(CmiMemoryCheck());
1710         for(int i=0;i<CkNumPes();i++){
1711                 CmiSyncSend(i,totalSize,requestMsg);
1712         }
1713         CmiFree(requestMsg);
1714
1715         clearUpMigratedRetainedLists(CmiMyPe());
1716         //TODO: clear ticketTable
1717         
1718         traceUserBracketEvent(30,_startTime,CkWallTimer());
1719         DEBUG_MEM(CmiMemoryCheck());
1720 }
1721
1722
1723 void _checkpointAckHandler(CheckPointAck *ackMsg){
1724         DEBUG_MEM(CmiMemoryCheck());
1725         unAckedCheckpoint=0;
1726         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1727         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1728         if(onGoingLoadBalancing){
1729                 onGoingLoadBalancing = 0;
1730                 finishedCheckpointLoadBalancing();
1731         }else{
1732                 sendRemoveLogRequests();
1733         }
1734         CmiFree(ackMsg);
1735         
1736 };
1737
1738 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1739         DEBUG_MEM(CmiMemoryCheck());
1740         CmiMemoryCheck();
1741         char *data = (char *)_data;
1742         RemoveLogRequest *request = (RemoveLogRequest *)data;
1743         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1744         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1745
1746         int count=0;
1747         for(int i=0;i<mlog->length();i++){
1748                 MlogEntry *logEntry = mlog->deq();
1749                 int match=0;
1750                 for(int j=0;j<request->numberObjects;j++){
1751                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1752                                 //this log Entry should be removed
1753                                 match = 1;
1754                                 break;
1755                         }
1756                 }
1757                 char senderString[100],recverString[100];
1758 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1759                 if(match){
1760                         count++;
1761                         delete logEntry;
1762                 }else{
1763                         mlog->enq(logEntry);
1764                 }
1765         }
1766         if(count > 0){
1767                 char nameString[100];
1768                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1769         }
1770         DEBUG_MEM(CmiMemoryCheck());
1771         CmiMemoryCheck();
1772 }
1773
1774 void _removeProcessedLogHandler(char *requestMsg){
1775         double start = CkWallTimer();
1776         forAllCharesDo(removeProcessedLogs,requestMsg);
1777         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1778         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1779         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1780         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1781         if(request->PE == getCheckPointPE()){
1782                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1783                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1784                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1785                 int count=0;
1786 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1787                 DEBUG(char nameString[100];)
1788                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1789                 DEBUG(})*/
1790                 for(int i=0;i<localQ->length();i++){
1791                         LocalMessageLog localLogEntry = (*localQ)[i];
1792                         if(!fault_aware(localLogEntry.recver)){
1793                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1794                         }
1795                         bool keep = true;
1796                         for(int j=0;j<request->numberObjects;j++){                              
1797                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1798                                         keep = false;
1799                                         break;
1800                                 }
1801                         }       
1802                         if(keep){
1803                                 tempQ->enq(localLogEntry);
1804                         }else{
1805                                 count++;
1806                         }
1807                 }
1808                 delete localQ;
1809                 CpvAccess(_localMessageLog) = tempQ;
1810                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1811         }
1812
1813         /*
1814                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1815         */
1816         CmiMemoryCheck();
1817         clearUpMigratedRetainedLists(request->PE);
1818         
1819         traceUserBracketEvent(20,start,CkWallTimer());
1820         CmiFree(requestMsg);    
1821 };
1822
1823
1824 void clearUpMigratedRetainedLists(int PE){
1825         int count=0;
1826         CmiMemoryCheck();
1827         
1828         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1829                 if(migratedNoticeList[j].toPE == PE){
1830                         migratedNoticeList[j].ackTo = 1;
1831                 }
1832                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1833                         migratedNoticeList.remove(j);
1834                         count++;
1835                 }
1836         }
1837         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1838         
1839         for(int j=retainedObjectList.size()-1;j>=0;j--){
1840                 if(retainedObjectList[j]->migRecord.toPE == PE){
1841                         RetainedMigratedObject *obj = retainedObjectList[j];
1842                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1843                         retainedObjectList.remove(j);
1844                         if(obj->msg != NULL){
1845                                 CmiMemoryCheck();
1846                                 CmiFree(obj->msg);
1847                         }
1848                         delete obj;
1849                 }
1850         }
1851 }
1852
1853 /***************************************************************
1854         Restart Methods and handlers
1855 ***************************************************************/        
1856
1857 /**
1858  * Function for restarting the crashed processor.
1859  * It sets the restart flag and contacts the buddy
1860  * processor to get the latest checkpoint.
1861  */
1862 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1863         RestartRequest msg;
1864
1865         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1866
1867         // setting the restart flag
1868         _restartFlag = 1;
1869
1870         // if we are using team-based message logging, all members of the group have to be restarted
1871         if(teamSize > 1){
1872                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1873                         if(i != CkMyPe() && i < CkNumPes()){
1874                                 // sending a message to the team member
1875                                 msg.PE = CkMyPe();
1876                             CmiSetHandler(&msg,_restartHandlerIdx);
1877                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1878                         }
1879                 }
1880         }
1881
1882         // requesting the latest checkpoint from its buddy
1883         msg.PE = CkMyPe();
1884         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1885         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1886 };
1887
1888 /**
1889  * Function to restart this processor.
1890  * The handler is invoked by a member of its same team in message logging.
1891  */
1892 void _restartHandler(RestartRequest *restartMsg){
1893         int i;
1894         int numGroups = CkpvAccess(_groupIDTable)->size();
1895         RestartRequest msg;
1896         
1897         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1898
1899     // setting the restart flag
1900         _restartFlag = 1;
1901
1902         // flushing all buffers
1903         //TEST END
1904 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1905         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1906         for(int i=0;i<numGroups;i++){
1907         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1908                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1909                 obj->flushStates();
1910                 obj->ckJustMigrated();
1911         }*/
1912
1913     // requesting the latest checkpoint from its buddy
1914         msg.PE = CkMyPe();
1915         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1916         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1917 }
1918
1919
1920 /**
1921  * Gets the stored checkpoint but calls another function in the sender.
1922  */
1923 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1924
1925         // retrieving the stored checkpoint
1926         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1927
1928         // making sure it is its buddy who is requesting the checkpoint
1929         CkAssert(restartMsg->PE == storedChkpt->PE);
1930
1931         storedRequest = restartMsg;
1932         verifyAckTotal = 0;
1933
1934         for(int i=0;i<migratedNoticeList.size();i++){
1935                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1936 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1937                         if(migratedNoticeList[i].ackFrom == 0){
1938                                 //need to verify if the object actually exists .. it might not
1939                                 //have been acked but it might exist on it
1940                                 VerifyAckMsg msg;
1941                                 msg.migRecord = migratedNoticeList[i];
1942                                 msg.index = i;
1943                                 msg.fromPE = CmiMyPe();
1944                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1945                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1946                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1947                                 verifyAckTotal++;
1948                         }
1949                 }
1950         }
1951
1952         // sending the checkpoint back to its buddy     
1953         if(verifyAckTotal == 0){
1954                 sendCheckpointData(MLOG_RESTARTED);
1955         }
1956         verifyAckCount = 0;
1957 }
1958
1959 /**
1960  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1961  */
1962 void _recvRestartCheckpointHandler(char *_restartData){
1963         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1964         MigrationRecord *migratedAwayElements;
1965
1966         globalLBID = restartData->lbGroupID;
1967         
1968         restartData->restartWallTime *= 1000;
1969         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1970         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1971         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1972
1973         
1974         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1975         char *buf = &_restartData[sizeof(RestartProcessorData)];
1976         
1977         if(restartData->numMigratedAwayElements != 0){
1978                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1979                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1980                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1981                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1982         }
1983
1984         // turning on the team recovery flag
1985         forAllCharesDo(setTeamRecovery,NULL);
1986         
1987         PUP::fromMem pBuf(buf);
1988         pBuf | checkpointCount;
1989         CkPupROData(pBuf);
1990         CkPupGroupData(pBuf,CmiFalse);
1991         CkPupNodeGroupData(pBuf,CmiFalse);
1992         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1993         CkAssert(pBuf.size() == restartData->checkPointSize);
1994         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1995         
1996         // turning off the team recovery flag
1997         forAllCharesDo(unsetTeamRecovery,NULL);
1998
1999         // initializing a few variables for handling local messages
2000         forAllCharesDo(initializeRestart,NULL);
2001         
2002         //store the restored local message log in a vector
2003         buf = &buf[restartData->checkPointSize];        
2004         for(int i=0;i<restartData->numLocalMessages;i++){
2005                 LocalMessageLog logEntry;
2006                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2007                 
2008                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2009                 if(recverObj!=NULL){
2010                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2011                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2012                         char senderString[100];
2013                         char recverString[100];
2014                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2015                 }else{
2016 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2017                 }
2018                 buf = &buf[sizeof(LocalMessageLog)];
2019         }
2020
2021         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2022         CmiFree(_restartData);  
2023
2024         /*HERE _initDone();
2025
2026         getGlobalStep(globalLBID);
2027         
2028         countUpdateHomeAcks = 0;
2029         RestartRequest updateHomeRequest;
2030         updateHomeRequest.PE = CmiMyPe();
2031         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2032         for(int i=0;i<CmiNumPes();i++){
2033                 if(i != CmiMyPe()){
2034                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2035                 }
2036         }
2037 */
2038
2039
2040         // Send out the request to resend logged messages to all other processors
2041         CkVec<TProcessedLog> objectVec;
2042         forAllCharesDo(createObjIDList, (void *)&objectVec);
2043         int numberObjects = objectVec.size();
2044         
2045         /*
2046                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2047         */
2048         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2049         char *resendMsg = (char *)CmiAlloc(totalSize);  
2050
2051         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2052         resendReq->PE =CkMyPe(); 
2053         resendReq->numberObjects = numberObjects;
2054         char *objList = &resendMsg[sizeof(ResendRequest)];
2055         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2056         
2057
2058         /* test for parallel restart migrate away object**/
2059 //      if(parallelRestart){
2060 //              distributeRestartedObjects();
2061 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2062 //      }
2063         
2064         /*      To make restart work for load balancing.. should only
2065         be used when checkpoint happens along with load balancing
2066         **/
2067 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2068
2069         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2070         CpvAccess(_currentObj) = lb;
2071         lb->ReceiveDummyMigration(restartDecisionNumber);
2072
2073         sleep(10);
2074         
2075         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2076         for(int i=0;i<CkNumPes();i++){
2077                 if(i != CkMyPe()){
2078                         CmiSyncSend(i,totalSize,resendMsg);
2079                 }       
2080         }
2081         _resendMessagesHandler(resendMsg);
2082
2083 }
2084
2085
2086 void CkMlogRestartDouble(void *,double){
2087         CkMlogRestart(NULL,NULL);
2088 };
2089
2090 //TML: restarting from local (group) failure
2091 void CkMlogRestartLocal(){
2092     CkMlogRestart(NULL,NULL);
2093 };
2094
2095
2096 void readCheckpointFromDisk(int size,char *buf){
2097         char fName[100];
2098         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
2099
2100         int fd = open(fName,O_RDONLY);
2101         int count=0;
2102         while(count < size){
2103                 count += read(fd,&buf[count],size-count);
2104         }
2105         close(fd);
2106         
2107 };
2108
2109
2110
2111 /**
2112  * Gets the stored checkpoint for its buddy processor.
2113  */
2114 void _getCheckpointHandler(RestartRequest *restartMsg){
2115
2116         // retrieving the stored checkpoint
2117         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
2118
2119         // making sure it is its buddy who is requesting the checkpoint
2120         CkAssert(restartMsg->PE == storedChkpt->PE);
2121
2122         storedRequest = restartMsg;
2123         verifyAckTotal = 0;
2124
2125         for(int i=0;i<migratedNoticeList.size();i++){
2126                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
2127 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
2128                         if(migratedNoticeList[i].ackFrom == 0){
2129                                 //need to verify if the object actually exists .. it might not
2130                                 //have been acked but it might exist on it
2131                                 VerifyAckMsg msg;
2132                                 msg.migRecord = migratedNoticeList[i];
2133                                 msg.index = i;
2134                                 msg.fromPE = CmiMyPe();
2135                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
2136                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
2137                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
2138                                 verifyAckTotal++;
2139                         }
2140                 }
2141         }
2142
2143         // sending the checkpoint back to its buddy     
2144         if(verifyAckTotal == 0){
2145                 sendCheckpointData(MLOG_CRASHED);
2146         }
2147         verifyAckCount = 0;
2148 }
2149
2150
2151 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
2152         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
2153         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
2154         if(rec != NULL && rec->type() == CkLocRec::local){
2155                         //this location exists on this processor
2156                         //and needs to be removed       
2157                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
2158                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
2159                         
2160                         int localIdx = localRec->getLocalIndex();
2161                         LBDatabase *lbdb = localRec->getLBDB();
2162                         LDObjHandle ldHandle = localRec->getLdHandle();
2163                                 
2164                         locMgr->setDuringMigration(true);
2165                         
2166                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
2167                         lbdb->UnregisterObj(ldHandle);
2168                         
2169                         locMgr->setDuringMigration(false);
2170                         
2171                         verifyAckedRequests++;
2172
2173         }
2174         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
2175         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
2176 };
2177
2178
2179 void _verifyAckHandler(VerifyAckMsg *verifyReply){
2180         int index =     verifyReply->index;
2181         migratedNoticeList[index] = verifyReply->migRecord;
2182         verifyAckCount++;
2183         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
2184         if(verifyAckCount == verifyAckTotal){
2185                 sendCheckpointData(MLOG_CRASHED);
2186         }
2187 }
2188
2189
2190 /**
2191  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
2192  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
2193  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
2194  */
2195 void sendCheckpointData(int mode){      
2196         RestartRequest *restartMsg = storedRequest;
2197         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
2198         int numMigratedAwayElements = migratedNoticeList.size();
2199         if(migratedNoticeList.size() != 0){
2200                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2201 //                      CkAssert(migratedNoticeList.size() == 0);
2202         }
2203         
2204         
2205         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2206         
2207         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2208         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2209         
2210         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
2211         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
2212         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2213         
2214         char *msg = (char *)CmiAlloc(totalSize);
2215         
2216         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2217         dataMsg->PE = CkMyPe();
2218         dataMsg->restartWallTime = CmiTimer();
2219         dataMsg->checkPointSize = storedChkpt->bufSize;
2220         
2221         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2222 //      dataMsg->numMigratedAwayElements = 0;
2223         
2224         dataMsg->numMigratedInElements = 0;
2225         dataMsg->migratedElementSize = 0;
2226         dataMsg->lbGroupID = globalLBID;
2227         /*msg layout 
2228                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2229                 Local MessageLog|
2230         */
2231         //store checkpoint data
2232         char *buf = &msg[sizeof(RestartProcessorData)];
2233
2234         if(dataMsg->numMigratedAwayElements != 0){
2235                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2236                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2237         }
2238         
2239
2240 #ifdef CHECKPOINT_DISK
2241         readCheckpointFromDisk(storedChkpt->bufSize,buf);
2242 #else   
2243         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2244 #endif
2245         buf = &buf[storedChkpt->bufSize];
2246
2247
2248         //store localmessage Log
2249         dataMsg->numLocalMessages = localMsgQ->length();
2250         for(int i=0;i<localMsgQ->length();i++){
2251                 if(!fault_aware(((*localMsgQ)[i]).recver )){
2252                         CmiAbort("Non fault aware localMsgQ");
2253                 }
2254                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
2255                 buf = &buf[sizeof(LocalMessageLog)];
2256         }
2257         
2258         if(mode == MLOG_RESTARTED){
2259                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2260                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2261                 CmiFree(restartMsg);
2262         }else{
2263                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2264                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2265                 CmiFree(restartMsg);
2266         }
2267 };
2268
2269
2270 // this list is used to create a vector of the object ids of all
2271 //the chares on this processor currently and the highest TN processed by them 
2272 //the first argument is actually a CkVec<TProcessedLog> *
2273 void createObjIDList(void *data,ChareMlogData *mlogData){
2274         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2275         TProcessedLog entry;
2276         entry.recver = mlogData->objID;
2277         entry.tProcessed = mlogData->tProcessed;
2278         list->push_back(entry);
2279         DEBUG_TEAM(char objString[100]);
2280         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2281 }
2282
2283
2284 /**
2285  * Receives the checkpoint data from its buddy, restores the state of all the objects
2286  * and asks everyone else to update its home.
2287  */
2288 void _recvCheckpointHandler(char *_restartData){
2289         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2290         MigrationRecord *migratedAwayElements;
2291
2292         globalLBID = restartData->lbGroupID;
2293         
2294         restartData->restartWallTime *= 1000;
2295         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2296         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2297         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2298
2299         
2300         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2301         char *buf = &_restartData[sizeof(RestartProcessorData)];
2302         
2303         if(restartData->numMigratedAwayElements != 0){
2304                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2305                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2306                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2307                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2308         }
2309         
2310         PUP::fromMem pBuf(buf);
2311
2312         pBuf | checkpointCount;
2313
2314         CkPupROData(pBuf);
2315         CkPupGroupData(pBuf,CmiTrue);
2316         CkPupNodeGroupData(pBuf,CmiTrue);
2317         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2318         CkAssert(pBuf.size() == restartData->checkPointSize);
2319         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2320         
2321         forAllCharesDo(initializeRestart,NULL);
2322         
2323         //store the restored local message log in a vector
2324         buf = &buf[restartData->checkPointSize];        
2325         for(int i=0;i<restartData->numLocalMessages;i++){
2326                 LocalMessageLog logEntry;
2327                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2328                 
2329                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2330                 if(recverObj!=NULL){
2331                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2332                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2333                         char senderString[100];
2334                         char recverString[100];
2335                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2336                 }else{
2337 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2338                 }
2339                 buf = &buf[sizeof(LocalMessageLog)];
2340         }
2341
2342         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2343
2344         CmiFree(_restartData);
2345         
2346         
2347         _initDone();
2348
2349         getGlobalStep(globalLBID);
2350         
2351         countUpdateHomeAcks = 0;
2352         RestartRequest updateHomeRequest;
2353         updateHomeRequest.PE = CmiMyPe();
2354         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2355         for(int i=0;i<CmiNumPes();i++){
2356                 if(i != CmiMyPe()){
2357                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2358                 }
2359         }
2360
2361 }
2362
2363 /**
2364  * Receives the updateHome ACKs from all other processors. Once everybody
2365  * has replied, it sends a request to resend the logged messages.
2366  */
2367 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2368         countUpdateHomeAcks++;
2369         CmiFree(updateHomeAck);
2370         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2371         if(countUpdateHomeAcks != CmiNumPes()){
2372                 return;
2373         }
2374
2375         // Send out the request to resend logged messages to all other processors
2376         CkVec<TProcessedLog> objectVec;
2377         forAllCharesDo(createObjIDList, (void *)&objectVec);
2378         int numberObjects = objectVec.size();
2379         
2380         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2381         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2382         char *resendMsg = (char *)CmiAlloc(totalSize);  
2383
2384         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2385         resendReq->PE =CkMyPe(); 
2386         resendReq->numberObjects = numberObjects;
2387         char *objList = &resendMsg[sizeof(ResendRequest)];
2388         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2389
2390         /* test for parallel restart migrate away object**/
2391         if(parallelRestart){
2392                 distributeRestartedObjects();
2393                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2394         }
2395         
2396         /*      To make restart work for load balancing.. should only
2397         be used when checkpoint happens along with load balancing
2398         **/
2399 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2400
2401         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2402         CpvAccess(_currentObj) = lb;
2403         lb->ReceiveDummyMigration(restartDecisionNumber);
2404
2405         sleep(10);
2406         
2407         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2408         for(int i=0;i<CkNumPes();i++){
2409                 if(i != CkMyPe()){
2410                         CmiSyncSend(i,totalSize,resendMsg);
2411                 }       
2412         }
2413         _resendMessagesHandler(resendMsg);
2414         CmiFree(resendMsg);
2415 };
2416
2417 /**
2418  * @brief Initializes variables and flags for restarting procedure.
2419  */
2420 void initializeRestart(void *data, ChareMlogData *mlogData){
2421         mlogData->resendReplyRecvd = 0;
2422         mlogData->receivedTNs = new CkVec<MCount>;
2423         mlogData->restartFlag = 1;
2424         mlogData->restoredLocalMsgLog.removeAll();
2425         mlogData->mapTable.empty();
2426 };
2427
2428 /**
2429  * Updates the homePe of chare array elements.
2430  */
2431 void updateHomePE(void *data,ChareMlogData *mlogData){
2432         RestartRequest *updateRequest = (RestartRequest *)data;
2433         int PE = updateRequest->PE; //restarted PE
2434         //if this object is an array Element and its home is the restarted processor
2435         // the home processor needs to know its current location
2436         if(mlogData->objID.type == TypeArray){
2437                 //it is an array element
2438                 CkGroupID myGID = mlogData->objID.data.array.id;
2439                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2440                 CkArrayID aid(mlogData->objID.data.array.id);           
2441                 //check if the restarted processor is the home processor for this object
2442                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2443                 if(locMgr->homePe(myIdx) == PE){
2444                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2445                         DEBUGRESTART(myIdx.print());
2446                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2447                 }
2448         }
2449 };
2450
2451
2452 /**
2453  * Updates the homePe for all chares in this processor.
2454  */
2455 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2456         int sender = updateRequest->PE;
2457         
2458         forAllCharesDo(updateHomePE,updateRequest);
2459         
2460         updateRequest->PE = CmiMyPe();
2461         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2462         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2463         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2464                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2465                 checkpointCount--;
2466                 startMlogCheckpoint(NULL,0);
2467         }
2468         if(sender == getCheckPointPE()){
2469                 for(int i=0;i<retainedObjectList.size();i++){
2470                         if(retainedObjectList[i]->acked == 0){
2471                                 MigrationNotice migMsg;
2472                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2473                                 migMsg.record = retainedObjectList[i];
2474                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2475                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2476                         }
2477                 }
2478         }
2479 }
2480
2481 /**
2482  * @brief Fills up the ticket vector for each chare.
2483  */
2484 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2485         ResendData *resendData = (ResendData *)data;
2486         int PE = resendData->PE; //restarted PE
2487         int count=0;
2488         CkHashtableIterator *iterator;
2489         void *objp;
2490         void *objkey;
2491         CkObjID *objID;
2492         SNToTicket *snToTicket;
2493         Ticket ticket;
2494         
2495         // traversing the team table looking up for the maximum TN received     
2496         iterator = mlogData->teamTable.iterator();
2497         while( (objp = iterator->next(&objkey)) != NULL ){
2498                 objID = (CkObjID *)objkey;
2499         
2500                 // traversing the resendData structure to add ticket numbers
2501                 for(int j=0;j<resendData->numberObjects;j++){
2502                         if((*objID) == (resendData->listObjects)[j].recver){
2503 char name[100];
2504                                 snToTicket = *(SNToTicket **)objp;
2505 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2506                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2507                                         ticket = snToTicket->get(snIndex);      
2508                                         if(ticket.TN > resendData->maxTickets[j]){
2509                                                 resendData->maxTickets[j] = ticket.TN;
2510                                         }
2511                                         if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2512                                                 //store the TNs that have been since the recver last checkpointed
2513                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2514                                         }
2515                                 }
2516                         }
2517                 }
2518         }
2519
2520         //releasing the memory for the iterator
2521         delete iterator;
2522 }
2523
2524
2525 /**
2526  * @brief Turns on the flag for team recovery that selectively restores
2527  * particular metadata information.
2528  */
2529 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2530         char name[100];
2531         mlogData->teamRecoveryFlag = 1; 
2532 }
2533
2534 /**
2535  * @brief Turns off the flag for team recovery.
2536  */
2537 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2538         mlogData->teamRecoveryFlag = 0;
2539 }
2540
/**
 * forAllCharesDo callback run for each chare when a resend request arrives.
 *
 * @param data     actually a ResendData *. It holds the restarted processor (PE), the
 *                 objects on it with their tProcessed (listObjects), and the per-object
 *                 outputs maxTickets / ticketVecs.
 * @param mlogData message-logging data of the current chare (the sender side)
 *
 * For every entry in this chare's message log with a non-NULL envelope:
 * - if the entry is an unacked local message, a local copy is resent
 *   (sendLocalMessageCopy);
 * - if it never received a ticket (TN <= 0), the ticket request is sent again;
 * - if its receiver is one of the restarted objects j and it has a ticket (TN > 0):
 *   - maxTickets[j] is raised to that TN;
 *   - when TN >= listObjects[j].tProcessed, the TN is appended to ticketVecs[j] and the
 *     logged message is sent again to PE. When PE is this processor, a copy
 *     (copyEnvelope) is enqueued locally instead.
 */
void resendMessageForChare(void *data,ChareMlogData *mlogData){
        char nameString[100];
        ResendData *resendData = (ResendData *)data;
        int PE = resendData->PE; //restarted PE
        DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
        int count=0;
        // NOTE(review): never incremented (the code that did so is commented out below),
        // so the debug summary always reports 0 here.
        int ticketRequests=0;
        CkQ<MlogEntry *> *log = mlogData->getMlog();
        
        for(int i=0;i<log->length();i++){
                MlogEntry *logEntry = (*log)[i];
                
                // if we sent out the logs of a local message to buddy and he crashed
                //before acking
                envelope *env = logEntry->env;
                if(env == NULL){
                        continue;
                }
                if(logEntry->unackedLocal){
                        char recverString[100];
                        DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
                        sendLocalMessageCopy(logEntry);
                }
                //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
                if(env->TN <= 0){
                        //ticket not yet replied send it out again
                        sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,0,1);
                }
                
                if(env->recver.type != TypeInvalid){
                        // NOTE(review): flag is set below but never read in this function.
                        int flag = 0;//marks if any of the restarted objects matched this log entry
                        for(int j=0;j<resendData->numberObjects;j++){
                                if(env->recver == (resendData->listObjects)[j].recver){
                                        flag = 1;
                                        //message has a valid TN
                                        if(env->TN > 0){
                                                //store maxTicket
                                                if(env->TN > resendData->maxTickets[j]){
                                                        resendData->maxTickets[j] = env->TN;
                                                }
                                                //if the TN for this entry is at least the TN processed, send the message out
                                                if(env->TN >= (resendData->listObjects)[j].tProcessed){
                                                        //store the TNs that have been since the recver last checkpointed
                                                        resendData->ticketVecs[j].push_back(env->TN);
                                                        
                                                        if(PE != CkMyPe()){
                                                                if(env->recver.type == TypeNodeGroup){
                                                                        // NOTE(review): CmiSyncNodeSend addresses a node, but PE is a
                                                                        // processor rank — confirm this is right with >1 PE per node.
                                                                        CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
                                                                }else{
                                                                        // restore the handler from the X-handler slot before resending
                                                                        // (presumably where it was saved when the message was logged — confirm)
                                                                        CmiSetHandler(env,CmiGetXHandler(env));
                                                                        CmiSyncSend(PE,env->getTotalsize(),(char *)env);
                                                                }
                                                        }else{
                                                                // restarted PE is this one: enqueue a copy so the logged envelope stays in the log
                                                                envelope *copyEnv = copyEnvelope(env);
                                                                CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
                                                        }
                                                        char senderString[100];
                                                        DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
                                                        count++;
                                                }       
                                        }else{
/*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
                                                DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
                                                //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);
                                                CkAssert(logEntry->destPE != CkMyPe());
                                                
                                                sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
                                                
                                                ticketRequests++;*/
                                                // (ticketless entries were already re-requested above via the TN <= 0 check)
                                        }
                                }
                        }//end of for loop of objects
                        
                }       
        }
        DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
}
2623
2624 /**
2625  * Resends the messages since the last checkpoint to the list of objects included in the 
2626  * request.
2627  */
2628 void _resendMessagesHandler(char *msg){
2629         ResendData d;
2630         ResendRequest *resendReq = (ResendRequest *)msg;
2631
2632         // building the reply message
2633         char *listObjects = &msg[sizeof(ResendRequest)];
2634         d.numberObjects = resendReq->numberObjects;
2635         d.PE = resendReq->PE;
2636         d.listObjects = (TProcessedLog *)listObjects;
2637         d.maxTickets = new MCount[d.numberObjects];
2638         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2639         for(int i=0;i<d.numberObjects;i++){
2640                 d.maxTickets[i] = 0;
2641         }
2642
2643         //Check if any of the retained objects need to be recreated
2644         //If they have not been recreated on the restarted processor
2645         //they need to be recreated on this processor
2646         int count=0;
2647         for(int i=0;i<retainedObjectList.size();i++){
2648                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2649                         count++;
2650                         int recreate=1;
2651                         for(int j=0;j<d.numberObjects;j++){
2652                                 if(d.listObjects[j].recver.type != TypeArray ){
2653                                         continue;
2654                                 }
2655                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2656                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2657                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2658                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2659                                                 recreate = 0;
2660                                                 break;
2661                                         }
2662                                 }
2663                         }
2664                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2665                         if(recreate){
2666                                 donotCountMigration=1;
2667                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2668                                 donotCountMigration=0;
2669                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2670                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2671                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2672                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2673                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2674                                 CmiAssert(rec->type() == CkLocRec::local);
2675                                 CkVec<CkMigratable *> eltList;
2676                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2677                                 for(int j=0;j<eltList.size();j++){
2678                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2679                                                 CpvAccess(_currentObj) = eltList[j];
2680                                                 eltList[j]->ResumeFromSync();
2681                                         }
2682                                 }
2683                                 retainedObjectList[i]->msg=NULL;        
2684                         }
2685                 }
2686         }
2687         
2688         if(count > 0){
2689 //              CmiAbort("retainedObjectList for restarted processor not empty");
2690         }
2691         
2692         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2693
2694
2695         //TML: examines the origin processor to determine if it belongs to the same group.
2696         // In that case, it only returns the maximum ticket received for each object in the list.
2697         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2698                 forAllCharesDo(fillTicketForChare,&d);
2699         else
2700                 forAllCharesDo(resendMessageForChare,&d);
2701
2702         //send back the maximum ticket number for a message sent to each object on the 
2703         //restarted processor
2704         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2705         
2706         int totalTNStored=0;
2707         for(int i=0;i<d.numberObjects;i++){
2708                 totalTNStored += d.ticketVecs[i].size();
2709         }
2710         
2711         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2712         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2713         
2714         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2715         resendReply->PE = CkMyPe();
2716         resendReply->numberObjects = d.numberObjects;
2717         
2718         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2719         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2720         for(int i=0;i<d.numberObjects;i++){
2721                 replyObjects[i] = d.listObjects[i].recver;
2722         }
2723         
2724         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2725         for(int i=0;i<d.numberObjects;i++){
2726                 int vecsize = d.ticketVecs[i].size();
2727                 memcpy(ticketList,&vecsize,sizeof(int));
2728                 ticketList = &ticketList[sizeof(int)];
2729                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2730                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2731         }       
2732
2733         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2734         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2735         
2736 /*      
2737         if(verifyAckRequestsUnacked){
2738                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2739                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2740                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2741                         LDObjHandle h;
2742                         lb->Migrated(h,1);
2743                 }
2744         }
2745         
2746         verifyAckRequestsUnacked=0;*/
2747         
2748         delete [] d.maxTickets;
2749         delete [] d.ticketVecs;
2750         if(resendReq->PE != CkMyPe()){
2751                 CmiFree(msg);
2752         }       
2753 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2754         lastRestart = CmiWallTimer();
2755 }
2756
2757 void sortVec(CkVec<MCount> *TNvec);
2758 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2759
2760 /**
2761  * @brief Receives the tickets assigned to message to other objects.
2762  */
2763 void _resendReplyHandler(char *msg){    
2764         /**
2765                 need to rewrite this method to deal with parallel restart
2766         */
2767         ResendRequest *resendReply = (ResendRequest *)msg;
2768         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2769
2770         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2771         
2772 //      DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2773         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2774         for(int i =0; i< resendReply->numberObjects;i++){       
2775                 Chare *obj = (Chare *)listObjects[i].getObject();
2776                 
2777                 int vecsize;
2778                 memcpy(&vecsize,listTickets,sizeof(int));
2779                 listTickets = &listTickets[sizeof(int)];
2780                 MCount *listTNs = (MCount *)listTickets;        
2781                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2782                 
2783                 if(obj != NULL){
2784                         //the object was restarted on the processor on which it existed
2785                         processReceivedTN(obj,vecsize,listTNs);
2786                 }else{
2787                 //pack up objID vecsize and listTNs and send it to the correct processor
2788                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2789                         char *TNMsg = (char *)CmiAlloc(totalSize);
2790                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2791                         receivedTNData->recver = listObjects[i];
2792                         receivedTNData->numTNs = vecsize;
2793                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2794                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2795
2796                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2797                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2798                 }       
2799         }
2800 };
2801
2802 void _receivedTNDataHandler(ReceivedTNData *msg){
2803         char objName[100];
2804         Chare *obj = (Chare *) msg->recver.getObject();
2805         if(obj){                
2806                 char *_msg = (char *)msg;
2807                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2808                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2809                 processReceivedTN(obj,msg->numTNs,listTNs);
2810         }else{
2811                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2812                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2813         }
2814 };
2815
2816 /**
2817  * @brief Processes the received list of tickets from a particular PE.
2818  */
2819 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2820         // increases the number of resendReply received
2821         obj->mlogData->resendReplyRecvd++;
2822
2823         DEBUG(char objName[100]);
2824         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2825         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2826         //if(obj->mlogData->receivedTNs == NULL)
2827         //      CkPrintf("NULL\n");     
2828         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2829
2830         // includes the tickets into the receivedTN structure
2831         for(int j=0;j<listSize;j++){
2832                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2833         }
2834         
2835         //if this object has received all the replies find the ticket numbers
2836         //that senders know about. Those less than the ticket number processed 
2837         //by the receiver can be thrown away. The rest need not be consecutive
2838         // ie there can be holes in the list of ticket numbers seen by senders
2839         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2840                 obj->mlogData->resendReplyRecvd = 0;
2841                 //sort the received TNS
2842                 sortVec(obj->mlogData->receivedTNs);
2843         
2844                 //after all the received tickets are in we need to sort them and then 
2845                 // calculate the holes  
2846                 if(obj->mlogData->receivedTNs->size() > 0){
2847                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2848                         int vecsize = obj->mlogData->receivedTNs->size();
2849                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2850                         
2851                         // updating tCount with the highest ticket handed out
2852                         if(teamSize > 1){
2853                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2854                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2855                         }else{
2856                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2857                         }
2858                         
2859                         if(numberHoles == 0){
2860                         }else{
2861                                 char objName[100];                                      
2862                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2863                                 obj->mlogData->numberHoles = numberHoles;
2864                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2865                                 int countHoles=0;
2866                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2867                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2868                                                 //the TNs are not consecutive at this point
2869                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2870                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2871                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2872                                                         countHoles++;
2873                                                 }       
2874                                         }
2875                                 }
2876                                 //Holes have been given new TN
2877                                 if(countHoles != numberHoles){
2878                                         char str[100];
2879                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2880                                 }
2881                                 CkAssert(countHoles == numberHoles);                                    
2882                                 obj->mlogData->currentHoles = numberHoles;
2883                         }
2884                 }
2885         
2886                 // cleaning up structures and getting ready to continue execution       
2887                 delete obj->mlogData->receivedTNs;
2888                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2889                 obj->mlogData->receivedTNs = NULL;
2890                 obj->mlogData->restartFlag = 0;
2891
2892                 DEBUGRESTART(char objString[100]);
2893                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2894         }
2895
2896 }
2897
2898
2899 void sortVec(CkVec<MCount> *TNvec){
2900         //sort it ->its bloddy bubble sort
2901         //TODO: use quicksort
2902         for(int i=0;i<TNvec->size();i++){
2903                 for(int j=i+1;j<TNvec->size();j++){
2904                         if((*TNvec)[j] < (*TNvec)[i]){
2905                                 MCount temp;
2906                                 temp = (*TNvec)[i];
2907                                 (*TNvec)[i] = (*TNvec)[j];
2908                                 (*TNvec)[j] = temp;
2909                         }
2910                 }
2911         }
2912         //make it unique .. since its sorted all equal units will be consecutive
2913         MCount *tempArray = new MCount[TNvec->size()];
2914         int     uniqueCount=-1;
2915         for(int i=0;i<TNvec->size();i++){
2916                 tempArray[i] = 0;
2917                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2918                         uniqueCount++;
2919                         tempArray[uniqueCount] = (*TNvec)[i];
2920                 }
2921         }
2922         uniqueCount++;
2923         TNvec->removeAll();
2924         for(int i=0;i<uniqueCount;i++){
2925                 TNvec->push_back(tempArray[i]);
2926         }
2927         delete [] tempArray;
2928 }       
2929
2930 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2931         if(TNVec->size() == 0){
2932                 return -1; //not found in an empty vec
2933         }
2934         //binary search to find 
2935         int left=0;
2936         int right = TNVec->size();
2937         int mid = (left +right)/2;
2938         while(searchTN != (*TNVec)[mid] && left < right){
2939                 if((*TNVec)[mid] > searchTN){
2940                         right = mid-1;
2941                 }else{
2942                         left = mid+1;
2943                 }
2944                 mid = (left + right)/2;
2945         }
2946         if(left < right){
2947                 //mid is the element to be returned
2948                 return mid;
2949         }else{
2950                 if(mid < TNVec->size() && mid >=0){
2951                         if((*TNVec)[mid] == searchTN){
2952                                 return mid;
2953                         }else{
2954                                 return -1;
2955                         }
2956                 }else{
2957                         return -1;
2958                 }
2959         }
2960 };
2961
2962
2963 /*
2964         Method to do parallel restart. Distribute some of the array elements to other processors.
2965         The problem is that we cant use to charm entry methods to do migration as it will get
2966         stuck in the protocol that is going to restart
2967 */
2968
2969 class ElementDistributor: public CkLocIterator{
2970         CkLocMgr *locMgr;
2971         int *targetPE;
2972         void pupLocation(CkLocation &loc,PUP::er &p){
2973                 CkArrayIndexMax idx=loc.getIndex();
2974                 CkGroupID gID = locMgr->ckGetGroupID();
2975                 p|gID;      // store loc mgr's GID as well for easier restore
2976                 p|idx;
2977                 p|loc;
2978         };
2979         public:
2980                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2981                 void addLocation(CkLocation &loc){
2982                         if(*targetPE == CkMyPe()){
2983                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2984                                 return;
2985                         }
2986                         
2987                         CkArrayIndexMax idx=loc.getIndex();
2988                         CkLocRec_local *rec = loc.getLocalRecord();
2989                         
2990                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2991                         idx.print();
2992                         
2993
2994                         //TODO: an element that is being moved should leave some trace behind so that
2995                         // the arraybroadcaster can forward messages to it
2996                         
2997                         //pack up this location and send it across
2998                         PUP::sizer psizer;
2999                         pupLocation(loc,psizer);
3000                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
3001                         char *msg = (char *)CmiAlloc(totalSize);
3002                         char *buf = &msg[CmiMsgHeaderSizeBytes];
3003                         PUP::toMem pmem(buf);
3004                         pmem.becomeDeleting();
3005                         pupLocation(loc,pmem);
3006                         
3007                         locMgr->setDuringMigration(CmiTrue);                    
3008                         delete rec;
3009                         locMgr->setDuringMigration(CmiFalse);                   
3010                         locMgr->inform(idx,*targetPE);
3011
3012                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
3013                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
3014
3015                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
3016                         //decide on the target processor for the next object
3017                         *targetPE = (*targetPE +1)%CkNumPes();
3018                 }
3019                 
3020 };
3021
3022 void distributeRestartedObjects(){
3023         int numGroups = CkpvAccess(_groupIDTable)->size();      
3024         int i;
3025         int targetPE=CkMyPe();
3026         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
3027 };
3028
3029 void _distributedLocationHandler(char *receivedMsg){
3030         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
3031         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
3032         PUP::fromMem pmem(buf);
3033         CkGroupID gID;
3034         CkArrayIndexMax idx;
3035         pmem |gID;
3036         pmem |idx;
3037         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3038         donotCountMigration=1;
3039         mgr->resume(idx,pmem,CmiTrue);
3040         donotCountMigration=0;
3041         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3042         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
3043         idx.print();
3044
3045         CkLocRec *rec = mgr->elementRec(idx);
3046         CmiAssert(rec->type() == CkLocRec::local);
3047         
3048         CkVec<CkMigratable *> eltList;
3049         mgr->migratableList((CkLocRec_local *)rec,eltList);
3050         for(int i=0;i<eltList.size();i++){
3051                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
3052                         CpvAccess(_currentObj) = eltList[i];
3053                         eltList[i]->ResumeFromSync();
3054                 }
3055         }
3056         
3057         
3058 }
3059
3060
3061 /** this method is used to send messages to a restarted processor to tell
3062  * it that a particular expected object is not going to get to it */
3063 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
3064         DummyMigrationMsg buf;
3065         buf.flag = MLOG_OBJECT;
3066         buf.lbID = lbID;
3067         buf.mgrID = locMgrID;
3068         buf.idx = idx;
3069         buf.locationPE = locationPE;
3070         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3071         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3072 };
3073
3074
3075 /**this method is used by a restarted processor to tell other processors
3076  * that they are not going to receive these many objects.. just the count
3077  * not the objects themselves ***/
3078
3079 void sendDummyMigrationCounts(int *dummyCounts){
3080         DummyMigrationMsg buf;
3081         buf.flag = MLOG_COUNT;
3082         buf.lbID = globalLBID;
3083         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3084         for(int i=0;i<CmiNumPes();i++){
3085                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3086                         buf.count = dummyCounts[i];
3087                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3088                 }
3089         }
3090 }
3091
3092
3093 /** this handler is used to process a dummy migration msg.
3094  * it looks up the load balancer and calls migrated for it */
3095
3096 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3097         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3098         if(msg->flag == MLOG_OBJECT){
3099                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3100                 LDObjHandle h;
3101                 lb->Migrated(h,1);
3102         }
3103         if(msg->flag == MLOG_COUNT){
3104                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3105                 msg->count -= verifyAckedRequests;
3106                 for(int i=0;i<msg->count;i++){
3107                         LDObjHandle h;
3108                         lb->Migrated(h,1);
3109                 }
3110         }
3111         verifyAckedRequests=0;
3112         CmiFree(msg);
3113 };
3114
3115 /*****************************************************
3116         Implementation of a method that can be used to call
3117         any method on the ChareMlogData of all the chares on
3118         a processor currently
3119 ******************************************************/
3120
3121
3122 class ElementCaller :  public CkLocIterator {
3123 private:
3124         CkLocMgr *locMgr;
3125         MlogFn fnPointer;
3126         void *data;
3127 public:
3128         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3129                 locMgr = _locMgr;
3130                 fnPointer = _fnPointer;
3131                 data = _data;
3132         };
3133         void addLocation(CkLocation &loc){
3134                 CkVec<CkMigratable *> list;
3135                 CkLocRec_local *local = loc.getLocalRecord();
3136                 locMgr->migratableList (local,list);
3137                 for(int i=0;i<list.size();i++){
3138                         CkMigratable *migratableElement = list[i];
3139                         fnPointer(data,migratableElement->mlogData);
3140                 }
3141         }
3142 };
3143
3144 /**
3145  * Map function pointed by fnPointer over all the chares living in this processor.
3146  */
3147 void forAllCharesDo(MlogFn fnPointer,void *data){
3148         int numGroups = CkpvAccess(_groupIDTable)->size();
3149         for(int i=0;i<numGroups;i++){
3150                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3151                 fnPointer(data,obj->mlogData);
3152         }
3153         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3154         for(int i=0;i<numNodeGroups;i++){
3155                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3156                 fnPointer(data,obj->mlogData);
3157         }
3158         int i;
3159         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3160 };
3161
3162
3163 /******************************************************************
3164  Load Balancing
3165 ******************************************************************/
3166
3167 void initMlogLBStep(CkGroupID gid){
3168         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3169         countLBMigratedAway = 0;
3170         countLBToMigrate=0;
3171         onGoingLoadBalancing=1;
3172         migrationDoneCalled=0;
3173         checkpointBarrierCount=0;
3174         if(globalLBID.idx != 0){
3175                 CmiAssert(globalLBID.idx == gid.idx);
3176         }
3177         globalLBID = gid;
3178 }
3179
3180 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3181         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3182         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3183         
3184         resumeLbFnPtr = _fnPtr;
3185         centralLb = _centralLb;
3186         migrationDoneCalled = 1;
3187         if(countLBToMigrate == countLBMigratedAway){
3188                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3189                 startMlogCheckpoint(NULL,CmiWallTimer());       
3190         }
3191 };
3192
3193 void finishedCheckpointLoadBalancing(){
3194         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3195         CheckpointBarrierMsg msg;
3196         msg.fromPE = CmiMyPe();
3197         msg.checkpointCount = checkpointCount;
3198
3199         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3200         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3201         
3202 };
3203
3204
3205 void sendMlogLocation(int targetPE,envelope *env){
3206         void *_msg = EnvToUsr(env);
3207         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3208
3209
3210         int existing = 0;
3211         //if this object is already in the retainedobjectlust destined for this
3212         //processor it should not be sent
3213         
3214         for(int i=0;i<retainedObjectList.size();i++){
3215                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3216                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3217                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3218                         existing = 1;
3219                         break;
3220                 }
3221         }
3222
3223         if(existing){
3224                 return;
3225         }
3226         
3227         
3228         countLBToMigrate++;
3229         
3230         MigrationNotice migMsg;
3231         migMsg.migRecord.gID = msg->gid;
3232         migMsg.migRecord.idx = msg->idx;
3233         migMsg.migRecord.fromPE = CkMyPe();
3234         migMsg.migRecord.toPE =  targetPE;
3235         
3236         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3237         
3238         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3239         retainedObject->migRecord = migMsg.migRecord;
3240         retainedObject->acked  = 0;
3241         
3242         CkPackMessage(&env);
3243         
3244         migMsg.record = retainedObject;
3245         retainedObject->msg = env;
3246         int size = retainedObject->size = env->getTotalsize();
3247         
3248         retainedObjectList.push_back(retainedObject);
3249         
3250         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3251         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3252         
3253         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3254
3255 }
3256
3257 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3258         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3259         migratedNoticeList.push_back(msg->migRecord);
3260
3261         MigrationNoticeAck buf;
3262         buf.record = msg->record;
3263         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3264         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3265 }
3266
3267 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3268         
3269         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3270         retainedObject->acked = 1;
3271
3272         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3273         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3274
3275         //inform home about the new location of this object
3276         CkGroupID gID = retainedObject->migRecord.gID ;
3277         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3278         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3279         
3280         countLBMigratedAway++;
3281         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3282                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3283                 startMlogCheckpoint(NULL,CmiWallTimer());
3284         }
3285 };
3286
3287 void _receiveMlogLocationHandler(void *buf){
3288         envelope *env = (envelope *)buf;
3289         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3290         CkUnpackMessage(&env);
3291         void *_msg = EnvToUsr(env);
3292         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3293         CkGroupID gID= msg->gid;
3294         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3295         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3296         CpvAccess(_currentObj)=mgr;
3297         mgr->immigrate(msg);
3298 };
3299
3300
3301 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
3302 /*      if(mlogData->objID.type == TypeArray){
3303                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
3304         //      TODO: make sure later that atSync has been called and it needs 
3305         //      to be resumed from sync
3306         //
3307                 CpvAccess(_currentObj) = elt;
3308                 elt->ResumeFromSync();
3309         }*/
3310 }
3311
3312 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3313         if(checkpointBarrierCount == CmiNumPes()){
3314                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3315                 for(int i=0;i<CmiNumPes();i++){
3316                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3317                 }
3318         }
3319 }
3320
3321 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3322         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3323         if(msg->checkpointCount == checkpointCount){
3324                 checkpointBarrierCount++;
3325                 checkAndSendCheckpointBarrierAcks(msg);
3326         }else{
3327                 if(msg->checkpointCount-1 == checkpointCount){
3328                         checkpointBarrierCount++;
3329                         checkAndSendCheckpointBarrierAcks(msg);
3330                 }else{
3331                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3332                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3333                 }
3334         }
3335         CmiFree(msg);
3336 }
3337
/**
 * Handler for the barrier ack broadcast by checkAndSendCheckpointBarrierAcks
 * (which sends it to every PE).
 * It sends the remove-log requests, resumes the load balancer through
 * resumeLbFnPtr(centralLb), and frees the ack message.
 */
void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
	DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
	DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
	// Order matters: log-removal requests go out before the load balancer resumes.
	sendRemoveLogRequests();
	(*resumeLbFnPtr)(centralLb);
	CmiFree(msg);
}
3345
3346 /**
3347         method that informs an array elements home processor of its current location
3348         It is a converse method to bypass the charm++ message logging framework
3349 */
3350
3351 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3352         double _startTime = CmiWallTimer();
3353         CurrentLocationMsg msg;
3354         msg.mgrID = locMgrID;
3355         msg.idx = idx;
3356         msg.locationPE = currentPE;
3357         msg.fromPE = CkMyPe();
3358
3359         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3360         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3361         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3362         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3363 }
3364
3365
3366 void _receiveLocationHandler(CurrentLocationMsg *data){
3367         double _startTime = CmiWallTimer();
3368         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3369         if(mgr == NULL){
3370                 CmiFree(data);
3371                 return;
3372         }
3373         CkLocRec *rec = mgr->elementNrec(data->idx);
3374         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3375         if(rec != NULL){
3376                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3377                         if(data->fromPE == data->locationPE){
3378                                 CmiAbort("Another processor has the same object");
3379                         }
3380                 }
3381         }
3382         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3383                 int targetPE = data->fromPE;
3384                 data->fromPE = CmiMyPe();
3385                 data->locationPE = CmiMyPe();
3386                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3387                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3388         }else{
3389                 mgr->inform(data->idx,data->locationPE);
3390         }
3391         CmiFree(data);
3392         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3393 }
3394
3395
3396
3397 void getGlobalStep(CkGroupID gID){
3398         LBStepMsg msg;
3399         int destPE = 0;
3400         msg.lbID = gID;
3401         msg.fromPE = CmiMyPe();
3402         msg.step = -1;
3403         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3404         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3405 };
3406
3407 void _getGlobalStepHandler(LBStepMsg *msg){
3408         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3409         msg->step = lb->step();
3410         CmiAssert(msg->fromPE != CmiMyPe());
3411         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3412         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3413         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3414 };
3415
3416 void _recvGlobalStepHandler(LBStepMsg *msg){
3417         
3418         restartDecisionNumber=msg->step;
3419         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3420         _updateHomeAckHandler(dummyAck);
3421 };
3422
3423 /**
3424  * @brief Function to wrap up performance information.
3425  */
3426 void _messageLoggingExit(){
3427 /*      if(CkMyPe() == 0){
3428                 if(countBuffered != 0){
3429                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3430                 }
3431
3432 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3433         }
3434         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3435         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3436
3437         //TML: printing some statistics for group approach
3438         //if(teamSize > 1)
3439                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3440
3441 }
3442
3443 /**
3444         The method for returning the actual object pointed to by an id
3445         If the object doesnot exist on the processor it returns NULL
3446 **/
3447
3448 void* CkObjID::getObject(){
3449         
3450                 switch(type){
3451                         case TypeChare: 
3452                                 return CkLocalChare(&data.chare.id);
3453                         case TypeMainChare:
3454                                 return CkLocalChare(&data.chare.id);
3455                         case TypeGroup:
3456         
3457                                 CkAssert(data.group.onPE == CkMyPe());
3458                                 return CkLocalBranch(data.group.id);
3459                         case TypeNodeGroup:
3460                                 CkAssert(data.group.onPE == CkMyNode());
3461                                 //CkLocalNodeBranch(data.group.id);
3462                                 {
3463                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3464                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3465                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3466         
3467                                         return retval;
3468                                 }       
3469                         case TypeArray:
3470                                 {
3471         
3472         
3473                                         CkArrayID aid(data.array.id);
3474         
3475                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3476         
3477                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
3478         
3479                                         return aProxy.ckLocal();
3480                                 }
3481                         default:
3482                                 CkAssert(0);
3483                 }
3484 }
3485
3486
3487 int CkObjID::guessPE(){
3488                 switch(type){
3489                         case TypeChare:
3490                         case TypeMainChare:
3491                                 return data.chare.id.onPE;
3492                         case TypeGroup:
3493                         case TypeNodeGroup:
3494                                 return data.group.onPE;
3495                         case TypeArray:
3496                                 {
3497                                         CkArrayID aid(data.array.id);
3498                                         if(aid.ckLocalBranch() == NULL){
3499                                                 return -1;
3500                                         }
3501                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
3502                                 }
3503                         default:
3504                                 CkAssert(0);
3505                 }
3506 };
3507
3508 char *CkObjID::toString(char *buf) const {
3509         
3510         switch(type){
3511                 case TypeChare:
3512                         sprintf(buf