Changes due to the new type for ChareType struct.
[charm.git] / src / ck-core / ckmessagelogging.C
1 /**
2  * Message Logging Fault Tolerance Protocol
3  * It includes the main functions for the basic and team-based schemes.
4  */
5
6 #include "charm.h"
7 #include "ck.h"
8 #include "ckmessagelogging.h"
9 #include "queueing.h"
10 #include <sys/types.h>
11 #include <signal.h>
12 #include "CentralLB.h"
13
14 #ifdef _FAULT_MLOG_
15
16 //#define DEBUG(x)  if(_restartFlag) {x;}
17 #define DEBUG(x)  //x
18 #define DEBUGRESTART(x)  //x
19 #define DEBUGLB(x) // x
20 #define DEBUG_TEAM(x)  //x
21
22 #define BUFFERED_LOCAL
23 #define BUFFERED_REMOTE 
24
25 extern const char *idx2str(const CkArrayIndex &ind);
26 extern const char *idx2str(const ArrayElement *el);
27 const char *idx2str(const CkArrayIndexMax &ind){
28         return idx2str((const CkArrayIndex &)ind);
29 };
30
31 void getGlobalStep(CkGroupID gID);
32
33 bool fault_aware(CkObjID &recver);
34 void sendCheckpointData(int mode);
35 void createObjIDList(void *data,ChareMlogData *mlogData);
36 inline bool isLocal(int destPE);
37 inline bool isTeamLocal(int destPE);
38
39 int _restartFlag=0;
40 //ERASE int restarted=0; // it's not being used anywhere
41
42 //TML: variables for measuring savings with groups in message logging
43 float MLOGFT_totalLogSize = 0.0;
44 float MLOGFT_totalMessages = 0.0;
45 float MLOGFT_totalObjects = 0.0;
46
47 //TODO: remove for perf runs
48 int countHashRefs=0; //count the number of gets
49 int countHashCollisions=0;
50
51 //#define CHECKPOINT_DISK
52 char *checkpointDirectory=".";
53 int unAckedCheckpoint=0;
54
55 int countLocal=0,countBuffered=0;
56 int countPiggy=0;
57 int countClearBufferedLocalCalls=0;
58
59 int countUpdateHomeAcks=0;
60
61 extern int chkptPeriod;
62 extern bool parallelRestart;
63
64 char *killFile;
65 int killFlag=0;
66 int restartingMlogFlag=0;
67 void readKillFile();
68 double killTime=0.0;
69 int checkpointCount=0;
70
71
72 CpvDeclare(Chare *,_currentObj);
73 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
74 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
75 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
76 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
77 CpvDeclare(Queue, _outOfOrderMessageQueue);
78 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
79 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
80 CpvDeclare(char **,_bufferedTicketRequests);
81 CpvDeclare(int *,_numBufferedTicketRequests);
82 CpvDeclare(char *,_bufferTicketReply);
83
84
85
86 static double adjustChkptPeriod=0.0; //in ms
87 static double nextCheckpointTime=0.0;//in seconds
88
89 double lastBufferedLocalMessageCopyTime;
90
91 int _maxBufferedMessages;
92 int _maxBufferedTicketRequests;
93 int BUFFER_TIME=2; // in ms
94
95
96 int _ticketRequestHandlerIdx;
97 int _ticketHandlerIdx;
98 int _localMessageCopyHandlerIdx;
99 int _localMessageAckHandlerIdx;
100 int _pingHandlerIdx;
101 int _bufferedLocalMessageCopyHandlerIdx;
102 int _bufferedLocalMessageAckHandlerIdx;
103 int _bufferedTicketRequestHandlerIdx;
104 int _bufferedTicketHandlerIdx;
105
106
107 char objString[100];
108 int _checkpointRequestHandlerIdx;
109 int _storeCheckpointHandlerIdx;
110 int _checkpointAckHandlerIdx;
111 int _getCheckpointHandlerIdx;
112 int _recvCheckpointHandlerIdx;
113 int _removeProcessedLogHandlerIdx;
114
115 int _verifyAckRequestHandlerIdx;
116 int _verifyAckHandlerIdx;
117 int _dummyMigrationHandlerIdx;
118
119
120 int     _getGlobalStepHandlerIdx;
121 int     _recvGlobalStepHandlerIdx;
122
123 int _updateHomeRequestHandlerIdx;
124 int _updateHomeAckHandlerIdx;
125 int _resendMessagesHandlerIdx;
126 int _resendReplyHandlerIdx;
127 int _receivedTNDataHandlerIdx;
128 int _distributedLocationHandlerIdx;
129
130 //TML: integer constants for group-based message logging
131 int _restartHandlerIdx;
132 int _getRestartCheckpointHandlerIdx;
133 int _recvRestartCheckpointHandlerIdx;
134
135 int verifyAckTotal;
136 int verifyAckCount;
137
138 int verifyAckedRequests=0;
139
140 RestartRequest *storedRequest;
141
142
143
144 int _falseRestart =0; /**
145                                                                                                         For testing on clusters we might carry out restarts on 
146                                                                                                         a processor without actually starting it
147                                                                                                         1 -> false restart
148                                                                                                         0 -> restart after an actual crash
149                                                                                                 */                                                                                                                              
150
151 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
152 int _lockNewTicket=0;
153
154
155 //Load balancing globals
156 int onGoingLoadBalancing=0;
157 void *centralLb;
158 void (*resumeLbFnPtr)(void *);
159 int _receiveMlogLocationHandlerIdx;
160 int _receiveMigrationNoticeHandlerIdx;
161 int _receiveMigrationNoticeAckHandlerIdx;
162 int _checkpointBarrierHandlerIdx;
163 int _checkpointBarrierAckHandlerIdx;
164
165 CkVec<MigrationRecord> migratedNoticeList;
166 CkVec<RetainedMigratedObject *> retainedObjectList;
167 int donotCountMigration=0;
168 int countLBMigratedAway=0;
169 int countLBToMigrate=0;
170 int migrationDoneCalled=0;
171 int checkpointBarrierCount=0;
172 int globalResumeCount=0;
173 CkGroupID globalLBID;
174 int restartDecisionNumber=-1;
175
176
177 double lastCompletedAlarm=0;
178 double lastRestart=0;
179
180
181 //update location globals
182 int _receiveLocationHandlerIdx;
183
184
185
186 // initialize message logging datastructures and register handlers
187 void _messageLoggingInit(){
188         //current object
189         CpvInitialize(Chare *,_currentObj);
190         
191         //registering handlers for message logging
192         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
193         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
194         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
195         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
196         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
197         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
198         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
199   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
200         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
201
202                 
203         //handlers for checkpointing
204         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
205         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
206         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
207         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
208
209
210         //handlers for restart
211         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
212         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
213         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
214         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
215         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
216         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
217         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
218         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
219         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
220         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
221         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
222
223         //TML: handlers for group-based message logging
224         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
225         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
226         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
227
228         
229         //handlers for load balancing
230         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
231         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
232         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
233         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
234         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
235         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
236         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
237         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
238         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
239
240         
241         //handlers for updating locations
242         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
243         
244         //Cpv variables for message logging
245         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
246         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
247         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
248         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
249         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
250         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
251         CpvInitialize(Queue, _outOfOrderMessageQueue);
252         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
253         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
254         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
255         
256         CpvInitialize(char **,_bufferedTicketRequests);
257         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
258         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
259         for(int i=0;i<CkNumPes();i++){
260                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
261                 CpvAccess(_numBufferedTicketRequests)[i]=0;
262         }
263   CpvInitialize(char *,_bufferTicketReply);
264         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
265         
266 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
267         CcdCallFnAfter(retryTicketRequest,NULL,100);    
268         
269         
270         //Cpv variables for checkpoint
271         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
272         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
273         
274 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
275 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
276 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
277         if(CkMyPe() == 0){
278 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
279 #ifdef  BUFFERED_LOCAL
280                 if(CmiMyPe() == 0){
281                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
282                 }
283 #endif
284         }
285 #ifdef  BUFFERED_REMOTE
286         if(CmiMyPe() == 0){
287                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
288         }
289 #endif
290
291         traceRegisterUserEvent("Remove Logs", 20);
292         traceRegisterUserEvent("Ticket Request Handler", 21);
293         traceRegisterUserEvent("Ticket Handler", 22);
294         traceRegisterUserEvent("Local Message Copy Handler", 23);
295         traceRegisterUserEvent("Local Message Ack Handler", 24);        
296         traceRegisterUserEvent("Preprocess current message",25);
297         traceRegisterUserEvent("Preprocess past message",26);
298         traceRegisterUserEvent("Preprocess future message",27);
299         traceRegisterUserEvent("Checkpoint",28);
300         traceRegisterUserEvent("Checkpoint Store",29);
301         traceRegisterUserEvent("Checkpoint Ack",30);
302         
303         traceRegisterUserEvent("Send Ticket Request",31);
304         traceRegisterUserEvent("Generalticketrequest1",32);
305         traceRegisterUserEvent("TicketLogLocal",33);
306         traceRegisterUserEvent("next_ticket and SN",34);
307         traceRegisterUserEvent("Timeout for buffered remote messages",35);
308         traceRegisterUserEvent("Timeout for buffered local messages",36);
309         traceRegisterUserEvent("Inform Location Home",37);
310         traceRegisterUserEvent("Receive Location Handler",38);
311         
312         lastCompletedAlarm=CmiWallTimer();
313         lastRestart = CmiWallTimer();
314 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
315         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
316 }
317
318 void killLocal(void *_dummy,double curWallTime);        
319
320 void readKillFile(){
321         FILE *fp=fopen(killFile,"r");
322         if(!fp){
323                 return;
324         }
325         int proc;
326         double sec;
327         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
328                 if(proc == CkMyPe()){
329                         killTime = CmiWallTimer()+sec;
330                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
331                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
332                 }
333         }
334         fclose(fp);
335 }
336
337 void killLocal(void *_dummy,double curWallTime){
338         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
339         if(CmiWallTimer()<killTime-1){
340                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
341         }else{  
342                 kill(getpid(),SIGKILL);
343         }
344 }
345
346
347
348 /************************ Message logging methods ****************/
349
350 // send a ticket request to a group on a processor
351 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
352         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
353                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
354                 void *origMsg = EnvToUsr(env);
355                 for(int i=0;i<CmiNumPes();i++){
356                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
357                                 void *copyMsg = CkCopyMsg(&origMsg);
358                                 envelope *copyEnv = UsrToEnv(copyMsg);
359                                 copyEnv->SN=0;
360                                 copyEnv->TN=0;
361                                 copyEnv->sender.type = TypeInvalid;
362                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
363                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
364                         }
365                 }
366                 return;
367         }
368         CkObjID recver;
369         recver.type = TypeGroup;
370         recver.data.group.id = env->getGroupNum();
371         recver.data.group.onPE = destPE;
372 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
373                 CmiPrintStackTrace(0);
374         }*/
375         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
376 }
377
378 //send a ticket request to a nodegroup
379 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
380         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
381                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
382                 void *origMsg = EnvToUsr(env);
383                 for(int i=0;i<CmiNumNodes();i++){
384                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
385                                 void *copyMsg = CkCopyMsg(&origMsg);
386                                 envelope *copyEnv = UsrToEnv(copyMsg);
387                                 copyEnv->SN=0;
388                                 copyEnv->TN=0;
389                                 copyEnv->sender.type = TypeInvalid;
390                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
391                         }
392                 }
393                 return;
394         }
395         CkObjID recver;
396         recver.type = TypeNodeGroup;
397         recver.data.group.id = env->getGroupNum();
398         recver.data.group.onPE = destNode;
399         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
400 }
401
402 //send a ticket request to an array element
403 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
404         CkObjID recver;
405         recver.type = TypeArray;
406         recver.data.array.id = env->getsetArrayMgr();
407         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
408
409         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
410                 char recverString[100],senderString[100];
411                 
412                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
413         }
414
415         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
416 };
417
418 /**
419  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
420  */
421 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
422         envelope *env = _env;
423         int resend=0; //is it a resend
424         char recverName[100];
425         double _startTime=CkWallTimer();
426         
427         if(CpvAccess(_currentObj) == NULL){
428 //              CkAssert(0);
429                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
430                 generalCldEnqueue(destPE,env,_infoIdx);
431                 return;
432         }
433         
434         if(env->sender.type == TypeInvalid){
435                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
436                 //Set message logging data in the envelope
437         }else{
438                 envelope *copyEnv = copyEnvelope(env);
439                 env = copyEnv;
440                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
441                 env->SN = 0;
442         }
443         
444         CkObjID &sender = env->sender;
445         env->recver = recver;
446
447         Chare *obj = (Chare *)env->sender.getObject();
448           
449         if(env->SN == 0){
450                 env->SN = obj->mlogData->nextSN(recver);
451         }else{
452                 resend = 1;
453         }
454         
455         char senderString[100];
456 //      if(env->SN != 1){
457                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
458         //      CmiPrintStackTrace(0);
459 /*      }else{
460                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
461         }*/
462                 
463         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
464 //      CkPackMessage(&(mEntry->env));
465 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
466         
467         _startTime = CkWallTimer();
468
469         // uses the proper ticketing mechanism for local, group and general messages
470         if(isLocal(destPE)){
471                 ticketLogLocalMessage(mEntry);
472         }else{
473                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
474         }
475 }
476
477 /**
478  * Determines if the message is local or not. A message is local if:
479  * 1) Both the destination and origin are the same PE.
480  */
481 inline bool isLocal(int destPE){
482         // both the destination and the origin are the same PE
483         if(destPE == CkMyPe())
484                 return true;
485
486         return false;
487 }
488
489 /**
490  * Determines if the message is group local or not. A message is group local if:
491  * 1) They belong to the same group in the group-based message logging.
492  */
493 inline bool isTeamLocal(int destPE){
494
495         // they belong to the same group
496         if(TEAM_SIZE_MLOG > 1 && destPE/TEAM_SIZE_MLOG == CkMyPe()/TEAM_SIZE_MLOG)
497                 return true;
498
499         return false;
500 }
501
502
503
504 /**
505  * Method that does the actual send by creating a ticket request filling it up and sending it.
506  */
507 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
508         char recverString[100],senderString[100];
509         envelope *env = entry->env;
510         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
511 /*      envelope *env = entry->env;
512         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
513
514         Chare *obj = (Chare *)entry->env->sender.getObject();
515         if(!resend){
516                 //TML: only stores message if either it goes to this processor or to a processor in a different group
517                 if(!isTeamLocal(entry->destPE)){
518                         obj->mlogData->addLogEntry(entry);
519                         MLOGFT_totalMessages += 1.0;
520                         MLOGFT_totalLogSize += entry->env->getTotalsize();
521                 }else{
522                         // the message has to be deleted after it has been sent
523                         entry->env->freeMsg = true;
524                 }
525         }
526
527 #ifdef BUFFERED_REMOTE
528         //buffer the ticket request 
529         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
530                 //first message to this processor, buffer needs to be created
531                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
532                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
533                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
534         }
535         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
536         //Buffer the ticketrequests
537         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
538         ticketRequest->sender = sender;
539         ticketRequest->recver = recver;
540         ticketRequest->logEntry = entry;
541         ticketRequest->SN = SN;
542         ticketRequest->senderPE = CkMyPe();
543
544         CpvAccess(_numBufferedTicketRequests)[destPE]++;
545         
546         
547         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
548                 sendBufferedTicketRequests(destPE);
549         }else{
550                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
551                         int *checkPE = new int;
552                         *checkPE = destPE;
553                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
554                 }
555         }
556 #else
557
558         TicketRequest ticketRequest;
559         ticketRequest.sender = sender;
560         ticketRequest.recver = recver;
561         ticketRequest.logEntry = entry;
562         ticketRequest.SN = SN;
563         ticketRequest.senderPE = CkMyPe();
564         
565         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
566 //      CmiBecomeImmediate(&ticketRequest);
567         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
568 #endif
569         DEBUG(CmiMemoryCheck());
570 };
571
572 /**
573  * Send the ticket requests buffered for processor PE
574  **/
575 void sendBufferedTicketRequests(int destPE){
576         DEBUG(CmiMemoryCheck());
577         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
578         if(numberRequests == 0){
579                 return;
580         }
581         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
582         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
583         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
584         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
585         header->numberLogs = numberRequests;
586         
587         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
588         CmiSyncSend(destPE,totalSize,(char *)buf);
589         
590         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
591         DEBUG(CmiMemoryCheck());
592 };
593
594 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
595         int destPE = *(int *)_destPE;
596   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
597                 sendBufferedTicketRequests(destPE);
598 //              traceUserEvent(35);
599         }
600         delete (int *)_destPE;
601         DEBUG(CmiMemoryCheck());
602 };
603
604 /**
605  * Gets a ticket for a local message and then sends a copy to the buddy.
606  * This method is always in the main thread(not interrupt).. so it should 
607  * never find itself locked out of a newTicket.
608  */
609 void ticketLogLocalMessage(MlogEntry *entry){
610         double _startTime=CkWallTimer();
611         DEBUG(CmiMemoryCheck());
612
613         Chare *recverObj = (Chare *)entry->env->recver.getObject();
614         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
615         if(recverObj){
616                 //Consider the case, after a restart when this message has already been allotted a ticket number
617                 // and should get the same one as the old one.
618                 Ticket ticket;
619                 if(recverObj->mlogData->mapTable.numObjects() > 0){
620                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
621                 }else{
622                         ticket.TN = 0;
623                 }
624                 
625                 char senderString[100], recverString[100] ;
626                 
627                 if(ticket.TN == 0){
628                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
629         
630                         if(ticket.TN == 0){
631                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
632                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
633                                 
634         //              _lockNewTicket = 0;
635 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
636                                 return;
637                         }
638                 }       
639                 //TODO: check for the case when an invalid ticket is returned
640                 //TODO: check for OLD or RECEIVED TICKETS
641                 entry->env->TN = ticket.TN;
642                 CkAssert(entry->env->TN > 0);
643                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
644         
645                 // sends a copy of the metadata to the buddy    
646                 sendLocalMessageCopy(entry);
647                 
648                 DEBUG(CmiMemoryCheck());
649
650                 // sets the unackedLocal flag and stores the message in the log
651                 entry->unackedLocal = 1;
652                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
653
654                 DEBUG(CmiMemoryCheck());
655         }else{
656                 CkPrintf("[%d] Local message in group-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
657                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
658         }
659         _lockNewTicket=0;
660 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
661 };
662
663 /**
664  * Sends the metadata of a local message to its buddy.
665  */
666 void sendLocalMessageCopy(MlogEntry *entry){
667         LocalMessageLog msgLog;
668         msgLog.sender = entry->env->sender;
669         msgLog.recver = entry->env->recver;
670         msgLog.SN = entry->env->SN;
671         msgLog.TN = entry->env->TN;
672         msgLog.entry = entry;
673         msgLog.senderPE = CkMyPe();
674         
675         char recvString[100];
676         char senderString[100];
677         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
678
679 #ifdef BUFFERED_LOCAL
680         countLocal++;
681         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
682         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
683                 sendBufferedLocalMessageCopy();
684         }else{
685                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
686                         lastBufferedLocalMessageCopyTime = CkWallTimer();
687                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
688                         countClearBufferedLocalCalls++;
689                 }       
690         }
691 #else   
692         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
693         
694         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
695 #endif
696         DEBUG(CmiMemoryCheck());
697 };
698
699
700 void sendBufferedLocalMessageCopy(){
701         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
702         if(numberLogs == 0){
703                 return;
704         }
705         countBuffered++;
706         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
707         void *buf=CmiAlloc(totalSize);
708         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
709         header->numberLogs=numberLogs;
710
711         DEBUG(CmiMemoryCheck());
712         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
713         
714         char *ptr = (char *)buf;
715         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
716         
717         for(int i=0;i<numberLogs;i++){
718                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
719                 memcpy(ptr,&log,sizeof(LocalMessageLog));
720                 ptr = &ptr[sizeof(LocalMessageLog)];
721         }
722
723         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
724
725         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
726         DEBUG(CmiMemoryCheck());
727 };
728
729 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
730         countClearBufferedLocalCalls--;
731         if(countClearBufferedLocalCalls > 10){
732                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
733         }
734         DEBUG(CmiMemoryCheck());
735         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
736         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
737                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
738                         sendBufferedLocalMessageCopy();
739 //                      traceUserEvent(36);
740                 }
741         }
742         DEBUG(CmiMemoryCheck());
743 }
744
745 /****
746         The handler functions
747 *****/
748
749
750 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
751 /**
752  *  If there are any delayed requests, process them first before 
753  *  processing this request
754  * */
755 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
756         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
757         double  _startTime = CkWallTimer();
758         if(CpvAccess(_delayedTicketRequests)->length() > 0){
759                 retryTicketRequest(NULL,_startTime);
760         }
761         _processTicketRequest(ticketRequest);
762         CmiFree(ticketRequest);
763 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
764 }
765 /** Handler used for dealing with a bunch of ticket requests
766  * from one processor. The replies are also bunched together
767  * Does not use _ticketRequestHandler
768  * */
769 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
770         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
771         DEBUG(CmiMemoryCheck());
772         double _startTime = CkWallTimer();
773         if(CpvAccess(_delayedTicketRequests)->length() > 0){
774                 retryTicketRequest(NULL,_startTime);
775         }
776         DEBUG(CmiMemoryCheck());
777   int numRequests = recvdHeader->numberLogs;
778         char *msg = (char *)recvdHeader;
779         msg = &msg[sizeof(BufferedTicketRequestHeader)];
780         int senderPE=((TicketRequest *)msg)->senderPE;
781
782         
783         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
784         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
785         
786         char *ptr = (char *)buf;
787         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
788         header->numberLogs = 0;
789
790         DEBUG(CmiMemoryCheck());
791         
792         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
793         
794         for(int i=0;i<numRequests;i++){
795                 TicketRequest *request = (TicketRequest *)msg;
796                 msg = &msg[sizeof(TicketRequest)];
797                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
798
799                 if(replied){
800                         //the ticket request has been processed and 
801                         //the reply will be stored in the ptr
802                         header->numberLogs++;
803                         ptr = &ptr[sizeof(TicketReply)];
804                 }
805         }
806 /*      if(header->numberLogs == 0){
807                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
808         }*/
809
810         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
811         CmiSyncSend(senderPE,totalSize,(char *)buf);
812         CmiFree(recvdHeader);
813 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
814         DEBUG(CmiMemoryCheck());
815 };
816
817 /**Process the ticket request. 
818  * If it is processed and a reply is being sent 
819  * by this processor return true
820  * else return false.
821  * If a reply buffer is specified put the reply into that
822  * else send the reply
823  * */
824 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
825
826 /*      if(_lockNewTicket){
827                 printf("ddeded %d\n",CkMyPe());
828                 if(CmiIsImmediate(ticketRequest)){
829                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
830                 }
831                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
832                 
833         }else{
834                 _lockNewTicket = 1;
835         }*/
836
837         DEBUG(CmiMemoryCheck());
838         CkObjID sender = ticketRequest->sender;
839         CkObjID recver = ticketRequest->recver;
840         MCount SN = ticketRequest->SN;
841         
842         
843         Chare *recverObj = (Chare *)recver.getObject();
844         
845         
846         char recverName[100];
847         DEBUG(recver.toString(recverName);)
848         if(recverObj == NULL){
849                 int estPE = recver.guessPE();
850                 if(estPE == CkMyPe() || estPE == -1){           
851                         //try to fulfill the request after some time
852                         char senderString[100];
853                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
854                         if(estPE == CkMyPe() && recver.type == TypeArray){
855                                 CkArrayID aid(recver.data.array.id);            
856                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
857                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
858
859                         }
860                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
861                         *delayed = *ticketRequest;
862                         CpvAccess(_delayedTicketRequests)->enq(delayed);
863                         
864                 }else{
865                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
866                         TicketRequest forward = *ticketRequest;
867                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
868                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
869                         
870                         
871                 }
872         DEBUG(CmiMemoryCheck());
873                 return false; // if the receverObj does not exist the ticket request cannot have been 
874                               // processed successfully
875         }else{
876                 char senderString[100];
877                 Ticket ticket;
878                 //check if a ticket for this has been already handed out to an object that used to be local but 
879                 // is no longer so.. need for parallel restart
880                 
881                 if(recverObj->mlogData->mapTable.numObjects() > 0){
882                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
883                 }
884                 
885                 if(ticket.TN == 0){
886                         ticket = recverObj->mlogData->next_ticket(sender,SN);
887                 }
888                 if(ticket.TN > recverObj->mlogData->tProcessed){
889                         ticket.state = NEW_TICKET;
890                 }else{
891                         ticket.state = OLD_TICKET;
892                 }
893                 //TODO: check for the case when an invalid ticket is returned
894                 if(ticket.TN == 0){
895                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
896                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
897                         *delayed = *ticketRequest;
898                         CpvAccess(_delayedTicketRequests)->enq(delayed);
899                         return false;
900                 }
901 /*              if(ticket.TN < SN){ //error state this really should not happen
902                         recver.toString(recverName);
903                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
904                 }*/
905 //              CkAssert(ticket.TN >= SN);
906                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
907
908 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
909     if(reply == NULL){ 
910                         //There is no reply buffer and the ticketreply is going to be 
911                         //sent immediately
912                         TicketReply ticketReply;
913                         ticketReply.request = *ticketRequest;
914                         ticketReply.ticket = ticket;
915                         ticketReply.recverPE = CkMyPe();
916                 
917                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
918 //              CmiBecomeImmediate(&ticketReply);
919                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
920          }else{ // Store ticket reply in the buffer provided
921                  reply->request = *ticketRequest;
922                  reply->ticket = ticket;
923                  reply->recverPE = CkMyPe();
924                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
925                                                          // in case the ticket needs to be forwarded or something
926
927          }
928                 DEBUG(CmiMemoryCheck());
929                 return true;
930         }
931 //      _lockNewTicket=0;
932 };
933
934
935 /**
936  * @brief This function handles the ticket received after a request.
937  */
938 inline void _ticketHandler(TicketReply *ticketReply){
939
940         double _startTime = CkWallTimer();
941         DEBUG(CmiMemoryCheck());        
942         
943         char senderString[100];
944         CkObjID sender = ticketReply->request.sender;
945         CkObjID recver = ticketReply->request.recver;
946         
947         if(sender.guessPE() != CkMyPe()){       
948                 DEBUG(CkAssert(sender.guessPE()>= 0));
949                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
950         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
951                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
952                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
953 #ifdef BUFFERED_REMOTE
954                 //will be freed by the buffered ticket handler most of the time
955                 //this might lead to a leak just after migration
956                 //when the ticketHandler is directly used without going through the buffered handler
957                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
958 #else
959                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
960 #endif  
961         }else{
962                 char recverName[100];
963                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
964                 MlogEntry *logEntry=NULL;
965                 if(ticketReply->ticket.state & FORWARDED_TICKET){
966                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
967                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
968                         Chare *senderObj = (Chare *)sender.getObject();
969                         if(senderObj){
970                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
971                                 for(int i=0;i<mlog->length();i++){
972                                         MlogEntry *tempEntry = (*mlog)[i];
973                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
974                                                 logEntry = tempEntry;
975                                                 break;
976                                         }
977                                 }
978                                 if(logEntry == NULL){
979 #ifdef BUFFERED_REMOTE
980 #else
981                                         CmiFree(ticketReply);
982 #endif                                  
983                                         return;
984                                 }
985                         }else{
986                                 CmiAbort("This processor thinks it should have the sender\n");
987                         }
988                         ticketReply->ticket.state ^= FORWARDED_TICKET;
989                 }else{
990                         logEntry = ticketReply->request.logEntry;
991                 }
992                 if(logEntry->env->TN <= 0){
993                         //This logEntry has not received a TN earlier
994                         char recverString[100];
995                         logEntry->env->TN = ticketReply->ticket.TN;
996                         logEntry->env->setSrcPe(CkMyPe());
997                         if(ticketReply->ticket.state == NEW_TICKET){
998
999                                 // if message is group local, we store its metadata in teamTable
1000                                 if(isTeamLocal(ticketReply->recverPE)){
1001                                         DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
1002                                         Chare *senderObj = (Chare *)sender.getObject();
1003                                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
1004                                         if(ticketRow == NULL){
1005                                                 ticketRow = new SNToTicket();
1006                                                 senderObj->mlogData->teamTable.put(recver) = ticketRow; 
1007                                         }
1008                                         ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
1009                                 }
1010
1011                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
1012                                 if(ticketReply->recverPE != CkMyPe()){
1013                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
1014                                 }else{
1015                                         //It is now a local message use the local message protocol
1016                                         sendLocalMessageCopy(logEntry);
1017                                 }       
1018                         }
1019                 }else{
1020                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
1021                 }
1022                 recver.updatePosition(ticketReply->recverPE);
1023 #ifdef BUFFERED_REMOTE
1024 #else
1025                 CmiFree(ticketReply);
1026 #endif
1027         }
1028         CmiMemoryCheck();
1029
1030 //      traceUserBracketEvent(22,_startTime,CkWallTimer());     
1031 };
1032
1033 /**
1034  * Message to handle the bunch of tickets 
1035  * that we get from one processor. We send 
1036  * the tickets to be handled one at a time
1037  * */
1038
1039 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
1040         double _startTime=CmiWallTimer();
1041         int numTickets = recvdHeader->numberLogs;
1042         char *msg = (char *)recvdHeader;
1043         msg = &msg[sizeof(BufferedTicketRequestHeader)];
1044         DEBUG(CmiMemoryCheck());
1045         
1046         TicketReply *_reply = (TicketReply *)msg;
1047
1048         
1049         for(int i=0;i<numTickets;i++){
1050                 TicketReply *reply = (TicketReply *)msg;
1051                 _ticketHandler(reply);
1052                 
1053                 msg = &msg[sizeof(TicketReply)];
1054         }
1055         
1056         CmiFree(recvdHeader);
1057 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1058         DEBUG(CmiMemoryCheck());
1059 };
1060
1061 /**
1062  * Stores the metadata of a local message from its buddy.
1063  */
1064 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1065         double _startTime = CkWallTimer();
1066         
1067         char senderString[100],recverString[100];
1068         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1069 /*      if(!fault_aware(msgLog->recver)){
1070                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1071         }*/
1072         CpvAccess(_localMessageLog)->enq(*msgLog);
1073         
1074         LocalMessageLogAck ack;
1075         ack.entry = msgLog->entry;
1076         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1077         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1078         CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
1079         
1080 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1081 };
1082
1083 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1084         double _startTime = CkWallTimer();
1085         DEBUG(CmiMemoryCheck());
1086         
1087         int numLogs = recvdHeader->numberLogs;
1088         char *msg = (char *)recvdHeader;
1089
1090         //piggy back the logs already stored on this processor
1091         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1092         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1093 /*      if(numPiggyLogs > 0){
1094                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1095                         CmiAssert(0);
1096                 }
1097         }*/
1098         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1099         
1100         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1101         void *buf = CmiAlloc(totalSize);
1102         char *ptr = (char *)buf;
1103         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1104         
1105         msg = &msg[sizeof(BufferedLocalLogHeader)];
1106         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1107
1108         DEBUG(CmiMemoryCheck());
1109         int PE;
1110         for(int i=0;i<numLogs;i++){
1111                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1112                 CpvAccess(_localMessageLog)->enq(*msgLog);
1113                 PE = msgLog->senderPE;
1114                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1115
1116                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1117                 ack->entry = msgLog->entry;
1118                 
1119                 msg = &msg[sizeof(LocalMessageLog)];
1120                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1121         }
1122         DEBUG(CmiMemoryCheck());
1123
1124         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1125         piggyHeader->numberLogs = numPiggyLogs;
1126         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1127         if(numPiggyLogs > 0){
1128                 countPiggy++;
1129         }
1130
1131         for(int i=0;i<numPiggyLogs;i++){
1132                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1133                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1134                 ptr = &ptr[sizeof(LocalMessageLog)];
1135         }
1136         DEBUG(CmiMemoryCheck());
1137         
1138         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1139         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1140                 
1141 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1142                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1143                         if(!fault_aware(localLogEntry.recver)){
1144                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1145                         }
1146         }*/
1147         if(freeHeader){
1148                 CmiFree(recvdHeader);
1149         }
1150         DEBUG(CmiMemoryCheck());
1151 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1152 }
1153
1154
1155 void _localMessageAckHandler(LocalMessageLogAck *ack){
1156         
1157         double _startTime = CkWallTimer();
1158         
1159         MlogEntry *entry = ack->entry;
1160         if(entry == NULL){
1161                 CkExit();
1162         }
1163         entry->unackedLocal = 0;
1164         envelope *env = entry->env;
1165         char recverName[100];
1166         char senderString[100];
1167         DEBUG(CmiMemoryCheck());
1168         
1169         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1170         if(env == NULL)
1171                 return;
1172         CkAssert(env->SN > 0);
1173         CkAssert(env->TN > 0);
1174         env->sender.toString(senderString);
1175         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1176         env->recver.toString(recverName);
1177
1178         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1179         
1180 /*      
1181         void *origMsg = EnvToUsr(env);
1182         void *copyMsg = CkCopyMsg(&origMsg);
1183         envelope *copyEnv = UsrToEnv(copyMsg);
1184         entry->env = UsrToEnv(origMsg);*/
1185
1186 //      envelope *copyEnv = copyEnvelope(env);
1187
1188         envelope *copyEnv = env;
1189         copyEnv->localMlogEntry = entry;
1190
1191         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1192         if(CmiMyPe() != entry->destPE){
1193                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1194         }
1195 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1196         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1197         
1198         
1199 #ifdef BUFFERED_LOCAL
1200 #else
1201         CmiFree(ack);
1202 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1203 #endif
1204         
1205         DEBUG(CmiMemoryCheck());
1206         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1207 }
1208
1209
1210 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1211
1212         double _startTime=CkWallTimer();
1213         DEBUG(CmiMemoryCheck());
1214
1215         int numLogs = recvdHeader->numberLogs;
1216         char *msg = (char *)recvdHeader;
1217         msg = &msg[sizeof(BufferedLocalLogHeader)];
1218
1219         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1220         
1221         for(int i=0;i<numLogs;i++){
1222                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1223                 _localMessageAckHandler(ack);
1224                 
1225                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1226         }
1227
1228         //deal with piggy backed local logs
1229         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1230         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1231         if(piggyHeader->numberLogs > 0){
1232                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1233         }
1234         
1235         CmiFree(recvdHeader);
1236         DEBUG(CmiMemoryCheck());
1237 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1238 }
1239
1240
1241
1242
1243
1244
1245 bool fault_aware(CkObjID &recver){
1246         switch(recver.type){
1247                 case TypeChare:
1248                         return false;
1249                 case TypeMainChare:
1250                         return false;
1251                 case TypeGroup:
1252                 case TypeNodeGroup:
1253                 case TypeArray:
1254                         return true;
1255                 default:
1256                         return false;
1257         }
1258 };
1259
1260 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1261         char recverString[100];
1262         char senderString[100];
1263         
1264         DEBUG(CmiMemoryCheck());
1265         CkObjID recver = env->recver;
1266         if(!fault_aware(recver))
1267                 return 1;
1268
1269
1270         Chare *obj = (Chare *)recver.getObject();
1271         *objPointer = obj;
1272         if(obj == NULL){
1273                 int possiblePE = recver.guessPE();
1274                 if(possiblePE != CkMyPe()){
1275                         int totalSize = env->getTotalsize();                    
1276                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1277                 }
1278                 return 0;
1279         }
1280
1281
1282         double _startTime = CkWallTimer();
1283 //env->sender.updatePosition(env->getSrcPe());
1284         if(env->TN == obj->mlogData->tProcessed+1){
1285                 //the message that needs to be processed now
1286                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1287                 // once we find a message that we can process we put back all the messages in the out of order queue
1288                 // back into the main scheduler queue. 
1289                 if(env->sender.guessPE() == CkMyPe()){
1290                         *logEntryPointer = env->localMlogEntry;
1291                 }
1292         DEBUG(CmiMemoryCheck());
1293                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1294                         void *qMsgPtr;
1295                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1296                         envelope *qEnv = (envelope *)qMsgPtr;
1297                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1298         DEBUG(CmiMemoryCheck());
1299                 }
1300 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1301                 //TODO: this might be a problem.. change made for leanMD
1302 //              CpvAccess(_currentObj) = obj;
1303         DEBUG(CmiMemoryCheck());
1304                 return 1;
1305         }
1306         if(env->TN <= obj->mlogData->tProcessed){
1307                 //message already processed
1308                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1309 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1310         DEBUG(CmiMemoryCheck());
1311                 return 0;
1312         }
1313         //message that needs to be processed in the future
1314
1315 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1316         //the message cant be processed now put it back in the out of order message Q, 
1317         //It will be transferred to the main queue later
1318         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1319 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1320         DEBUG(CmiMemoryCheck());
1321         
1322         return 0;
1323 }
1324
1325 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1326         char senderString[100];
1327         if(obj){
1328                 if(sender.guessPE() == CkMyPe()){
1329                         if(entry != NULL){
1330                                 entry->env = NULL;
1331                         }
1332                 }
1333                 obj->mlogData->tProcessed++;
1334 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1335                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1336 //              CpvAccess(_currentObj)= NULL;
1337         }
1338         DEBUG(CmiMemoryCheck());
1339 }
1340
1341 /***
1342         Helpers for the handlers and message logging methods
1343 ***/
1344
1345 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1346 //      double _startTime = CkWallTimer();
1347         if(env->recver.type != TypeNodeGroup){
1348         //This repeats a step performed in skipCldEnq for messages sent to
1349         //other processors. I do this here so that messages to local processors
1350         //undergo the same transformation.. It lets the resend be uniform for 
1351         //all messages
1352 //              CmiSetXHandler(env,CmiGetHandler(env));
1353                 _skipCldEnqueue(destPE,env,_infoIdx);
1354         }else{
1355                 _noCldNodeEnqueue(destPE,env);
1356         }
1357 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1358 }
1359 //extern "C" int CmiGetNonLocalLength();
1360 /** This method is used to retry the ticket requests
1361  * that had been queued up earlier
1362  * */
1363
1364 int calledRetryTicketRequest=0;
1365
1366 void retryTicketRequestTimer(void *_dummy,double _time){
1367                 calledRetryTicketRequest=0;
1368                 retryTicketRequest(_dummy,_time);
1369 }
1370
1371 void retryTicketRequest(void *_dummy,double curWallTime){       
1372         double start = CkWallTimer();
1373         DEBUG(CmiMemoryCheck());
1374         int length = CpvAccess(_delayedTicketRequests)->length();
1375         for(int i=0;i<length;i++){
1376                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1377                 if(ticketRequest){
1378                         char senderString[100],recverString[100];
1379                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1380                         DEBUG(CmiMemoryCheck());
1381                         _processTicketRequest(ticketRequest);
1382                   CmiFree(ticketRequest);
1383                         DEBUG(CmiMemoryCheck());
1384                 }       
1385         }       
1386         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1387                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1388                 ticketLogLocalMessage(entry);
1389         }
1390         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1391 //      int converse_qLength = CmiGetNonLocalLength();
1392         
1393 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1394
1395 /*      PingMsg pingMsg;
1396         pingMsg.PE = CkMyPe();
1397         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1398         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1399                 for(int i=0;i<CkNumPes();i++){
1400                         if(i != CkMyPe()){
1401                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1402                         }
1403                 }
1404         }*/     
1405         //TODO: change this back to 100
1406         if(calledRetryTicketRequest == 0){
1407                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1408                 calledRetryTicketRequest =1;
1409         }
1410         DEBUG(CmiMemoryCheck());
1411 }
1412
1413 void _pingHandler(CkPingMsg *msg){
1414         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1415         CmiFree(msg);
1416 }
1417
1418
1419 /*****************************************************************************
1420         Checkpointing methods..
1421                 Pack all the data on a processor and send it to the buddy periodically
1422                 Also used to throw away message logs
1423 *****************************************************************************/
1424 CkVec<TProcessedLog> processedTicketLog;
1425 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1426 void clearUpMigratedRetainedLists(int PE);
1427
1428 void checkpointAlarm(void *_dummy,double curWallTime){
1429         double diff = curWallTime-lastCompletedAlarm;
1430         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1431 /*      if(CkWallTimer()-lastRestart < 50){
1432                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1433                 return;
1434         }*/
1435         if(diff < ((chkptPeriod) - 2)){
1436                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1437                 return;
1438         }
1439         CheckpointRequest request;
1440         request.PE = CkMyPe();
1441         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1442         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1443 };
1444
1445 void _checkpointRequestHandler(CheckpointRequest *request){
1446         startMlogCheckpoint(NULL,CmiWallTimer());       
1447 }
1448
1449 void startMlogCheckpoint(void *_dummy,double curWallTime){
1450         double _startTime = CkWallTimer();
1451         checkpointCount++;
1452 /*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
1453                 kill(getpid(),SIGKILL);
1454         }*/
1455         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1456                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1457         }
1458         PUP::sizer psizer;
1459         DEBUG(CmiMemoryCheck());
1460
1461         psizer | checkpointCount;
1462         
1463         CkPupROData(psizer);
1464         DEBUG(CmiMemoryCheck());
1465         CkPupGroupData(psizer);
1466         DEBUG(CmiMemoryCheck());
1467         CkPupNodeGroupData(psizer);
1468         DEBUG(CmiMemoryCheck());
1469         pupArrayElementsSkip(psizer,NULL);
1470         DEBUG(CmiMemoryCheck());
1471
1472         int dataSize = psizer.size();
1473         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1474         char *msg = (char *)CmiAlloc(totalSize);
1475         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1476         chkMsg->PE = CkMyPe();
1477         chkMsg->dataSize = dataSize;
1478
1479         
1480         char *buf = &msg[sizeof(CheckPointDataMsg)];
1481         PUP::toMem pBuf(buf);   
1482
1483         pBuf | checkpointCount;
1484         
1485         CkPupROData(pBuf);
1486         CkPupGroupData(pBuf);
1487         CkPupNodeGroupData(pBuf);
1488         pupArrayElementsSkip(pBuf,NULL);
1489
1490         unAckedCheckpoint=1;
1491         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1492         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1493         
1494         /*
1495                 Store the highest Ticket number processed for each chare on this processor
1496         */
1497         processedTicketLog.removeAll();
1498         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1499         if(CmiNumPes() < 256 || CmiMyPe() == 0){
1500                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1501         }
1502
1503         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1504                 lastCompletedAlarm = curWallTime;
1505                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1506         }
1507         traceUserBracketEvent(28,_startTime,CkWallTimer());
1508 };
1509
1510 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1511         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1512         TProcessedLog logEntry;
1513         logEntry.recver = mlogData->objID;
1514         logEntry.tProcessed = mlogData->tProcessed;
1515         log->push_back(logEntry);
1516         char objString[100];
1517         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1518 }
1519
1520 class ElementPacker : public CkLocIterator {
1521 private:
1522         CkLocMgr *locMgr;
1523         PUP::er &p;
1524 public:
1525                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1526                 void addLocation(CkLocation &loc) {
1527                         CkArrayIndexMax idx=loc.getIndex();
1528                         CkGroupID gID = locMgr->ckGetGroupID();
1529                         p|gID;      // store loc mgr's GID as well for easier restore
1530                         p|idx;
1531                         p|loc;
1532     }
1533 };
1534
1535
1536 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
1537         int numElements,i;
1538   int numGroups = CkpvAccess(_groupIDTable)->size();    
1539         if(!p.isUnpacking()){
1540                 numElements = CkCountArrayElements();
1541         }       
1542         p | numElements;
1543         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1544         if(!p.isUnpacking()){
1545                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1546         }else{
1547                 //Flush all recs of all LocMgrs before putting in new elements
1548 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1549                 for(int j=0;j<listsize;j++){
1550                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1551                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1552                                 listToSkip[j].idx.print();
1553                         }
1554                 }
1555                 
1556  printf("numElements = %d\n",numElements);
1557         
1558           for (int i=0; i<numElements; i++) {
1559                         CkGroupID gID;
1560                         CkArrayIndexMax idx;
1561                         p|gID;
1562             p|idx;
1563                         int flag=0;
1564                         int matchedIdx=0;
1565                         for(int j=0;j<listsize;j++){
1566                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1567                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1568                                                 matchedIdx = j;
1569                                                 flag = 1;
1570                                                 break;
1571                                         }
1572                                 }
1573                         }
1574                         if(flag == 1){
1575                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1576                         }else{
1577                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1578                         }
1579                                 
1580                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1581                         mgr->resume(idx,p,flag);
1582                         if(flag == 1){
1583                                 int homePE = mgr->homePe(idx);
1584                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1585                         }
1586           }
1587         }
1588 };
1589
1590
1591 void writeCheckpointToDisk(int size,char *chkpt){
1592         char fNameTemp[100];
1593         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1594         int fd = creat(fNameTemp,S_IRWXU);
1595         int ret = write(fd,chkpt,size);
1596         CkAssert(ret == size);
1597         close(fd);
1598         
1599         char fName[100];
1600         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1601         unlink(fName);
1602
1603         rename(fNameTemp,fName);
1604         
1605 }
1606
1607 //handler that receives the checkpoint from a processor
1608 //it stores it and acks it
1609 void _storeCheckpointHandler(char *msg){
1610         
1611         double _startTime=CkWallTimer();
1612                 
1613         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1614         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1615         
1616         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1617         
1618         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1619         if(oldChkpt != NULL){
1620                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1621                 CmiFree(oldmsg);
1622         }
1623         //turning off storing checkpoints
1624         
1625         int sendingPE = chkMsg->PE;
1626         
1627         CpvAccess(_storedCheckpointData)->buf = chkpt;
1628         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1629         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1630
1631 #ifdef CHECKPOINT_DISK
1632         //store the checkpoint on disk
1633         writeCheckpointToDisk(chkMsg->dataSize,chkpt);
1634         CpvAccess(_storedCheckpointData)->buf = NULL;
1635         CmiFree(msg);
1636 #endif
1637
1638         int count=0;
1639         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1640                 if(migratedNoticeList[j].fromPE == sendingPE){
1641                         migratedNoticeList[j].ackFrom = 1;
1642                 }else{
1643                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1644                 }
1645                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1646                         migratedNoticeList.remove(j);
1647                         count++;
1648                 }
1649                 
1650         }
1651         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1652         
1653         CheckPointAck ackMsg;
1654         ackMsg.PE = CkMyPe();
1655         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1656         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1657         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1658         
1659         
1660         
1661         traceUserBracketEvent(29,_startTime,CkWallTimer());
1662 };
1663
1664
1665 void sendRemoveLogRequests(){
1666         double _startTime = CkWallTimer();      
1667         //send out the messages asking senders to throw away message logs below a certain ticket number
1668         /*
1669                 The remove log request message looks like
1670                 |RemoveLogRequest||List of TProcessedLog|
1671         */
1672         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1673         char *requestMsg = (char *)CmiAlloc(totalSize);
1674         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1675         request->PE = CkMyPe();
1676         request->numberObjects = processedTicketLog.size();
1677         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1678         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1679         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1680         
1681         DEBUG(CmiMemoryCheck());
1682         for(int i=0;i<CkNumPes();i++){
1683                 CmiSyncSend(i,totalSize,requestMsg);
1684         }
1685         CmiFree(requestMsg);
1686
1687         clearUpMigratedRetainedLists(CmiMyPe());
1688         //TODO: clear ticketTable
1689         
1690         traceUserBracketEvent(30,_startTime,CkWallTimer());
1691         DEBUG(CmiMemoryCheck());
1692 }
1693
1694
1695 void _checkpointAckHandler(CheckPointAck *ackMsg){
1696         DEBUG(CmiMemoryCheck());
1697         unAckedCheckpoint=0;
1698         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1699         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1700         if(onGoingLoadBalancing){
1701                 onGoingLoadBalancing = 0;
1702                 finishedCheckpointLoadBalancing();
1703         }else{
1704                 sendRemoveLogRequests();
1705         }
1706         CmiFree(ackMsg);
1707         
1708 };
1709
1710 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1711         DEBUG(CmiMemoryCheck());
1712         CmiMemoryCheck();
1713         char *data = (char *)_data;
1714         RemoveLogRequest *request = (RemoveLogRequest *)data;
1715         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1716         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1717
1718         int count=0;
1719         for(int i=0;i<mlog->length();i++){
1720                 MlogEntry *logEntry = mlog->deq();
1721                 int match=0;
1722                 for(int j=0;j<request->numberObjects;j++){
1723                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1724                                 //this log Entry should be removed
1725                                 match = 1;
1726                                 break;
1727                         }
1728                 }
1729                 char senderString[100],recverString[100];
1730 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1731                 if(match){
1732                         count++;
1733                         delete logEntry;
1734                 }else{
1735                         mlog->enq(logEntry);
1736                 }
1737         }
1738         if(count > 0){
1739                 char nameString[100];
1740                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1741         }
1742         DEBUG(CmiMemoryCheck());
1743         CmiMemoryCheck();
1744 }
1745
1746 void _removeProcessedLogHandler(char *requestMsg){
1747         double start = CkWallTimer();
1748         forAllCharesDo(removeProcessedLogs,requestMsg);
1749         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1750         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1751         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1752         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1753         if(request->PE == getCheckPointPE()){
1754                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1755                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1756                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1757                 int count=0;
1758 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1759                 DEBUG(char nameString[100];)
1760                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1761                 DEBUG(})*/
1762                 for(int i=0;i<localQ->length();i++){
1763                         LocalMessageLog localLogEntry = (*localQ)[i];
1764                         if(!fault_aware(localLogEntry.recver)){
1765                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1766                         }
1767                         bool keep = true;
1768                         for(int j=0;j<request->numberObjects;j++){                              
1769                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1770                                         keep = false;
1771                                         break;
1772                                 }
1773                         }       
1774                         if(keep){
1775                                 tempQ->enq(localLogEntry);
1776                         }else{
1777                                 count++;
1778                         }
1779                 }
1780                 delete localQ;
1781                 CpvAccess(_localMessageLog) = tempQ;
1782                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1783         }
1784
1785         /*
1786                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1787         */
1788         CmiMemoryCheck();
1789         clearUpMigratedRetainedLists(request->PE);
1790         
1791         traceUserBracketEvent(20,start,CkWallTimer());
1792         CmiFree(requestMsg);    
1793 };
1794
1795
1796 void clearUpMigratedRetainedLists(int PE){
1797         int count=0;
1798         CmiMemoryCheck();
1799         
1800         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1801                 if(migratedNoticeList[j].toPE == PE){
1802                         migratedNoticeList[j].ackTo = 1;
1803                 }
1804                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1805                         migratedNoticeList.remove(j);
1806                         count++;
1807                 }
1808         }
1809         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1810         
1811         for(int j=retainedObjectList.size()-1;j>=0;j--){
1812                 if(retainedObjectList[j]->migRecord.toPE == PE){
1813                         RetainedMigratedObject *obj = retainedObjectList[j];
1814                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1815                         retainedObjectList.remove(j);
1816                         if(obj->msg != NULL){
1817                                 CmiMemoryCheck();
1818                                 CmiFree(obj->msg);
1819                         }
1820                         delete obj;
1821                 }
1822         }
1823 }
1824
1825 /***************************************************************
1826         Restart Methods and handlers
1827 ***************************************************************/        
1828
1829 /**
1830  * Function for restarting the crashed processor.
1831  * It sets the restart flag and contacts the buddy
1832  * processor to get the latest checkpoint.
1833  */
1834 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1835         RestartRequest msg;
1836
1837         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1838
1839         // setting the restart flag
1840         _restartFlag = 1;
1841
1842         // if we are using group-based message logging, all members of the group have to be restarted
1843         if(TEAM_SIZE_MLOG > 1){
1844                 for(int i=(CkMyPe()/TEAM_SIZE_MLOG)*TEAM_SIZE_MLOG; i<((CkMyPe()/TEAM_SIZE_MLOG)+1)*TEAM_SIZE_MLOG; i++){
1845                         if(i != CkMyPe() && i < CkNumPes()){
1846                                 // sending a message to the group member
1847                                 msg.PE = CkMyPe();
1848                             CmiSetHandler(&msg,_restartHandlerIdx);
1849                             //HERE CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1850                         }
1851                 }
1852         }
1853
1854         // requesting the latest checkpoint from its buddy
1855         msg.PE = CkMyPe();
1856         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1857         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1858 };
1859
1860 /**
1861  * Function to restart this processor.
1862  * The handler is invoked by a member of its same group in message logging.
1863  */
1864 void _restartHandler(RestartRequest *restartMsg){
1865         int i;
1866         int numGroups = CkpvAccess(_groupIDTable)->size();
1867         RestartRequest msg;
1868         
1869         fprintf(stderr,"[%d] Restart-group started at %.6lf \n",CkMyPe(),CmiWallTimer());
1870
1871     // setting the restart flag
1872         _restartFlag = 1;
1873
1874         // flushing all buffers 
1875 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1876         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1877         for(int i=0;i<numGroups;i++){
1878         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1879                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1880                 obj->flushStates();
1881                 obj->ckJustMigrated();
1882         }*/
1883
1884     // requesting the latest checkpoint from its buddy
1885         msg.PE = CkMyPe();
1886         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1887         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1888 }
1889
1890
1891 /**
1892  * Gets the stored checkpoint but calls another function in the sender.
1893  */
1894 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1895
1896         // retrieving the stored checkpoint
1897         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1898
1899         // making sure it is its buddy who is requesting the checkpoint
1900         CkAssert(restartMsg->PE == storedChkpt->PE);
1901
1902         storedRequest = restartMsg;
1903         verifyAckTotal = 0;
1904
1905         for(int i=0;i<migratedNoticeList.size();i++){
1906                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1907 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1908                         if(migratedNoticeList[i].ackFrom == 0){
1909                                 //need to verify if the object actually exists .. it might not
1910                                 //have been acked but it might exist on it
1911                                 VerifyAckMsg msg;
1912                                 msg.migRecord = migratedNoticeList[i];
1913                                 msg.index = i;
1914                                 msg.fromPE = CmiMyPe();
1915                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1916                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1917                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1918                                 verifyAckTotal++;
1919                         }
1920                 }
1921         }
1922
1923         // sending the checkpoint back to its buddy     
1924         if(verifyAckTotal == 0){
1925                 sendCheckpointData(MLOG_RESTARTED);
1926         }
1927         verifyAckCount = 0;
1928 }
1929
1930 /**
1931  * Receives the checkpoint coming from its buddy.
1932  */
1933 void _recvRestartCheckpointHandler(char *_restartData){
1934         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1935         MigrationRecord *migratedAwayElements;
1936
1937         globalLBID = restartData->lbGroupID;
1938         
1939         restartData->restartWallTime *= 1000;
1940         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1941         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1942         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1943
1944         
1945         printf("[%d] Group Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1946         char *buf = &_restartData[sizeof(RestartProcessorData)];
1947         
1948         if(restartData->numMigratedAwayElements != 0){
1949                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1950                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1951                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1952                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1953         }
1954         
1955         PUP::fromMem pBuf(buf);
1956
1957         pBuf | checkpointCount;
1958
1959         CkPupROData(pBuf);
1960         CkPupGroupData(pBuf);
1961         CkPupNodeGroupData(pBuf);
1962 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1963         pupArrayElementsSkip(pBuf,NULL);
1964         CkAssert(pBuf.size() == restartData->checkPointSize);
1965         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1966         
1967         forAllCharesDo(initializeRestart,NULL);
1968         
1969         //store the restored local message log in a vector
1970         buf = &buf[restartData->checkPointSize];        
1971         for(int i=0;i<restartData->numLocalMessages;i++){
1972                 LocalMessageLog logEntry;
1973                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
1974                 
1975                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
1976                 if(recverObj!=NULL){
1977                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
1978                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
1979                         char senderString[100];
1980                         char recverString[100];
1981                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
1982                 }else{
1983 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
1984                 }
1985                 buf = &buf[sizeof(LocalMessageLog)];
1986         }
1987
1988         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
1989         CmiFree(_restartData);  
1990         /*HERE _initDone();
1991
1992         getGlobalStep(globalLBID);
1993         
1994         countUpdateHomeAcks = 0;
1995         RestartRequest updateHomeRequest;
1996         updateHomeRequest.PE = CmiMyPe();
1997         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1998         for(int i=0;i<CmiNumPes();i++){
1999                 if(i != CmiMyPe()){
2000                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2001                 }
2002         }
2003 */
2004
2005
2006         // Send out the request to resend logged messages to all other processors
2007         CkVec<TProcessedLog> objectVec;
2008         forAllCharesDo(createObjIDList, (void *)&objectVec);
2009         int numberObjects = objectVec.size();
2010         
2011         /*
2012                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2013         */
2014         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2015         char *resendMsg = (char *)CmiAlloc(totalSize);
2016         
2017
2018         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2019         resendReq->PE =CkMyPe(); 
2020         resendReq->numberObjects = numberObjects;
2021         char *objList = &resendMsg[sizeof(ResendRequest)];
2022         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2023         
2024
2025         /* test for parallel restart migrate away object**/
2026         if(parallelRestart){
2027                 distributeRestartedObjects();
2028                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2029         }
2030         
2031         /*      To make restart work for load balancing.. should only
2032         be used when checkpoint happens along with load balancing
2033         **/
2034 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2035
2036         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2037         CpvAccess(_currentObj) = lb;
2038         lb->ReceiveDummyMigration(restartDecisionNumber);
2039
2040         sleep(10);
2041         
2042         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2043         for(int i=0;i<CkNumPes();i++){
2044                 if(i != CkMyPe()){
2045                         CmiSyncSend(i,totalSize,resendMsg);
2046                 }       
2047         }
2048         _resendMessagesHandler(resendMsg);
2049
2050
2051 }
2052
2053
2054 void CkMlogRestartDouble(void *,double){
2055         CkMlogRestart(NULL,NULL);
2056 };
2057
2058 //TML: restarting from local (group) failure
2059 void CkMlogRestartLocal(){
2060     CkMlogRestart(NULL,NULL);
2061 };
2062
2063
2064 void readCheckpointFromDisk(int size,char *buf){
2065         char fName[100];
2066         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
2067
2068         int fd = open(fName,O_RDONLY);
2069         int count=0;
2070         while(count < size){
2071                 count += read(fd,&buf[count],size-count);
2072         }
2073         close(fd);
2074         
2075 };
2076
2077
2078
2079 /**
2080  * Gets the stored checkpoint for its buddy processor.
2081  */
2082 void _getCheckpointHandler(RestartRequest *restartMsg){
2083
2084         // retrieving the stored checkpoint
2085         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
2086
2087         // making sure it is its buddy who is requesting the checkpoint
2088         CkAssert(restartMsg->PE == storedChkpt->PE);
2089
2090         storedRequest = restartMsg;
2091         verifyAckTotal = 0;
2092
2093         for(int i=0;i<migratedNoticeList.size();i++){
2094                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
2095 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
2096                         if(migratedNoticeList[i].ackFrom == 0){
2097                                 //need to verify if the object actually exists .. it might not
2098                                 //have been acked but it might exist on it
2099                                 VerifyAckMsg msg;
2100                                 msg.migRecord = migratedNoticeList[i];
2101                                 msg.index = i;
2102                                 msg.fromPE = CmiMyPe();
2103                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
2104                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
2105                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
2106                                 verifyAckTotal++;
2107                         }
2108                 }
2109         }
2110
2111         // sending the checkpoint back to its buddy     
2112         if(verifyAckTotal == 0){
2113                 sendCheckpointData(MLOG_CRASHED);
2114         }
2115         verifyAckCount = 0;
2116 }
2117
2118
2119 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
2120         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
2121         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
2122         if(rec != NULL && rec->type() == CkLocRec::local){
2123                         //this location exists on this processor
2124                         //and needs to be removed       
2125                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
2126                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
2127                         
2128                         int localIdx = localRec->getLocalIndex();
2129                         LBDatabase *lbdb = localRec->getLBDB();
2130                         LDObjHandle ldHandle = localRec->getLdHandle();
2131                                 
2132                         locMgr->setDuringMigration(true);
2133                         
2134                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
2135                         lbdb->UnregisterObj(ldHandle);
2136                         
2137                         locMgr->setDuringMigration(false);
2138                         
2139                         verifyAckedRequests++;
2140
2141         }
2142         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
2143         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
2144 };
2145
2146
2147 void _verifyAckHandler(VerifyAckMsg *verifyReply){
2148         int index =     verifyReply->index;
2149         migratedNoticeList[index] = verifyReply->migRecord;
2150         verifyAckCount++;
2151         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
2152         if(verifyAckCount == verifyAckTotal){
2153                 sendCheckpointData(MLOG_CRASHED);
2154         }
2155 }
2156
2157
2158
2159 void sendCheckpointData(int mode){      
2160         RestartRequest *restartMsg = storedRequest;
2161         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
2162         int numMigratedAwayElements = migratedNoticeList.size();
2163         if(migratedNoticeList.size() != 0){
2164                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2165 //                      CkAssert(migratedNoticeList.size() == 0);
2166         }
2167         
2168         
2169         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2170         
2171         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2172         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2173         
2174         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
2175         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
2176         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2177         
2178         char *msg = (char *)CmiAlloc(totalSize);
2179         
2180         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2181         dataMsg->PE = CkMyPe();
2182         dataMsg->restartWallTime = CmiTimer();
2183         dataMsg->checkPointSize = storedChkpt->bufSize;
2184         
2185         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2186 //      dataMsg->numMigratedAwayElements = 0;
2187         
2188         dataMsg->numMigratedInElements = 0;
2189         dataMsg->migratedElementSize = 0;
2190         dataMsg->lbGroupID = globalLBID;
2191         /*msg layout 
2192                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2193                 Local MessageLog|
2194         */
2195         //store checkpoint data
2196         char *buf = &msg[sizeof(RestartProcessorData)];
2197
2198         if(dataMsg->numMigratedAwayElements != 0){
2199                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2200                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2201         }
2202         
2203
2204 #ifdef CHECKPOINT_DISK
2205         readCheckpointFromDisk(storedChkpt->bufSize,buf);
2206 #else   
2207         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2208 #endif
2209         buf = &buf[storedChkpt->bufSize];
2210
2211
2212         //store localmessage Log
2213         dataMsg->numLocalMessages = localMsgQ->length();
2214         for(int i=0;i<localMsgQ->length();i++){
2215                 if(!fault_aware(((*localMsgQ)[i]).recver )){
2216                         CmiAbort("Non fault aware localMsgQ");
2217                 }
2218                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
2219                 buf = &buf[sizeof(LocalMessageLog)];
2220         }
2221         
2222         if(mode == MLOG_RESTARTED){
2223                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2224                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2225                 CmiFree(restartMsg);
2226         }else{
2227                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2228                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2229                 CmiFree(restartMsg);
2230         }
2231 };
2232
2233
2234 // this list is used to create a vector of the object ids of all
2235 //the chares on this processor currently and the highest TN processed by them 
2236 //the first argument is actually a CkVec<TProcessedLog> *
2237 void createObjIDList(void *data,ChareMlogData *mlogData){
2238         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2239         TProcessedLog entry;
2240         entry.recver = mlogData->objID;
2241         entry.tProcessed = mlogData->tProcessed;
2242         list->push_back(entry);
2243         char objString[100];
2244         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2245 }
2246
2247
2248 /**
2249  * Receives the checkpoint data from its buddy, restores the state of all the objects
2250  * and asks everyone else to update its home.
2251  */
2252 void _recvCheckpointHandler(char *_restartData){
2253         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2254         MigrationRecord *migratedAwayElements;
2255
2256         globalLBID = restartData->lbGroupID;
2257         
2258         restartData->restartWallTime *= 1000;
2259         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2260         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2261         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2262
2263         
2264         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2265         char *buf = &_restartData[sizeof(RestartProcessorData)];
2266         
2267         if(restartData->numMigratedAwayElements != 0){
2268                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2269                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2270                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2271                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2272         }
2273         
2274         PUP::fromMem pBuf(buf);
2275
2276         pBuf | checkpointCount;
2277
2278         CkPupROData(pBuf);
2279         CkPupGroupData(pBuf);
2280         CkPupNodeGroupData(pBuf);
2281 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
2282         pupArrayElementsSkip(pBuf,NULL);
2283         CkAssert(pBuf.size() == restartData->checkPointSize);
2284         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2285         
2286         forAllCharesDo(initializeRestart,NULL);
2287         
2288         //store the restored local message log in a vector
2289         buf = &buf[restartData->checkPointSize];        
2290         for(int i=0;i<restartData->numLocalMessages;i++){
2291                 LocalMessageLog logEntry;
2292                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2293                 
2294                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2295                 if(recverObj!=NULL){
2296                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2297                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2298                         char senderString[100];
2299                         char recverString[100];
2300                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2301                 }else{
2302 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2303                 }
2304                 buf = &buf[sizeof(LocalMessageLog)];
2305         }
2306
2307         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2308
2309         CmiFree(_restartData);
2310         
2311         
2312         _initDone();
2313
2314         getGlobalStep(globalLBID);
2315         
2316         countUpdateHomeAcks = 0;
2317         RestartRequest updateHomeRequest;
2318         updateHomeRequest.PE = CmiMyPe();
2319         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2320         for(int i=0;i<CmiNumPes();i++){
2321                 if(i != CmiMyPe()){
2322                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2323                 }
2324         }
2325
2326 }
2327
2328 /**
2329  * Receives the updateHome ACKs from all other processors. Once everybody
2330  * has replied, it sends a request to resend the logged messages.
2331  */
2332 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2333         countUpdateHomeAcks++;
2334         CmiFree(updateHomeAck);
2335         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2336         if(countUpdateHomeAcks != CmiNumPes()){
2337                 return;
2338         }
2339
2340         // Send out the request to resend logged messages to all other processors
2341         CkVec<TProcessedLog> objectVec;
2342         forAllCharesDo(createObjIDList, (void *)&objectVec);
2343         int numberObjects = objectVec.size();
2344         
2345         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2346         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2347         char *resendMsg = (char *)CmiAlloc(totalSize);  
2348
2349         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2350         resendReq->PE =CkMyPe(); 
2351         resendReq->numberObjects = numberObjects;
2352         char *objList = &resendMsg[sizeof(ResendRequest)];
2353         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2354
2355         /* test for parallel restart migrate away object**/
2356         if(parallelRestart){
2357                 distributeRestartedObjects();
2358                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2359         }
2360         
2361         /*      To make restart work for load balancing.. should only
2362         be used when checkpoint happens along with load balancing
2363         **/
2364 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2365
2366         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2367         CpvAccess(_currentObj) = lb;
2368         lb->ReceiveDummyMigration(restartDecisionNumber);
2369
2370         sleep(10);
2371         
2372         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2373         for(int i=0;i<CkNumPes();i++){
2374                 if(i != CkMyPe()){
2375                         CmiSyncSend(i,totalSize,resendMsg);
2376                 }       
2377         }
2378         _resendMessagesHandler(resendMsg);
2379         CmiFree(resendMsg);
2380 };
2381
2382 /**
2383  * @brief Initializes variables and flags for restarting procedure.
2384  */
2385 void initializeRestart(void *data, ChareMlogData *mlogData){
2386         mlogData->resendReplyRecvd = 0;
2387         mlogData->receivedTNs = new CkVec<MCount>;
2388         mlogData->restartFlag = 1;
2389         mlogData->restoredLocalMsgLog.removeAll();
2390         mlogData->mapTable.empty();
2391 };
2392
2393 /**
2394  * Updates the homePe of chare array elements.
2395  */
2396 void updateHomePE(void *data,ChareMlogData *mlogData){
2397         RestartRequest *updateRequest = (RestartRequest *)data;
2398         int PE = updateRequest->PE; //restarted PE
2399         //if this object is an array Element and its home is the restarted processor
2400         // the home processor needs to know its current location
2401         if(mlogData->objID.type == TypeArray){
2402                 //it is an array element
2403                 CkGroupID myGID = mlogData->objID.data.array.id;
2404                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2405                 CkArrayID aid(mlogData->objID.data.array.id);           
2406                 //check if the restarted processor is the home processor for this object
2407                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2408                 if(locMgr->homePe(myIdx) == PE){
2409                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2410                         DEBUGRESTART(myIdx.print());
2411                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2412                 }
2413         }
2414 };
2415
2416
2417 /**
2418  * Updates the homePe for all chares in this processor.
2419  */
2420 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2421         int sender = updateRequest->PE;
2422         
2423         forAllCharesDo(updateHomePE,updateRequest);
2424         
2425         updateRequest->PE = CmiMyPe();
2426         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2427         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2428         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2429                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2430                 checkpointCount--;
2431                 startMlogCheckpoint(NULL,0);
2432         }
2433         if(sender == getCheckPointPE()){
2434                 for(int i=0;i<retainedObjectList.size();i++){
2435                         if(retainedObjectList[i]->acked == 0){
2436                                 MigrationNotice migMsg;
2437                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2438                                 migMsg.record = retainedObjectList[i];
2439                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2440                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2441                         }
2442                 }
2443         }
2444 }
2445
2446 /**
2447  * @brief Fills up the ticket vector for each chare.
2448  */
2449 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2450         ResendData *resendData = (ResendData *)data;
2451         int PE = resendData->PE; //restarted PE
2452         int count=0;
2453         CkHashtableIterator *iterator;
2454         void *objp;
2455         void *objkey;
2456         CkObjID *objID;
2457         SNToTicket *snToTicket;
2458         Ticket ticket;
2459         
2460         // traversing the team table looking up for the maximum TN received     
2461         iterator = mlogData->teamTable.iterator();
2462         while( (objp = iterator->next(&objkey)) != NULL ){
2463                 objID = (CkObjID *)objkey;
2464         
2465                 // traversing the resendData structure to add ticket numbers
2466                 for(int j=0;j<resendData->numberObjects;j++){
2467                         if((*objID) == (resendData->listObjects)[j].recver){
2468 char name[100];
2469                                 snToTicket = *(SNToTicket **)objp;
2470 CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2471                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2472 CkPrintf("[%d] ---> Zero\n",CkMyPe());
2473                                         ticket = snToTicket->get(snIndex);      
2474                                         if(ticket.TN > resendData->maxTickets[j]){
2475 CkPrintf("[%d] ---> One\n",CkMyPe());
2476                                                 resendData->maxTickets[j] = ticket.TN;
2477                                         }
2478                                         if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2479 CkPrintf("[%d] ---> Two\n",CkMyPe());
2480                                                 //store the TNs that have been since the recver last checkpointed
2481                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2482                                         }
2483                                 }
2484                         }
2485                 }
2486         }
2487
2488         //releasing the memory for the iterator
2489         delete iterator;
2490 }
2491
2492 //the data argument is of type ResendData which contains the 
2493 //array of objects on  the restartedProcessor
2494 //this method resends the messages stored in this chare's message log 
2495 //to the restarted processor. It also accumulates the maximum TN
2496 //for all the objects on the restarted processor
2497 void resendMessageForChare(void *data,ChareMlogData *mlogData){
2498         char nameString[100];
2499         ResendData *resendData = (ResendData *)data;
2500         int PE = resendData->PE; //restarted PE
2501         DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2502         int count=0;
2503         int ticketRequests=0;
2504         CkQ<MlogEntry *> *log = mlogData->getMlog();
2505         
2506         for(int i=0;i<log->length();i++){
2507                 MlogEntry *logEntry = (*log)[i];
2508                 
2509                 // if we sent out the logs of a local message to buddy and he crashed
2510                 //before acking
2511                 envelope *env = logEntry->env;
2512                 if(env == NULL){
2513                         continue;
2514                 }
2515                 if(logEntry->unackedLocal){
2516                         char recverString[100];
2517                         DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
2518                         sendLocalMessageCopy(logEntry);
2519                 }
2520                 //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
2521                 if(env->TN <= 0){
2522                         //ticket not yet replied send it out again
2523                         sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
2524                 }
2525                 
2526                 if(env->recver.type != TypeInvalid){
2527                         int flag = 0;//marks if any of the restarted objects matched this log entry
2528                         for(int j=0;j<resendData->numberObjects;j++){
2529                                 if(env->recver == (resendData->listObjects)[j].recver){
2530                                         flag = 1;
2531                                         //message has a valid TN
2532                                         if(env->TN > 0){
2533                                                 //store maxTicket
2534                                                 if(env->TN > resendData->maxTickets[j]){
2535                                                         resendData->maxTickets[j] = env->TN;
2536                                                 }
2537                                                 //if the TN for this entry is more than the TN processed, send the message out
2538                                                 if(env->TN >= (resendData->listObjects)[j].tProcessed){
2539                                                         //store the TNs that have been since the recver last checkpointed
2540                                                         resendData->ticketVecs[j].push_back(env->TN);
2541                                                         
2542                                                         if(PE != CkMyPe()){
2543                                                                 if(env->recver.type == TypeNodeGroup){
2544                                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2545                                                                 }else{
2546                                                                         CmiSetHandler(env,CmiGetXHandler(env));
2547                                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2548                                                                 }
2549                                                         }else{
2550                                                                 envelope *copyEnv = copyEnvelope(env);
2551                                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2552                                                         }
2553                                                         char senderString[100];
2554                                                         DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2555                                                         count++;
2556                                                 }       
2557                                         }else{
2558 /*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
2559                                                 DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
2560                                                 //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
2561                                                 CkAssert(logEntry->destPE != CkMyPe());
2562                                                 
2563                                                 sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
2564                                                 
2565                                                 ticketRequests++;*/
2566                                         }
2567                                 }
2568                         }//end of for loop of objects
2569                         
2570                 }       
2571         }
2572         DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
2573 }
2574
2575 /**
2576  * Resends the messages since the last checkpoint to the list of objects included in the 
2577  * request.
2578  */
2579 void _resendMessagesHandler(char *msg){
2580         ResendData d;
2581         ResendRequest *resendReq = (ResendRequest *)msg;
2582
2583         // building the reply message
2584         char *listObjects = &msg[sizeof(ResendRequest)];
2585         d.numberObjects = resendReq->numberObjects;
2586         d.PE = resendReq->PE;
2587         d.listObjects = (TProcessedLog *)listObjects;
2588         d.maxTickets = new MCount[d.numberObjects];
2589         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2590         for(int i=0;i<d.numberObjects;i++){
2591                 d.maxTickets[i] = 0;
2592         }
2593
2594         //Check if any of the retained objects need to be recreated
2595         //If they have not been recreated on the restarted processor
2596         //they need to be recreated on this processor
2597         int count=0;
2598         for(int i=0;i<retainedObjectList.size();i++){
2599                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2600                         count++;
2601                         int recreate=1;
2602                         for(int j=0;j<d.numberObjects;j++){
2603                                 if(d.listObjects[j].recver.type != TypeArray ){
2604                                         continue;
2605                                 }
2606                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2607                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2608                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2609                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2610                                                 recreate = 0;
2611                                                 break;
2612                                         }
2613                                 }
2614                         }
2615                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2616                         if(recreate){
2617                                 donotCountMigration=1;
2618                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2619                                 donotCountMigration=0;
2620                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2621                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2622                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2623                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2624                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2625                                 CmiAssert(rec->type() == CkLocRec::local);
2626                                 CkVec<CkMigratable *> eltList;
2627                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2628                                 for(int j=0;j<eltList.size();j++){
2629                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2630                                                 CpvAccess(_currentObj) = eltList[j];
2631                                                 eltList[j]->ResumeFromSync();
2632                                         }
2633                                 }
2634                                 retainedObjectList[i]->msg=NULL;        
2635                         }
2636                 }
2637         }
2638         
2639         if(count > 0){
2640 //              CmiAbort("retainedObjectList for restarted processor not empty");
2641         }
2642         
2643         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2644
2645
2646         //TML: examines the origin processor to determine if it belongs to the same group.
2647         // In that case, it only returns the maximum ticket received for each object in the list.
2648         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2649                 forAllCharesDo(fillTicketForChare,&d);
2650         else
2651                 forAllCharesDo(resendMessageForChare,&d);
2652
2653         //send back the maximum ticket number for a message sent to each object on the 
2654         //restarted processor
2655         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2656         
2657         int totalTNStored=0;
2658         for(int i=0;i<d.numberObjects;i++){
2659                 totalTNStored += d.ticketVecs[i].size();
2660         }
2661         
2662         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2663         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2664         
2665         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2666         resendReply->PE = CkMyPe();
2667         resendReply->numberObjects = d.numberObjects;
2668         
2669         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2670         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2671         for(int i=0;i<d.numberObjects;i++){
2672                 replyObjects[i] = d.listObjects[i].recver;
2673         }
2674         
2675         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2676         for(int i=0;i<d.numberObjects;i++){
2677                 int vecsize = d.ticketVecs[i].size();
2678                 memcpy(ticketList,&vecsize,sizeof(int));
2679                 ticketList = &ticketList[sizeof(int)];
2680                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2681                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2682         }       
2683
2684         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2685         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2686         
2687 /*      
2688         if(verifyAckRequestsUnacked){
2689                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2690                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2691                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2692                         LDObjHandle h;
2693                         lb->Migrated(h,1);
2694                 }
2695         }
2696         
2697         verifyAckRequestsUnacked=0;*/
2698         
2699         delete [] d.maxTickets;
2700         delete [] d.ticketVecs;
2701         if(resendReq->PE != CkMyPe()){
2702                 CmiFree(msg);
2703         }       
2704 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2705         lastRestart = CmiWallTimer();
2706 }
2707
2708 void sortVec(CkVec<MCount> *TNvec);
2709 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2710
2711 /**
2712  * @brief Receives the tickets assigned to message to other objects.
2713  */
2714 void _resendReplyHandler(char *msg){    
2715         /**
2716                 need to rewrite this method to deal with parallel restart
2717         */
2718         ResendRequest *resendReply = (ResendRequest *)msg;
2719         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2720
2721         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2722         
2723         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2724         for(int i =0; i< resendReply->numberObjects;i++){       
2725                 Chare *obj = (Chare *)listObjects[i].getObject();
2726                 
2727                 int vecsize;
2728                 memcpy(&vecsize,listTickets,sizeof(int));
2729                 listTickets = &listTickets[sizeof(int)];
2730                 MCount *listTNs = (MCount *)listTickets;        
2731                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2732                 
2733                 if(obj != NULL){
2734                         //the object was restarted on the processor on which it existed
2735                         processReceivedTN(obj,vecsize,listTNs);
2736                 }else{
2737                 //pack up objID vecsize and listTNs and send it to the correct processor
2738                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2739                         char *TNMsg = (char *)CmiAlloc(totalSize);
2740                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2741                         receivedTNData->recver = listObjects[i];
2742                         receivedTNData->numTNs = vecsize;
2743                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2744                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2745
2746                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2747                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2748                 }
2749                 
2750         }
2751 };
2752
2753 void _receivedTNDataHandler(ReceivedTNData *msg){
2754         char objName[100];
2755         Chare *obj = (Chare *) msg->recver.getObject();
2756         if(obj){                
2757                 char *_msg = (char *)msg;
2758                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2759                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2760                 processReceivedTN(obj,msg->numTNs,listTNs);
2761         }else{
2762                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2763                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2764         }
2765 };
2766
2767 /**
2768  * @brief Processes the received list of tickets from a particular PE.
2769  */
2770 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2771         // increases the number of resendReply received
2772         obj->mlogData->resendReplyRecvd++;
2773         
2774         // includes the tickets into the receivedTN structure
2775         for(int j=0;j<listSize;j++){
2776                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2777         }
2778         
2779         //if this object has received all the replies find the ticket numbers
2780         //that senders know about. Those less than the ticket number processed 
2781         //by the receiver can be thrown away. The rest need not be consecutive
2782         // ie there can be holes in the list of ticket numbers seen by senders
2783         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2784                 obj->mlogData->resendReplyRecvd = 0;
2785                 //sort the received TNS
2786                 sortVec(obj->mlogData->receivedTNs);
2787         
2788                 //after all the received tickets are in we need to sort them and then 
2789                 // calculate the holes  
2790                 if(obj->mlogData->receivedTNs->size() > 0){
2791                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2792                         int vecsize = obj->mlogData->receivedTNs->size();
2793                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2794                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2795                         if(numberHoles == 0){
2796                         }else{
2797                                 char objName[100];                                      
2798                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2799                                 obj->mlogData->numberHoles = numberHoles;
2800                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2801                                 int countHoles=0;
2802                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2803                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2804                                                 //the TNs are not consecutive at this point
2805                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2806                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2807                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2808                                                         countHoles++;
2809                                                 }       
2810                                         }
2811                                 }
2812                                 //Holes have been given new TN
2813                                 if(countHoles != numberHoles){
2814                                         char str[100];
2815                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2816                                 }
2817                                 CkAssert(countHoles == numberHoles);                                    
2818                                 obj->mlogData->currentHoles = numberHoles;
2819                         }
2820                 }
2821         
2822                 // cleaning up structures and getting ready to continue execution       
2823                 delete obj->mlogData->receivedTNs;
2824                 obj->mlogData->receivedTNs = NULL;
2825                 obj->mlogData->restartFlag = 0;
2826
2827                 DEBUGRESTART(char objString[100]);
2828                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2829         }
2830
2831 }
2832
2833
2834 void sortVec(CkVec<MCount> *TNvec){
2835         //sort it ->its bloddy bubble sort
2836         //TODO: use quicksort
2837         for(int i=0;i<TNvec->size();i++){
2838                 for(int j=i+1;j<TNvec->size();j++){
2839                         if((*TNvec)[j] < (*TNvec)[i]){
2840                                 MCount temp;
2841                                 temp = (*TNvec)[i];
2842                                 (*TNvec)[i] = (*TNvec)[j];
2843                                 (*TNvec)[j] = temp;
2844                         }
2845                 }
2846         }
2847         //make it unique .. since its sorted all equal units will be consecutive
2848         MCount *tempArray = new MCount[TNvec->size()];
2849         int     uniqueCount=-1;
2850         for(int i=0;i<TNvec->size();i++){
2851                 tempArray[i] = 0;
2852                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2853                         uniqueCount++;
2854                         tempArray[uniqueCount] = (*TNvec)[i];
2855                 }
2856         }
2857         uniqueCount++;
2858         TNvec->removeAll();
2859         for(int i=0;i<uniqueCount;i++){
2860                 TNvec->push_back(tempArray[i]);
2861         }
2862         delete [] tempArray;
2863 }       
2864
2865 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2866         if(TNVec->size() == 0){
2867                 return -1; //not found in an empty vec
2868         }
2869         //binary search to find 
2870         int left=0;
2871         int right = TNVec->size();
2872         int mid = (left +right)/2;
2873         while(searchTN != (*TNVec)[mid] && left < right){
2874                 if((*TNVec)[mid] > searchTN){
2875                         right = mid-1;
2876                 }else{
2877                         left = mid+1;
2878                 }
2879                 mid = (left + right)/2;
2880         }
2881         if(left < right){
2882                 //mid is the element to be returned
2883                 return mid;
2884         }else{
2885                 if(mid < TNVec->size() && mid >=0){
2886                         if((*TNVec)[mid] == searchTN){
2887                                 return mid;
2888                         }else{
2889                                 return -1;
2890                         }
2891                 }else{
2892                         return -1;
2893                 }
2894         }
2895 };
2896
2897
2898 /*
2899         Method to do parallel restart. Distribute some of the array elements to other processors.
2900         The problem is that we cant use to charm entry methods to do migration as it will get
2901         stuck in the protocol that is going to restart
2902 */
2903
2904 class ElementDistributor: public CkLocIterator{
2905         CkLocMgr *locMgr;
2906         int *targetPE;
2907         void pupLocation(CkLocation &loc,PUP::er &p){
2908                 CkArrayIndexMax idx=loc.getIndex();
2909                 CkGroupID gID = locMgr->ckGetGroupID();
2910                 p|gID;      // store loc mgr's GID as well for easier restore
2911                 p|idx;
2912                 p|loc;
2913         };
2914         public:
2915                 ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2916                 void addLocation(CkLocation &loc){
2917                         if(*targetPE == CkMyPe()){
2918                                 *targetPE = (*targetPE +1)%CkNumPes();                          
2919                                 return;
2920                         }
2921                         
2922                         CkArrayIndexMax idx=loc.getIndex();
2923                         CkLocRec_local *rec = loc.getLocalRecord();
2924                         
2925                         CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2926                         idx.print();
2927                         
2928
2929                         //TODO: an element that is being moved should leave some trace behind so that
2930                         // the arraybroadcaster can forward messages to it
2931                         
2932                         //pack up this location and send it across
2933                         PUP::sizer psizer;
2934                         pupLocation(loc,psizer);
2935                         int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
2936                         char *msg = (char *)CmiAlloc(totalSize);
2937                         char *buf = &msg[CmiMsgHeaderSizeBytes];
2938                         PUP::toMem pmem(buf);
2939                         pmem.becomeDeleting();
2940                         pupLocation(loc,pmem);
2941                         
2942                         locMgr->setDuringMigration(CmiTrue);                    
2943                         delete rec;
2944                         locMgr->setDuringMigration(CmiFalse);                   
2945                         locMgr->inform(idx,*targetPE);
2946
2947                         CmiSetHandler(msg,_distributedLocationHandlerIdx);
2948                         CmiSyncSendAndFree(*targetPE,totalSize,msg);
2949
2950                         CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2951                         //decide on the target processor for the next object
2952                         *targetPE = (*targetPE +1)%CkNumPes();
2953                 }
2954                 
2955 };
2956
2957 void distributeRestartedObjects(){
2958         int numGroups = CkpvAccess(_groupIDTable)->size();      
2959         int i;
2960         int targetPE=CkMyPe();
2961         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2962 };
2963
2964 void _distributedLocationHandler(char *receivedMsg){
2965         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2966         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2967         PUP::fromMem pmem(buf);
2968         CkGroupID gID;
2969         CkArrayIndexMax idx;
2970         pmem |gID;
2971         pmem |idx;
2972         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2973         donotCountMigration=1;
2974         mgr->resume(idx,pmem);
2975         donotCountMigration=0;
2976         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2977         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2978         idx.print();
2979
2980         CkLocRec *rec = mgr->elementRec(idx);
2981         CmiAssert(rec->type() == CkLocRec::local);
2982         
2983         CkVec<CkMigratable *> eltList;
2984         mgr->migratableList((CkLocRec_local *)rec,eltList);
2985         for(int i=0;i<eltList.size();i++){
2986                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2987                         CpvAccess(_currentObj) = eltList[i];
2988                         eltList[i]->ResumeFromSync();
2989                 }
2990         }
2991         
2992         
2993 }
2994
2995
2996 /** this method is used to send messages to a restarted processor to tell
2997  * it that a particular expected object is not going to get to it */
2998 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2999         DummyMigrationMsg buf;
3000         buf.flag = MLOG_OBJECT;
3001         buf.lbID = lbID;
3002         buf.mgrID = locMgrID;
3003         buf.idx = idx;
3004         buf.locationPE = locationPE;
3005         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3006         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3007 };
3008
3009
3010 /**this method is used by a restarted processor to tell other processors
3011  * that they are not going to receive these many objects.. just the count
3012  * not the objects themselves ***/
3013
3014 void sendDummyMigrationCounts(int *dummyCounts){
3015         DummyMigrationMsg buf;
3016         buf.flag = MLOG_COUNT;
3017         buf.lbID = globalLBID;
3018         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3019         for(int i=0;i<CmiNumPes();i++){
3020                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3021                         buf.count = dummyCounts[i];
3022                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3023                 }
3024         }
3025 }
3026
3027
3028 /** this handler is used to process a dummy migration msg.
3029  * it looks up the load balancer and calls migrated for it */
3030
3031 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3032         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3033         if(msg->flag == MLOG_OBJECT){
3034                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3035                 LDObjHandle h;
3036                 lb->Migrated(h,1);
3037         }
3038         if(msg->flag == MLOG_COUNT){
3039                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3040                 msg->count -= verifyAckedRequests;
3041                 for(int i=0;i<msg->count;i++){
3042                         LDObjHandle h;
3043                         lb->Migrated(h,1);
3044                 }
3045         }
3046         verifyAckedRequests=0;
3047         CmiFree(msg);
3048 };
3049
3050
3051
3052
3053
3054
3055
3056 /*****************************************************
3057         Implementation of a method that can be used to call
3058         any method on the ChareMlogData of all the chares on
3059         a processor currently
3060 ******************************************************/
3061
3062
3063 class ElementCaller :  public CkLocIterator {
3064 private:
3065         CkLocMgr *locMgr;
3066         MlogFn fnPointer;
3067         void *data;
3068 public:
3069         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3070                 locMgr = _locMgr;
3071                 fnPointer = _fnPointer;
3072                 data = _data;
3073         };
3074         void addLocation(CkLocation &loc){
3075                 CkVec<CkMigratable *> list;
3076                 CkLocRec_local *local = loc.getLocalRecord();
3077                 locMgr->migratableList (local,list);
3078                 for(int i=0;i<list.size();i++){
3079                         CkMigratable *migratableElement = list[i];
3080                         fnPointer(data,migratableElement->mlogData);
3081                 }
3082         }
3083 };
3084
3085 void forAllCharesDo(MlogFn fnPointer,void *data){
3086         int numGroups = CkpvAccess(_groupIDTable)->size();
3087         for(int i=0;i<numGroups;i++){
3088                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3089                 fnPointer(data,obj->mlogData);
3090         }
3091         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3092         for(int i=0;i<numNodeGroups;i++){
3093                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3094                 fnPointer(data,obj->mlogData);
3095         }
3096         int i;
3097         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3098         
3099         
3100 };
3101
3102
3103 /******************************************************************
3104  Load Balancing
3105 ******************************************************************/
3106
3107 void initMlogLBStep(CkGroupID gid){
3108         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3109         countLBMigratedAway = 0;
3110         countLBToMigrate=0;
3111         onGoingLoadBalancing=1;
3112         migrationDoneCalled=0;
3113         checkpointBarrierCount=0;
3114         if(globalLBID.idx != 0){
3115                 CmiAssert(globalLBID.idx == gid.idx);
3116         }
3117         globalLBID = gid;
3118 }
3119
3120 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3121         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3122         
3123         resumeLbFnPtr = _fnPtr;
3124         centralLb = _centralLb;
3125         migrationDoneCalled = 1;
3126         if(countLBToMigrate == countLBMigratedAway){
3127                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3128                 startMlogCheckpoint(NULL,CmiWallTimer());       
3129         }
3130 };
3131
3132 void finishedCheckpointLoadBalancing(){
3133         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3134         CheckpointBarrierMsg msg;
3135         msg.fromPE = CmiMyPe();
3136         msg.checkpointCount = checkpointCount;
3137
3138         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3139         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3140         
3141 };
3142
3143
3144 void sendMlogLocation(int targetPE,envelope *env){
3145         void *_msg = EnvToUsr(env);
3146         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3147
3148
3149         int existing = 0;
3150         //if this object is already in the retainedobjectlust destined for this
3151         //processor it should not be sent
3152         
3153         for(int i=0;i<retainedObjectList.size();i++){
3154                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3155                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3156                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3157                         existing = 1;
3158                         break;
3159                 }
3160         }
3161
3162         if(existing){
3163                 return;
3164         }
3165         
3166         
3167         countLBToMigrate++;
3168         
3169         MigrationNotice migMsg;
3170         migMsg.migRecord.gID = msg->gid;
3171         migMsg.migRecord.idx = msg->idx;
3172         migMsg.migRecord.fromPE = CkMyPe();
3173         migMsg.migRecord.toPE =  targetPE;
3174         
3175         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3176         
3177         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3178         retainedObject->migRecord = migMsg.migRecord;
3179         retainedObject->acked  = 0;
3180         
3181         CkPackMessage(&env);
3182         
3183         migMsg.record = retainedObject;
3184         retainedObject->msg = env;
3185         int size = retainedObject->size = env->getTotalsize();
3186         
3187         retainedObjectList.push_back(retainedObject);
3188         
3189         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3190         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3191         
3192         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3193
3194 }
3195
3196 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3197         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3198         migratedNoticeList.push_back(msg->migRecord);
3199
3200         MigrationNoticeAck buf;
3201         buf.record = msg->record;
3202         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3203         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3204 }
3205
3206 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3207         
3208         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3209         retainedObject->acked = 1;
3210
3211         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3212         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3213
3214         //inform home about the new location of this object
3215         CkGroupID gID = retainedObject->migRecord.gID ;
3216         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3217         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3218         
3219         countLBMigratedAway++;
3220         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3221                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3222                 startMlogCheckpoint(NULL,CmiWallTimer());
3223         }
3224 };
3225
3226 void _receiveMlogLocationHandler(void *buf){
3227         envelope *env = (envelope *)buf;
3228         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3229         CkUnpackMessage(&env);
3230         void *_msg = EnvToUsr(env);
3231         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3232         CkGroupID gID= msg->gid;
3233         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3234         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3235         CpvAccess(_currentObj)=mgr;
3236         mgr->immigrate(msg);
3237 };
3238
3239
3240 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
3241 /*      if(mlogData->objID.type == TypeArray){
3242                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
3243         //      TODO: make sure later that atSync has been called and it needs 
3244         //      to be resumed from sync
3245         //
3246                 CpvAccess(_currentObj) = elt;
3247                 elt->ResumeFromSync();
3248         }*/
3249 }
3250
3251 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3252         if(checkpointBarrierCount == CmiNumPes()){
3253                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3254                 for(int i=0;i<CmiNumPes();i++){
3255                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3256                 }
3257         }
3258 }
3259
3260 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3261         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3262         if(msg->checkpointCount == checkpointCount){
3263                 checkpointBarrierCount++;
3264                 checkAndSendCheckpointBarrierAcks(msg);
3265         }else{
3266                 if(msg->checkpointCount-1 == checkpointCount){
3267                         checkpointBarrierCount++;
3268                         checkAndSendCheckpointBarrierAcks(msg);
3269                 }else{
3270                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3271                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3272                 }
3273         }
3274         CmiFree(msg);
3275 }
3276
3277 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3278         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3279         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3280         sendRemoveLogRequests();
3281         (*resumeLbFnPtr)(centralLb);
3282         CmiFree(msg);
3283 }
3284
3285 /**
3286         method that informs an array elements home processor of its current location
3287         It is a converse method to bypass the charm++ message logging framework
3288 */
3289
3290 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3291         double _startTime = CmiWallTimer();
3292         CurrentLocationMsg msg;
3293         msg.mgrID = locMgrID;
3294         msg.idx = idx;
3295         msg.locationPE = currentPE;
3296         msg.fromPE = CkMyPe();
3297
3298         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3299         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3300         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3301         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3302 }
3303
3304
3305 void _receiveLocationHandler(CurrentLocationMsg *data){
3306         double _startTime = CmiWallTimer();
3307         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3308         if(mgr == NULL){
3309                 CmiFree(data);
3310                 return;
3311         }
3312         CkLocRec *rec = mgr->elementNrec(data->idx);
3313         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3314         if(rec != NULL){
3315                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3316                         if(data->fromPE == data->locationPE){
3317                                 CmiAbort("Another processor has the same object");
3318                         }
3319                 }
3320         }
3321         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3322                 int targetPE = data->fromPE;
3323                 data->fromPE = CmiMyPe();
3324                 data->locationPE = CmiMyPe();
3325                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3326                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3327         }else{
3328                 mgr->inform(data->idx,data->locationPE);
3329         }
3330         CmiFree(data);
3331         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3332 }
3333
3334
3335
3336 void getGlobalStep(CkGroupID gID){
3337         LBStepMsg msg;
3338         int destPE = 0;
3339         msg.lbID = gID;
3340         msg.fromPE = CmiMyPe();
3341         msg.step = -1;
3342         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3343         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3344 };
3345
3346 void _getGlobalStepHandler(LBStepMsg *msg){
3347         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3348         msg->step = lb->step();
3349         CmiAssert(msg->fromPE != CmiMyPe());
3350         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3351         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3352         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3353 };
3354
3355 void _recvGlobalStepHandler(LBStepMsg *msg){
3356         
3357         restartDecisionNumber=msg->step;
3358         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3359         _updateHomeAckHandler(dummyAck);
3360 };
3361
3362 /**
3363  * @brief Function to wrap up performance information.
3364  */
3365 void _messageLoggingExit(){
3366 /*      if(CkMyPe() == 0){
3367                 if(countBuffered != 0){
3368                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3369                 }
3370
3371 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3372         }
3373         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3374         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3375
3376         //TML: printing some statistics for group approach
3377         //if(TEAM_SIZE_MLOG > 1)
3378                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3379
3380 }
3381
3382 /**
3383         The method for returning the actual object pointed to by an id
3384         If the object doesnot exist on the processor it returns NULL
3385 **/
3386
3387 void* CkObjID::getObject(){
3388         
3389                 switch(type){
3390                         case TypeChare: 
3391                                 return CkLocalChare(&data.chare.id);
3392                         case TypeMainChare:
3393                                 return CkLocalChare(&data.chare.id);
3394                         case TypeGroup:
3395         
3396                                 CkAssert(data.group.onPE == CkMyPe());
3397                                 return CkLocalBranch(data.group.id);
3398                         case TypeNodeGroup:
3399                                 CkAssert(data.group.onPE == CkMyNode());
3400                                 //CkLocalNodeBranch(data.group.id);
3401                                 {
3402                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3403                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3404                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3405         
3406                                         return retval;
3407                                 }       
3408                         case TypeArray:
3409                                 {
3410         
3411         
3412                                         CkArrayID aid(data.array.id);
3413         
3414                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3415         
3416                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
3417         
3418                                         return aProxy.ckLocal();
3419                                 }
3420                         default:
3421                                 CkAssert(0);
3422                 }
3423 }
3424
3425
3426 int CkObjID::guessPE(){
3427                 switch(type){
3428                         case TypeChare:
3429                         case TypeMainChare:
3430                                 return data.chare.id.onPE;
3431                         case TypeGroup:
3432                         case TypeNodeGroup:
3433                                 return data.group.onPE;
3434                         case TypeArray:
3435                                 {
3436                                         CkArrayID aid(data.array.id);
3437                                         if(aid.ckLocalBranch() == NULL){
3438                                                 return -1;
3439                                         }
3440                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
3441                                 }
3442                         default:
3443                                 CkAssert(0);
3444                 }
3445 };
3446
3447 char *CkObjID::toString(char *buf) const {
3448         
3449         switch(type){
3450                 case TypeChare:
3451                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3452                         break;
3453                 case TypeMainChare:
3454                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);       
3455                         break;
3456                 case TypeGroup:
3457                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
3458                         break;
3459                 case TypeNodeGroup:
3460                         sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
3461                         break;
3462                 case TypeArray:
3463                         {
3464                                 const CkArrayIndexMax &idx = data.array.idx.asMax();
3465                                 const int *indexData = idx.data();
3466                                 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
3467                                 break;
3468                         }
3469                 default:
3470                         CkAssert(0);
3471         }
3472         
3473         return buf;
3474 };
3475
3476 void CkObjID::updatePosition(int PE){
3477         if(guessPE() == PE){
3478                 return;
3479         }
3480         switch(type){
3481                 case TypeArray:
3482                         {
3483                                         CkArrayID aid(data.array.id);
3484                                         if(aid.ckLocalBranch() == NULL){
3485                                                 
3486                                         }else{
3487                                                 char str[100];
3488                                                 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
3489 //                                              CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
3490                                                 CkLocRec *rec = mgr->elementNrec(data.array.idx.asMax());
3491                                                 if(rec != NULL){
3492                                                         if(rec->type() == CkLocRec::local){
3493                                                                 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
3494                                                                 return;
3495                                                         }
3496                                                 }
3497                                                 mgr->inform(data.array.idx.asMax(),PE);
3498                                         }       
3499                                 }
3500
3501                         break;
3502                 case TypeChare:
3503                 case TypeMainChare:
3504                         CkAssert(data.chare.id.onPE == PE);
3505                         break;
3506                 case TypeGroup: