Added support for team-based message logging.
[charm.git] / src / ck-core / ckmessagelogging.C
1 /**
2  * Message Logging Fault Tolerance Protocol
3  * It includes the main functions for the basic and team-based schemes.
4  */
5
6 #include "charm.h"
7 #include "ck.h"
8 #include "ckmessagelogging.h"
9 #include "queueing.h"
10 #include <sys/types.h>
11 #include <signal.h>
12 #include "CentralLB.h"
13
14 #ifdef _FAULT_MLOG_
15
16 //#define DEBUG(x)  if(_restartFlag) {x;}
17 #define DEBUG(x)  //x
18 #define DEBUGRESTART(x)  //x
19 #define DEBUGLB(x) // x
20 #define DEBUG_TEAM(x)  //x
21
22 #define BUFFERED_LOCAL
23 #define BUFFERED_REMOTE 
24
25 extern const char *idx2str(const CkArrayIndex &ind);
26 extern const char *idx2str(const ArrayElement *el);
27 const char *idx2str(const CkArrayIndexMax &ind){
28         return idx2str((const CkArrayIndex &)ind);
29 };
30
31 void getGlobalStep(CkGroupID gID);
32
33 bool fault_aware(CkObjID &recver);
34 void sendCheckpointData(int mode);
35 void createObjIDList(void *data,ChareMlogData *mlogData);
36 inline bool isLocal(int destPE);
37 inline bool isTeamLocal(int destPE);
38
39 int _restartFlag=0;
40 //ERASE int restarted=0; // it's not being used anywhere
41
42 //TML: variables for measuring savings with groups in message logging
43 float MLOGFT_totalLogSize = 0.0;
44 float MLOGFT_totalMessages = 0.0;
45 float MLOGFT_totalObjects = 0.0;
46
47 //TODO: remove for perf runs
48 int countHashRefs=0; //count the number of gets
49 int countHashCollisions=0;
50
51 //#define CHECKPOINT_DISK
52 char *checkpointDirectory=".";
53 int unAckedCheckpoint=0;
54
55 int countLocal=0,countBuffered=0;
56 int countPiggy=0;
57 int countClearBufferedLocalCalls=0;
58
59 int countUpdateHomeAcks=0;
60
61 extern int chkptPeriod;
62 extern bool parallelRestart;
63
64 char *killFile;
65 int killFlag=0;
66 int restartingMlogFlag=0;
67 void readKillFile();
68 double killTime=0.0;
69 int checkpointCount=0;
70
71
72 CpvDeclare(Chare *,_currentObj);
73 CpvDeclare(CkQ<LocalMessageLog> *,_localMessageLog);
74 CpvDeclare(CkQ<TicketRequest *> *,_delayedTicketRequests);
75 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
76 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
77 CpvDeclare(Queue, _outOfOrderMessageQueue);
78 CpvDeclare(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
79 //CpvDeclare(CkQ<TicketRequest>**,_bufferedTicketRequests);
80 CpvDeclare(char **,_bufferedTicketRequests);
81 CpvDeclare(int *,_numBufferedTicketRequests);
82 CpvDeclare(char *,_bufferTicketReply);
83
84
85
86 static double adjustChkptPeriod=0.0; //in ms
87 static double nextCheckpointTime=0.0;//in seconds
88
89 double lastBufferedLocalMessageCopyTime;
90
91 int _maxBufferedMessages;
92 int _maxBufferedTicketRequests;
93 int BUFFER_TIME=2; // in ms
94
95
96 int _ticketRequestHandlerIdx;
97 int _ticketHandlerIdx;
98 int _localMessageCopyHandlerIdx;
99 int _localMessageAckHandlerIdx;
100 int _pingHandlerIdx;
101 int _bufferedLocalMessageCopyHandlerIdx;
102 int _bufferedLocalMessageAckHandlerIdx;
103 int _bufferedTicketRequestHandlerIdx;
104 int _bufferedTicketHandlerIdx;
105
106
107 char objString[100];
108 int _checkpointRequestHandlerIdx;
109 int _storeCheckpointHandlerIdx;
110 int _checkpointAckHandlerIdx;
111 int _getCheckpointHandlerIdx;
112 int _recvCheckpointHandlerIdx;
113 int _removeProcessedLogHandlerIdx;
114
115 int _verifyAckRequestHandlerIdx;
116 int _verifyAckHandlerIdx;
117 int _dummyMigrationHandlerIdx;
118
119
120 int     _getGlobalStepHandlerIdx;
121 int     _recvGlobalStepHandlerIdx;
122
123 int _updateHomeRequestHandlerIdx;
124 int _updateHomeAckHandlerIdx;
125 int _resendMessagesHandlerIdx;
126 int _resendReplyHandlerIdx;
127 int _receivedTNDataHandlerIdx;
128 int _distributedLocationHandlerIdx;
129
130 //TML: integer constants for group-based message logging
131 int _restartHandlerIdx;
132 int _getRestartCheckpointHandlerIdx;
133 int _recvRestartCheckpointHandlerIdx;
134
135 int verifyAckTotal;
136 int verifyAckCount;
137
138 int verifyAckedRequests=0;
139
140 RestartRequest *storedRequest;
141
142
143
144 int _falseRestart =0; /**
145                                                                                                         For testing on clusters we might carry out restarts on 
146                                                                                                         a porcessor without actually starting it
147                                                                                                         1 -> false restart
148                                                                                                         0 -> restart after an actual crash
149                                                                                                 */                                                                                                                              
150
151 //lock for the ticketRequestHandler and ticketLogLocalMessage methods;
152 int _lockNewTicket=0;
153
154
155 //Load balancing globals
156 int onGoingLoadBalancing=0;
157 void *centralLb;
158 void (*resumeLbFnPtr)(void *);
159 int _receiveMlogLocationHandlerIdx;
160 int _receiveMigrationNoticeHandlerIdx;
161 int _receiveMigrationNoticeAckHandlerIdx;
162 int _checkpointBarrierHandlerIdx;
163 int _checkpointBarrierAckHandlerIdx;
164
165 CkVec<MigrationRecord> migratedNoticeList;
166 CkVec<RetainedMigratedObject *> retainedObjectList;
167 int donotCountMigration=0;
168 int countLBMigratedAway=0;
169 int countLBToMigrate=0;
170 int migrationDoneCalled=0;
171 int checkpointBarrierCount=0;
172 int globalResumeCount=0;
173 CkGroupID globalLBID;
174 int restartDecisionNumber=-1;
175
176
177 double lastCompletedAlarm=0;
178 double lastRestart=0;
179
180
181 //update location globals
182 int _receiveLocationHandlerIdx;
183
184
185
186 // initialize message logging datastructures and register handlers
187 void _messageLoggingInit(){
188         //current object
189         CpvInitialize(Chare *,_currentObj);
190         
191         //registering handlers for message logging
192         _ticketRequestHandlerIdx = CkRegisterHandler((CmiHandler)_ticketRequestHandler);
193         _ticketHandlerIdx = CkRegisterHandler((CmiHandler)_ticketHandler);
194         _localMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageCopyHandler);
195         _localMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_localMessageAckHandler);
196         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
197         _bufferedLocalMessageCopyHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageCopyHandler);
198         _bufferedLocalMessageAckHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedLocalMessageAckHandler);
199   _bufferedTicketRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_bufferedTicketRequestHandler);
200         _bufferedTicketHandlerIdx = CkRegisterHandler((CmiHandler)_bufferedTicketHandler);
201
202                 
203         //handlers for checkpointing
204         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
205         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
206         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
207         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
208
209
210         //handlers for restart
211         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
212         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
213         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
214         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
215         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
216         _resendReplyHandlerIdx = CkRegisterHandler((CmiHandler)_resendReplyHandler);
217         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
218         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
219         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
220         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
221         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
222
223         //TML: handlers for group-based message logging
224         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
225         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
226         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
227
228         
229         //handlers for load balancing
230         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
231         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
232         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
233         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
234         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
235         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
236         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
237         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
238         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
239
240         
241         //handlers for updating locations
242         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
243         
244         //Cpv variables for message logging
245         CpvInitialize(CkQ<LocalMessageLog>*,_localMessageLog);
246         CpvAccess(_localMessageLog) = new CkQ<LocalMessageLog>(10000);
247         CpvInitialize(CkQ<TicketRequest *> *,_delayedTicketRequests);
248         CpvAccess(_delayedTicketRequests) = new CkQ<TicketRequest *>;
249         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalTicketRequests);
250         CpvAccess(_delayedLocalTicketRequests) = new CkQ<MlogEntry *>;
251         CpvInitialize(Queue, _outOfOrderMessageQueue);
252         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
253         CpvInitialize(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
254         CpvAccess(_bufferedLocalMessageLogs) = new CkQ<LocalMessageLog>;
255         
256         CpvInitialize(char **,_bufferedTicketRequests);
257         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
258         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
259         for(int i=0;i<CkNumPes();i++){
260                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
261                 CpvAccess(_numBufferedTicketRequests)[i]=0;
262         }
263   CpvInitialize(char *,_bufferTicketReply);
264         CpvAccess(_bufferTicketReply) = (char *)CmiAlloc(sizeof(BufferedTicketRequestHeader)+_maxBufferedTicketRequests*sizeof(TicketReply));
265         
266 //      CcdCallOnConditionKeep(CcdPERIODIC_100ms,retryTicketRequest,NULL);
267         CcdCallFnAfter(retryTicketRequest,NULL,100);    
268         
269         
270         //Cpv variables for checkpoint
271         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
272         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
273         
274 //      CcdCallOnConditionKeep(CcdPERIODIC_10s,startMlogCheckpoint,NULL);
275 //      printf("[%d] Checkpoint Period is %d s\n",CkMyPe(),chkptPeriod);
276 //      CcdCallFnAfter(startMlogCheckpoint,NULL,chkptPeriod);
277         if(CkMyPe() == 0){
278 //              CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod*1000);
279 #ifdef  BUFFERED_LOCAL
280                 if(CmiMyPe() == 0){
281                         printf("Local messages being buffered _maxBufferedMessages %d BUFFER_TIME %d ms \n",_maxBufferedMessages,BUFFER_TIME);
282                 }
283 #endif
284         }
285 #ifdef  BUFFERED_REMOTE
286         if(CmiMyPe() == 0){
287                 printf("[%d] Remote messages being buffered _maxBufferedTicketRequests %d BUFFER_TIME %d ms %p \n",CkMyPe(),_maxBufferedTicketRequests,BUFFER_TIME,CpvAccess(_bufferTicketReply));
288         }
289 #endif
290
291         traceRegisterUserEvent("Remove Logs", 20);
292         traceRegisterUserEvent("Ticket Request Handler", 21);
293         traceRegisterUserEvent("Ticket Handler", 22);
294         traceRegisterUserEvent("Local Message Copy Handler", 23);
295         traceRegisterUserEvent("Local Message Ack Handler", 24);        
296         traceRegisterUserEvent("Preprocess current message",25);
297         traceRegisterUserEvent("Preprocess past message",26);
298         traceRegisterUserEvent("Preprocess future message",27);
299         traceRegisterUserEvent("Checkpoint",28);
300         traceRegisterUserEvent("Checkpoint Store",29);
301         traceRegisterUserEvent("Checkpoint Ack",30);
302         
303         traceRegisterUserEvent("Send Ticket Request",31);
304         traceRegisterUserEvent("Generalticketrequest1",32);
305         traceRegisterUserEvent("TicketLogLocal",33);
306         traceRegisterUserEvent("next_ticket and SN",34);
307         traceRegisterUserEvent("Timeout for buffered remote messages",35);
308         traceRegisterUserEvent("Timeout for buffered local messages",36);
309         traceRegisterUserEvent("Inform Location Home",37);
310         traceRegisterUserEvent("Receive Location Handler",38);
311         
312         lastCompletedAlarm=CmiWallTimer();
313         lastRestart = CmiWallTimer();
314 //      CcdCallOnConditionKeep(CcdPROCESSOR_BEGIN_IDLE,checkBufferedLocalMessageCopy,NULL);
315         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
316 }
317
318 void killLocal(void *_dummy,double curWallTime);        
319
320 void readKillFile(){
321         FILE *fp=fopen(killFile,"r");
322         if(!fp){
323                 return;
324         }
325         int proc;
326         double sec;
327         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
328                 if(proc == CkMyPe()){
329                         killTime = CmiWallTimer()+sec;
330                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
331                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
332                 }
333         }
334         fclose(fp);
335 }
336
337 void killLocal(void *_dummy,double curWallTime){
338         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
339         if(CmiWallTimer()<killTime-1){
340                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
341         }else{  
342                 kill(getpid(),SIGKILL);
343         }
344 }
345
346
347
348 /************************ Message logging methods ****************/
349
350 // send a ticket request to a group on a processor
351 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx){
352         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
353                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
354                 void *origMsg = EnvToUsr(env);
355                 for(int i=0;i<CmiNumPes();i++){
356                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
357                                 void *copyMsg = CkCopyMsg(&origMsg);
358                                 envelope *copyEnv = UsrToEnv(copyMsg);
359                                 copyEnv->SN=0;
360                                 copyEnv->TN=0;
361                                 copyEnv->sender.type = TypeInvalid;
362                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
363                                 sendTicketGroupRequest(copyEnv,i,_infoIdx);
364                         }
365                 }
366                 return;
367         }
368         CkObjID recver;
369         recver.type = TypeGroup;
370         recver.data.group.id = env->getGroupNum();
371         recver.data.group.onPE = destPE;
372 /*      if(recver.data.group.id.idx == 11 && recver.data.group.onPE == 1){
373                 CmiPrintStackTrace(0);
374         }*/
375         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
376 }
377
378 //send a ticket request to a nodegroup
379 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx){
380         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
381                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
382                 void *origMsg = EnvToUsr(env);
383                 for(int i=0;i<CmiNumNodes();i++){
384                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
385                                 void *copyMsg = CkCopyMsg(&origMsg);
386                                 envelope *copyEnv = UsrToEnv(copyMsg);
387                                 copyEnv->SN=0;
388                                 copyEnv->TN=0;
389                                 copyEnv->sender.type = TypeInvalid;
390                                 sendTicketNodeGroupRequest(copyEnv,i,_infoIdx);
391                         }
392                 }
393                 return;
394         }
395         CkObjID recver;
396         recver.type = TypeNodeGroup;
397         recver.data.group.id = env->getGroupNum();
398         recver.data.group.onPE = destNode;
399         generateCommonTicketRequest(recver,env,destNode,_infoIdx);
400 }
401
402 //send a ticket request to an array element
403 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx){
404         CkObjID recver;
405         recver.type = TypeArray;
406         recver.data.array.id = env->getsetArrayMgr();
407         recver.data.array.idx.asMax() = *(&env->getsetArrayIndex());
408
409         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
410                 char recverString[100],senderString[100];
411                 
412                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
413         }
414
415         generateCommonTicketRequest(recver,env,destPE,_infoIdx);
416 };
417
418 /**
419  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
420  */
421 void generateCommonTicketRequest(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
422         envelope *env = _env;
423         int resend=0; //is it a resend
424         char recverName[100];
425         double _startTime=CkWallTimer();
426         
427         if(CpvAccess(_currentObj) == NULL){
428 //              CkAssert(0);
429                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
430                 generalCldEnqueue(destPE,env,_infoIdx);
431                 return;
432         }
433         
434         if(env->sender.type == TypeInvalid){
435                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
436                 //Set message logging data in the envelope
437         }else{
438                 envelope *copyEnv = copyEnvelope(env);
439                 env = copyEnv;
440                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
441                 env->SN = 0;
442         }
443         
444         CkObjID &sender = env->sender;
445         env->recver = recver;
446
447         Chare *obj = (Chare *)env->sender.getObject();
448           
449         if(env->SN == 0){
450                 env->SN = obj->mlogData->nextSN(recver);
451         }else{
452                 resend = 1;
453         }
454         
455         char senderString[100];
456 //      if(env->SN != 1){
457                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
458         //      CmiPrintStackTrace(0);
459 /*      }else{
460                 DEBUGRESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
461         }*/
462                 
463         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
464 //      CkPackMessage(&(mEntry->env));
465 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
466         
467         _startTime = CkWallTimer();
468
469         // uses the proper ticketing mechanism for local, group and general messages
470         if(isLocal(destPE)){
471                 ticketLogLocalMessage(mEntry);
472         }else{
473                 sendTicketRequest(sender,recver,destPE,mEntry,env->SN,resend);
474         }
475 }
476
477 /**
478  * Determines if the message is local or not. A message is local if:
479  * 1) Both the destination and origin are the same PE.
480  */
481 inline bool isLocal(int destPE){
482         // both the destination and the origin are the same PE
483         if(destPE == CkMyPe())
484                 return true;
485
486         return false;
487 }
488
489 /**
490  * Determines if the message is group local or not. A message is group local if:
491  * 1) They belong to the same group in the group-based message logging.
492  */
493 inline bool isTeamLocal(int destPE){
494
495         // they belong to the same group
496         if(TEAM_SIZE_MLOG > 1 && destPE/TEAM_SIZE_MLOG == CkMyPe()/TEAM_SIZE_MLOG)
497                 return true;
498
499         return false;
500 }
501
502
503
504 /**
505  * Method that does the actual send by creating a ticket request filling it up and sending it.
506  */
507 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
508         char recverString[100],senderString[100];
509         envelope *env = entry->env;
510         DEBUG(printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
511 /*      envelope *env = entry->env;
512         printf("[%d] Sending ticket Request to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer());*/
513
514         Chare *obj = (Chare *)entry->env->sender.getObject();
515         if(!resend){
516                 //TML: only stores message if either it goes to this processor or to a processor in a different group
517                 if(!isTeamLocal(entry->destPE)){
518                         obj->mlogData->addLogEntry(entry);
519                         MLOGFT_totalMessages += 1.0;
520                         MLOGFT_totalLogSize += entry->env->getTotalsize();
521                 }else{
522                         // the message has to be deleted after it has been sent
523                         entry->env->freeMsg = true;
524                 }
525         }
526
527 #ifdef BUFFERED_REMOTE
528         //buffer the ticket request 
529         if(CpvAccess(_bufferedTicketRequests)[destPE] == NULL){
530                 //first message to this processor, buffer needs to be created
531                 int _allocSize = sizeof(TicketRequest)*_maxBufferedTicketRequests + sizeof(BufferedTicketRequestHeader);
532                 CpvAccess(_bufferedTicketRequests)[destPE] = (char *)CmiAlloc(_allocSize);
533                 DEBUG(CmiPrintf("[%d] _bufferedTicketRequests[%d] allocated as %p\n",CmiMyPe(),destPE,&((CpvAccess(_bufferedTicketRequests))[destPE][0])));
534         }
535         //CpvAccess(_bufferedTicketRequests)[destPE]->enq(ticketRequest);
536         //Buffer the ticketrequests
537         TicketRequest *ticketRequest = (TicketRequest *)&(CpvAccess(_bufferedTicketRequests)[destPE][sizeof(BufferedTicketRequestHeader)+CpvAccess(_numBufferedTicketRequests)[destPE]*sizeof(TicketRequest)]);
538         ticketRequest->sender = sender;
539         ticketRequest->recver = recver;
540         ticketRequest->logEntry = entry;
541         ticketRequest->SN = SN;
542         ticketRequest->senderPE = CkMyPe();
543
544         CpvAccess(_numBufferedTicketRequests)[destPE]++;
545         
546         
547         if(CpvAccess(_numBufferedTicketRequests)[destPE] >= _maxBufferedTicketRequests){
548                 sendBufferedTicketRequests(destPE);
549         }else{
550                 if(CpvAccess(_numBufferedTicketRequests)[destPE] == 1){
551                         int *checkPE = new int;
552                         *checkPE = destPE;
553                         CcdCallFnAfter( checkBufferedTicketRequests ,checkPE , BUFFER_TIME);            
554                 }
555         }
556 #else
557
558         TicketRequest ticketRequest;
559         ticketRequest.sender = sender;
560         ticketRequest.recver = recver;
561         ticketRequest.logEntry = entry;
562         ticketRequest.SN = SN;
563         ticketRequest.senderPE = CkMyPe();
564         
565         CmiSetHandler((void *)&ticketRequest,_ticketRequestHandlerIdx);
566 //      CmiBecomeImmediate(&ticketRequest);
567         CmiSyncSend(destPE,sizeof(TicketRequest),(char *)&ticketRequest);
568 #endif
569         DEBUG(CmiMemoryCheck());
570 };
571
572 /**
573  * Send the ticket requests buffered for processor PE
574  **/
575 void sendBufferedTicketRequests(int destPE){
576         DEBUG(CmiMemoryCheck());
577         int numberRequests = CpvAccess(_numBufferedTicketRequests)[destPE];
578         if(numberRequests == 0){
579                 return;
580         }
581         DEBUG(printf("[%d] Send Buffered Ticket Requests to %d number %d\n",CkMyPe(),destPE,numberRequests));
582         int totalSize = sizeof(BufferedTicketRequestHeader )+numberRequests*(sizeof(TicketRequest));
583         void *buf = &(CpvAccess(_bufferedTicketRequests)[destPE][0]);
584         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)buf;
585         header->numberLogs = numberRequests;
586         
587         CmiSetHandler(buf,_bufferedTicketRequestHandlerIdx);
588         CmiSyncSend(destPE,totalSize,(char *)buf);
589         
590         CpvAccess(_numBufferedTicketRequests)[destPE]=0;
591         DEBUG(CmiMemoryCheck());
592 };
593
594 void checkBufferedTicketRequests(void *_destPE,double curWallTime){
595         int destPE = *(int *)_destPE;
596   if(CpvAccess(_numBufferedTicketRequests)[destPE] > 0){
597                 sendBufferedTicketRequests(destPE);
598 //              traceUserEvent(35);
599         }
600         delete (int *)_destPE;
601         DEBUG(CmiMemoryCheck());
602 };
603
604 /**
605  * Gets a ticket for a local message and then sends a copy to the buddy.
606  * This method is always in the main thread(not interrupt).. so it should 
607  * never find itself locked out of a newTicket.
608  */
609 void ticketLogLocalMessage(MlogEntry *entry){
610         double _startTime=CkWallTimer();
611         DEBUG(CmiMemoryCheck());
612
613         Chare *recverObj = (Chare *)entry->env->recver.getObject();
614         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
615         if(recverObj){
616                 //Consider the case, after a restart when this message has already been allotted a ticket number
617                 // and should get the same one as the old one.
618                 Ticket ticket;
619                 if(recverObj->mlogData->mapTable.numObjects() > 0){
620                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(entry->env->sender,entry->env->recver,entry->env->SN);
621                 }else{
622                         ticket.TN = 0;
623                 }
624                 
625                 char senderString[100], recverString[100] ;
626                 
627                 if(ticket.TN == 0){
628                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
629         
630                         if(ticket.TN == 0){
631                                 CpvAccess(_delayedLocalTicketRequests)->enq(entry);
632                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
633                                 
634         //              _lockNewTicket = 0;
635 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
636                                 return;
637                         }
638                 }       
639                 //TODO: check for the case when an invalid ticket is returned
640                 //TODO: check for OLD or RECEIVED TICKETS
641                 entry->env->TN = ticket.TN;
642                 CkAssert(entry->env->TN > 0);
643                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
644         
645                 // sends a copy of the metadata to the buddy    
646                 sendLocalMessageCopy(entry);
647                 
648                 DEBUG(CmiMemoryCheck());
649
650                 // sets the unackedLocal flag and stores the message in the log
651                 entry->unackedLocal = 1;
652                 CpvAccess(_currentObj)->mlogData->addLogEntry(entry);
653
654                 DEBUG(CmiMemoryCheck());
655         }else{
656                 CkPrintf("[%d] Local message in group-based message logging %d to %d\n",CkMyPe(),CkMyPe(),entry->destPE);
657                 DEBUG(printf("[%d] Local recver object in NULL \n",CmiMyPe()););
658         }
659         _lockNewTicket=0;
660 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
661 };
662
663 /**
664  * Sends the metadata of a local message to its buddy.
665  */
666 void sendLocalMessageCopy(MlogEntry *entry){
667         LocalMessageLog msgLog;
668         msgLog.sender = entry->env->sender;
669         msgLog.recver = entry->env->recver;
670         msgLog.SN = entry->env->SN;
671         msgLog.TN = entry->env->TN;
672         msgLog.entry = entry;
673         msgLog.senderPE = CkMyPe();
674         
675         char recvString[100];
676         char senderString[100];
677         DEBUG(printf("[%d] Sending local message log from %s to %s SN %d TN %d to processor %d handler %d time %.6lf entry %p env %p \n",CkMyPe(),msgLog.sender.toString(senderString),msgLog.recver.toString(recvString),msgLog.SN,    msgLog.TN,getCheckPointPE(),_localMessageCopyHandlerIdx,CkWallTimer(),entry,entry->env));
678
679 #ifdef BUFFERED_LOCAL
680         countLocal++;
681         CpvAccess(_bufferedLocalMessageLogs)->enq(msgLog);
682         if(CpvAccess(_bufferedLocalMessageLogs)->length() >= _maxBufferedMessages){
683                 sendBufferedLocalMessageCopy();
684         }else{
685                 if(countClearBufferedLocalCalls < 10 && CpvAccess(_bufferedLocalMessageLogs)->length() == 1){
686                         lastBufferedLocalMessageCopyTime = CkWallTimer();
687                         CcdCallFnAfter( checkBufferedLocalMessageCopy ,NULL , BUFFER_TIME);
688                         countClearBufferedLocalCalls++;
689                 }       
690         }
691 #else   
692         CmiSetHandler((void *)&msgLog,_localMessageCopyHandlerIdx);
693         
694         CmiSyncSend(getCheckPointPE(),sizeof(LocalMessageLog),(char *)&msgLog);
695 #endif
696         DEBUG(CmiMemoryCheck());
697 };
698
699
700 void sendBufferedLocalMessageCopy(){
701         int numberLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
702         if(numberLogs == 0){
703                 return;
704         }
705         countBuffered++;
706         int totalSize = sizeof(BufferedLocalLogHeader)+numberLogs*(sizeof(LocalMessageLog));
707         void *buf=CmiAlloc(totalSize);
708         BufferedLocalLogHeader *header = (BufferedLocalLogHeader *)buf;
709         header->numberLogs=numberLogs;
710
711         DEBUG(CmiMemoryCheck());
712         DEBUG(printf("[%d] numberLogs in sendBufferedCopy = %d buf %p\n",CkMyPe(),numberLogs,buf));
713         
714         char *ptr = (char *)buf;
715         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
716         
717         for(int i=0;i<numberLogs;i++){
718                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
719                 memcpy(ptr,&log,sizeof(LocalMessageLog));
720                 ptr = &ptr[sizeof(LocalMessageLog)];
721         }
722
723         CmiSetHandler(buf,_bufferedLocalMessageCopyHandlerIdx);
724
725         CmiSyncSendAndFree(getCheckPointPE(),totalSize,(char *)buf);
726         DEBUG(CmiMemoryCheck());
727 };
728
729 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime){
730         countClearBufferedLocalCalls--;
731         if(countClearBufferedLocalCalls > 10){
732                 CmiAbort("multiple checkBufferedLocalMessageCopy being called \n");
733         }
734         DEBUG(CmiMemoryCheck());
735         DEBUG(printf("[%d] checkBufferedLocalMessageCopy \n",CkMyPe()));
736         if((curWallTime-lastBufferedLocalMessageCopyTime)*1000 > BUFFER_TIME && CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
737                 if(CpvAccess(_bufferedLocalMessageLogs)->length() > 0){
738                         sendBufferedLocalMessageCopy();
739 //                      traceUserEvent(36);
740                 }
741         }
742         DEBUG(CmiMemoryCheck());
743 }
744
745 /****
746         The handler functions
747 *****/
748
749
750 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply=NULL);
751 /**
752  *  If there are any delayed requests, process them first before 
753  *  processing this request
754  * */
755 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
756         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
757         double  _startTime = CkWallTimer();
758         if(CpvAccess(_delayedTicketRequests)->length() > 0){
759                 retryTicketRequest(NULL,_startTime);
760         }
761         _processTicketRequest(ticketRequest);
762         CmiFree(ticketRequest);
763 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
764 }
765 /** Handler used for dealing with a bunch of ticket requests
766  * from one processor. The replies are also bunched together
767  * Does not use _ticketRequestHandler
768  * */
769 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader){
770         DEBUG(printf("[%d] Buffered Ticket Request handler started header %p\n",CkMyPe(),recvdHeader));
771         DEBUG(CmiMemoryCheck());
772         double _startTime = CkWallTimer();
773         if(CpvAccess(_delayedTicketRequests)->length() > 0){
774                 retryTicketRequest(NULL,_startTime);
775         }
776         DEBUG(CmiMemoryCheck());
777   int numRequests = recvdHeader->numberLogs;
778         char *msg = (char *)recvdHeader;
779         msg = &msg[sizeof(BufferedTicketRequestHeader)];
780         int senderPE=((TicketRequest *)msg)->senderPE;
781
782         
783         int totalSize = sizeof(BufferedTicketRequestHeader)+numRequests*sizeof(TicketReply);
784         void *buf = (void *)&(CpvAccess(_bufferTicketReply)[0]);
785         
786         char *ptr = (char *)buf;
787         BufferedTicketRequestHeader *header = (BufferedTicketRequestHeader *)ptr;
788         header->numberLogs = 0;
789
790         DEBUG(CmiMemoryCheck());
791         
792         ptr = &ptr[sizeof(BufferedTicketRequestHeader)]; //ptr at which the ticket replies will be stored
793         
794         for(int i=0;i<numRequests;i++){
795                 TicketRequest *request = (TicketRequest *)msg;
796                 msg = &msg[sizeof(TicketRequest)];
797                 bool replied = _processTicketRequest(request,(TicketReply *)ptr);
798
799                 if(replied){
800                         //the ticket request has been processed and 
801                         //the reply will be stored in the ptr
802                         header->numberLogs++;
803                         ptr = &ptr[sizeof(TicketReply)];
804                 }
805         }
806 /*      if(header->numberLogs == 0){
807                         printf("[%d] *************** Not sending any replies to previous buffered ticketRequest \n",CkMyPe());
808         }*/
809
810         CmiSetHandler(buf,_bufferedTicketHandlerIdx);
811         CmiSyncSend(senderPE,totalSize,(char *)buf);
812         CmiFree(recvdHeader);
813 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
814         DEBUG(CmiMemoryCheck());
815 };
816
817 /**Process the ticket request. 
818  * If it is processed and a reply is being sent 
819  * by this processor return true
820  * else return false.
821  * If a reply buffer is specified put the reply into that
822  * else send the reply
823  * */
824 inline bool _processTicketRequest(TicketRequest *ticketRequest,TicketReply *reply){
825
826 /*      if(_lockNewTicket){
827                 printf("ddeded %d\n",CkMyPe());
828                 if(CmiIsImmediate(ticketRequest)){
829                         CmiSetHandler(ticketRequest, (CmiGetHandler(ticketRequest))^0x8000);
830                 }
831                 CmiSyncSend(CkMyPe(),sizeof(TicketRequest),(char *)ticketRequest);
832                 
833         }else{
834                 _lockNewTicket = 1;
835         }*/
836
837         DEBUG(CmiMemoryCheck());
838         CkObjID sender = ticketRequest->sender;
839         CkObjID recver = ticketRequest->recver;
840         MCount SN = ticketRequest->SN;
841         
842         
843         Chare *recverObj = (Chare *)recver.getObject();
844         
845         
846         char recverName[100];
847         DEBUG(recver.toString(recverName);)
848         if(recverObj == NULL){
849                 int estPE = recver.guessPE();
850                 if(estPE == CkMyPe() || estPE == -1){           
851                         //try to fulfill the request after some time
852                         char senderString[100];
853                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed estPE %d mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),estPE,ticketRequest));
854                         if(estPE == CkMyPe() && recver.type == TypeArray){
855                                 CkArrayID aid(recver.data.array.id);            
856                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
857                                 DEBUG(printf("[%d] Object with delayed ticket request has home at %d\n",CkMyPe(),locMgr->homePe(recver.data.array.idx.asMax())));
858
859                         }
860                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
861                         *delayed = *ticketRequest;
862                         CpvAccess(_delayedTicketRequests)->enq(delayed);
863                         
864                 }else{
865                         DEBUGRESTART(printf("[%d] Ticket request to %s SN %d needs to be forwarded estPE %d mesg %p\n",CkMyPe(),recver.toString(recverName), SN,estPE,ticketRequest));
866                         TicketRequest forward = *ticketRequest;
867                         CmiSetHandler(&forward,_ticketRequestHandlerIdx);
868                         CmiSyncSend(estPE,sizeof(TicketRequest),(char *)&forward);                      
869                         
870                         
871                 }
872         DEBUG(CmiMemoryCheck());
873                 return false; // if the receverObj does not exist the ticket request cannot have been 
874                               // processed successfully
875         }else{
876                 char senderString[100];
877                 Ticket ticket;
878                 //check if a ticket for this has been already handed out to an object that used to be local but 
879                 // is no longer so.. need for parallel restart
880                 
881                 if(recverObj->mlogData->mapTable.numObjects() > 0){
882                         ticket.TN = recverObj->mlogData->searchRestoredLocalQ(ticketRequest->sender,ticketRequest->recver,ticketRequest->SN);
883                 }
884                 
885                 if(ticket.TN == 0){
886                         ticket = recverObj->mlogData->next_ticket(sender,SN);
887                 }
888                 if(ticket.TN > recverObj->mlogData->tProcessed){
889                         ticket.state = NEW_TICKET;
890                 }else{
891                         ticket.state = OLD_TICKET;
892                 }
893                 //TODO: check for the case when an invalid ticket is returned
894                 if(ticket.TN == 0){
895                         DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
896                         TicketRequest *delayed = (TicketRequest*)CmiAlloc(sizeof(TicketRequest));
897                         *delayed = *ticketRequest;
898                         CpvAccess(_delayedTicketRequests)->enq(delayed);
899                         return false;
900                 }
901 /*              if(ticket.TN < SN){ //error state this really should not happen
902                         recver.toString(recverName);
903                   printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer());
904                 }*/
905 //              CkAssert(ticket.TN >= SN);
906                 DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
907
908 //              TicketReply *ticketReply = (TicketReply *)CmiAlloc(sizeof(TicketReply));
909     if(reply == NULL){ 
910                         //There is no reply buffer and the ticketreply is going to be 
911                         //sent immediately
912                         TicketReply ticketReply;
913                         ticketReply.request = *ticketRequest;
914                         ticketReply.ticket = ticket;
915                         ticketReply.recverPE = CkMyPe();
916                 
917                         CmiSetHandler(&ticketReply,_ticketHandlerIdx);
918 //              CmiBecomeImmediate(&ticketReply);
919                         CmiSyncSend(ticketRequest->senderPE,sizeof(TicketReply),(char *)&ticketReply);
920          }else{ // Store ticket reply in the buffer provided
921                  reply->request = *ticketRequest;
922                  reply->ticket = ticket;
923                  reply->recverPE = CkMyPe();
924                  CmiSetHandler(reply,_ticketHandlerIdx); // not strictly necessary but will do that 
925                                                          // in case the ticket needs to be forwarded or something
926
927          }
928                 DEBUG(CmiMemoryCheck());
929                 return true;
930         }
931 //      _lockNewTicket=0;
932 };
933
934
935 /**
936  * @brief This function handles the ticket received after a request.
937  */
938 inline void _ticketHandler(TicketReply *ticketReply){
939
940         double _startTime = CkWallTimer();
941         DEBUG(CmiMemoryCheck());        
942         
943         char senderString[100];
944         CkObjID sender = ticketReply->request.sender;
945         CkObjID recver = ticketReply->request.recver;
946         
947         if(sender.guessPE() != CkMyPe()){       
948                 DEBUG(CkAssert(sender.guessPE()>= 0));
949                 DEBUG(printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE()));
950         //      printf("[%d] TN %d forwarded to %s on PE %d \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),sender.guessPE());
951                 ticketReply->ticket.state = ticketReply->ticket.state | FORWARDED_TICKET;
952                 CmiSetHandler(ticketReply,_ticketHandlerIdx);
953 #ifdef BUFFERED_REMOTE
954                 //will be freed by the buffered ticket handler most of the time
955                 //this might lead to a leak just after migration
956                 //when the ticketHandler is directly used without going through the buffered handler
957                 CmiSyncSend(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
958 #else
959                 CmiSyncSendAndFree(sender.guessPE(),sizeof(TicketReply),(char *)ticketReply);
960 #endif  
961         }else{
962                 char recverName[100];
963                 DEBUG(printf("[%d] TN %d received for %s SN %d from %s  time %.6lf \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString),ticketReply->request.SN,recver.toString(recverName),CmiWallTimer()));
964                 MlogEntry *logEntry=NULL;
965                 if(ticketReply->ticket.state & FORWARDED_TICKET){
966                         // Handle the case when you receive a forwarded message, We need to search through the message queue since the logEntry pointer is no longer valid
967                         DEBUG(printf("[%d] TN %d received for %s has been forwarded \n",CkMyPe(),ticketReply->ticket.TN,sender.toString(senderString)));
968                         Chare *senderObj = (Chare *)sender.getObject();
969                         if(senderObj){
970                                 CkQ<MlogEntry *> *mlog = senderObj->mlogData->getMlog();
971                                 for(int i=0;i<mlog->length();i++){
972                                         MlogEntry *tempEntry = (*mlog)[i];
973                                         if(tempEntry->env != NULL && ticketReply->request.sender == tempEntry->env->sender && ticketReply->request.recver == tempEntry->env->recver && ticketReply->request.SN == tempEntry->env->SN){
974                                                 logEntry = tempEntry;
975                                                 break;
976                                         }
977                                 }
978                                 if(logEntry == NULL){
979 #ifdef BUFFERED_REMOTE
980 #else
981                                         CmiFree(ticketReply);
982 #endif                                  
983                                         return;
984                                 }
985                         }else{
986                                 CmiAbort("This processor thinks it should have the sender\n");
987                         }
988                         ticketReply->ticket.state ^= FORWARDED_TICKET;
989                 }else{
990                         logEntry = ticketReply->request.logEntry;
991                 }
992                 if(logEntry->env->TN <= 0){
993                         //This logEntry has not received a TN earlier
994                         char recverString[100];
995                         logEntry->env->TN = ticketReply->ticket.TN;
996                         logEntry->env->setSrcPe(CkMyPe());
997                         if(ticketReply->ticket.state == NEW_TICKET){
998
999                                 // if message is group local, we store its metadata in teamTable
1000                                 if(isTeamLocal(ticketReply->recverPE)){
1001                                         DEBUG_TEAM(CkPrintf("[%d] Storing meta data for intragroup message %u\n",CkMyPe(),ticketReply->request.SN);)
1002                                         Chare *senderObj = (Chare *)sender.getObject();
1003                                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
1004                                         if(ticketRow == NULL){
1005                                                 ticketRow = new SNToTicket();
1006                                                 senderObj->mlogData->teamTable.put(recver) = ticketRow; 
1007                                         }
1008                                         ticketRow->put(ticketReply->request.SN) = ticketReply->ticket;
1009                                 }
1010
1011                                 DEBUG(printf("[%d] Message sender %s recver %s SN %d TN %d to processor %d env %p size %d \n",CkMyPe(),sender.toString(senderString),recver.toString(recverString), ticketReply->request.SN,ticketReply->ticket.TN,ticketReply->recverPE,logEntry->env,logEntry->env->getTotalsize()));
1012                                 if(ticketReply->recverPE != CkMyPe()){
1013                                         generalCldEnqueue(ticketReply->recverPE,logEntry->env,logEntry->_infoIdx);
1014                                 }else{
1015                                         //It is now a local message use the local message protocol
1016                                         sendLocalMessageCopy(logEntry);
1017                                 }       
1018                         }
1019                 }else{
1020                         DEBUG(printf("[%d] Message sender %s recver %s SN %d already had TN %d received TN %d\n",CkMyPe(),sender.toString(senderString),recver.toString(recverName),ticketReply->request.SN,logEntry->env->TN,ticketReply->ticket.TN));
1021                 }
1022                 recver.updatePosition(ticketReply->recverPE);
1023 #ifdef BUFFERED_REMOTE
1024 #else
1025                 CmiFree(ticketReply);
1026 #endif
1027         }
1028         CmiMemoryCheck();
1029
1030 //      traceUserBracketEvent(22,_startTime,CkWallTimer());     
1031 };
1032
1033 /**
1034  * Message to handle the bunch of tickets 
1035  * that we get from one processor. We send 
1036  * the tickets to be handled one at a time
1037  * */
1038
1039 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader){
1040         double _startTime=CmiWallTimer();
1041         int numTickets = recvdHeader->numberLogs;
1042         char *msg = (char *)recvdHeader;
1043         msg = &msg[sizeof(BufferedTicketRequestHeader)];
1044         DEBUG(CmiMemoryCheck());
1045         
1046         TicketReply *_reply = (TicketReply *)msg;
1047
1048         
1049         for(int i=0;i<numTickets;i++){
1050                 TicketReply *reply = (TicketReply *)msg;
1051                 _ticketHandler(reply);
1052                 
1053                 msg = &msg[sizeof(TicketReply)];
1054         }
1055         
1056         CmiFree(recvdHeader);
1057 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1058         DEBUG(CmiMemoryCheck());
1059 };
1060
1061 /**
1062  * Stores the metadata of a local message from its buddy.
1063  */
1064 void _localMessageCopyHandler(LocalMessageLog *msgLog){
1065         double _startTime = CkWallTimer();
1066         
1067         char senderString[100],recverString[100];
1068         DEBUG(printf("[%d] Local Message log from processor %d sender %s recver %s TN %d handler %d time %.6lf \n",CkMyPe(),msgLog->PE,msgLog->sender.toString(senderString),msgLog->recver.toString(recverString),msgLog->TN,_localMessageAckHandlerIdx,CmiWallTimer()));
1069 /*      if(!fault_aware(msgLog->recver)){
1070                 CmiAbort("localMessageCopyHandler with non fault aware local message copy");
1071         }*/
1072         CpvAccess(_localMessageLog)->enq(*msgLog);
1073         
1074         LocalMessageLogAck ack;
1075         ack.entry = msgLog->entry;
1076         DEBUG(printf("[%d] About to send back ack \n",CkMyPe()));
1077         CmiSetHandler(&ack,_localMessageAckHandlerIdx);
1078         CmiSyncSend(msgLog->senderPE,sizeof(LocalMessageLogAck),(char *)&ack);
1079         
1080 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1081 };
1082
1083 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader){
1084         double _startTime = CkWallTimer();
1085         DEBUG(CmiMemoryCheck());
1086         
1087         int numLogs = recvdHeader->numberLogs;
1088         char *msg = (char *)recvdHeader;
1089
1090         //piggy back the logs already stored on this processor
1091         int numPiggyLogs = CpvAccess(_bufferedLocalMessageLogs)->length();
1092         numPiggyLogs=0; //uncomment to turn off piggy backing of acks
1093 /*      if(numPiggyLogs > 0){
1094                 if((*CpvAccess(_bufferedLocalMessageLogs))[0].PE != getCheckPointPE()){
1095                         CmiAssert(0);
1096                 }
1097         }*/
1098         DEBUG(printf("[%d] _bufferedLocalMessageCopyHandler numLogs %d numPiggyLogs %d\n",CmiMyPe(),numLogs,numPiggyLogs));
1099         
1100         int totalSize = sizeof(BufferedLocalLogHeader)+numLogs*sizeof(LocalMessageLogAck)+sizeof(BufferedLocalLogHeader)+numPiggyLogs*sizeof(LocalMessageLog);
1101         void *buf = CmiAlloc(totalSize);
1102         char *ptr = (char *)buf;
1103         memcpy(ptr,msg,sizeof(BufferedLocalLogHeader));
1104         
1105         msg = &msg[sizeof(BufferedLocalLogHeader)];
1106         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1107
1108         DEBUG(CmiMemoryCheck());
1109         int PE;
1110         for(int i=0;i<numLogs;i++){
1111                 LocalMessageLog *msgLog = (LocalMessageLog *)msg;
1112                 CpvAccess(_localMessageLog)->enq(*msgLog);
1113                 PE = msgLog->senderPE;
1114                 DEBUG(CmiAssert( PE == getCheckPointPE()));
1115
1116                 LocalMessageLogAck *ack = (LocalMessageLogAck *)ptr;
1117                 ack->entry = msgLog->entry;
1118                 
1119                 msg = &msg[sizeof(LocalMessageLog)];
1120                 ptr = &ptr[sizeof(LocalMessageLogAck)];
1121         }
1122         DEBUG(CmiMemoryCheck());
1123
1124         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)ptr;
1125         piggyHeader->numberLogs = numPiggyLogs;
1126         ptr = &ptr[sizeof(BufferedLocalLogHeader)];
1127         if(numPiggyLogs > 0){
1128                 countPiggy++;
1129         }
1130
1131         for(int i=0;i<numPiggyLogs;i++){
1132                 LocalMessageLog log = CpvAccess(_bufferedLocalMessageLogs)->deq();
1133                 memcpy(ptr,&log,sizeof(LocalMessageLog));
1134                 ptr = &ptr[sizeof(LocalMessageLog)];
1135         }
1136         DEBUG(CmiMemoryCheck());
1137         
1138         CmiSetHandler(buf,_bufferedLocalMessageAckHandlerIdx);
1139         CmiSyncSendAndFree(PE,totalSize,(char *)buf);
1140                 
1141 /*      for(int i=0;i<CpvAccess(_localMessageLog)->length();i++){
1142                         LocalMessageLog localLogEntry = (*CpvAccess(_localMessageLog))[i];
1143                         if(!fault_aware(localLogEntry.recver)){
1144                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1145                         }
1146         }*/
1147         if(freeHeader){
1148                 CmiFree(recvdHeader);
1149         }
1150         DEBUG(CmiMemoryCheck());
1151 //      traceUserBracketEvent(23,_startTime,CkWallTimer());
1152 }
1153
1154
1155 void _localMessageAckHandler(LocalMessageLogAck *ack){
1156         
1157         double _startTime = CkWallTimer();
1158         
1159         MlogEntry *entry = ack->entry;
1160         if(entry == NULL){
1161                 CkExit();
1162         }
1163         entry->unackedLocal = 0;
1164         envelope *env = entry->env;
1165         char recverName[100];
1166         char senderString[100];
1167         DEBUG(CmiMemoryCheck());
1168         
1169         DEBUG(printf("[%d] at start of local message ack handler for entry %p env %p\n",CkMyPe(),entry,env));
1170         if(env == NULL)
1171                 return;
1172         CkAssert(env->SN > 0);
1173         CkAssert(env->TN > 0);
1174         env->sender.toString(senderString);
1175         DEBUG(printf("[%d] local message ack handler verified sender \n",CkMyPe()));
1176         env->recver.toString(recverName);
1177
1178         DEBUG(printf("[%d] Local Message log ack received for message from %s to %s TN %d time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(recverName),env->TN,CkWallTimer()));
1179         
1180 /*      
1181         void *origMsg = EnvToUsr(env);
1182         void *copyMsg = CkCopyMsg(&origMsg);
1183         envelope *copyEnv = UsrToEnv(copyMsg);
1184         entry->env = UsrToEnv(origMsg);*/
1185
1186 //      envelope *copyEnv = copyEnvelope(env);
1187
1188         envelope *copyEnv = env;
1189         copyEnv->localMlogEntry = entry;
1190
1191         DEBUG(printf("[%d] Local message copied response to ack \n",CkMyPe()));
1192         if(CmiMyPe() != entry->destPE){
1193                 DEBUG(printf("[%d] Formerly remote message to PE %d converted to local\n",CmiMyPe(),entry->destPE));
1194         }
1195 //      generalCldEnqueue(entry->destPE,copyEnv,entry->_infoIdx)
1196         _skipCldEnqueue(CmiMyPe(),copyEnv,entry->_infoIdx);     
1197         
1198         
1199 #ifdef BUFFERED_LOCAL
1200 #else
1201         CmiFree(ack);
1202 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1203 #endif
1204         
1205         DEBUG(CmiMemoryCheck());
1206         DEBUG(printf("[%d] Local message log ack handled \n",CkMyPe()));
1207 }
1208
1209
1210 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader){
1211
1212         double _startTime=CkWallTimer();
1213         DEBUG(CmiMemoryCheck());
1214
1215         int numLogs = recvdHeader->numberLogs;
1216         char *msg = (char *)recvdHeader;
1217         msg = &msg[sizeof(BufferedLocalLogHeader)];
1218
1219         DEBUG(printf("[%d] _bufferedLocalMessageAckHandler numLogs %d \n",CmiMyPe(),numLogs));
1220         
1221         for(int i=0;i<numLogs;i++){
1222                 LocalMessageLogAck *ack = (LocalMessageLogAck *)msg;
1223                 _localMessageAckHandler(ack);
1224                 
1225                 msg = &msg[sizeof(LocalMessageLogAck)]; 
1226         }
1227
1228         //deal with piggy backed local logs
1229         BufferedLocalLogHeader *piggyHeader = (BufferedLocalLogHeader *)msg;
1230         //printf("[%d] number of local logs piggied with ack %d \n",CkMyPe(),piggyHeader->numberLogs);
1231         if(piggyHeader->numberLogs > 0){
1232                 _bufferedLocalMessageCopyHandler(piggyHeader,0);
1233         }
1234         
1235         CmiFree(recvdHeader);
1236         DEBUG(CmiMemoryCheck());
1237 //      traceUserBracketEvent(24,_startTime,CkWallTimer());
1238 }
1239
1240
1241
1242
1243
1244
1245 bool fault_aware(CkObjID &recver){
1246         switch(recver.type){
1247                 case TypeChare:
1248                         return false;
1249                 case TypeGroup:
1250                 case TypeNodeGroup:
1251                 case TypeArray:
1252                         return true;
1253                 default:
1254                         return false;
1255         }
1256 };
1257
1258 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **logEntryPointer){
1259         char recverString[100];
1260         char senderString[100];
1261         
1262         DEBUG(CmiMemoryCheck());
1263         CkObjID recver = env->recver;
1264         if(!fault_aware(recver))
1265                 return 1;
1266
1267
1268         Chare *obj = (Chare *)recver.getObject();
1269         *objPointer = obj;
1270         if(obj == NULL){
1271                 int possiblePE = recver.guessPE();
1272                 if(possiblePE != CkMyPe()){
1273                         int totalSize = env->getTotalsize();                    
1274                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1275                 }
1276                 return 0;
1277         }
1278
1279
1280         double _startTime = CkWallTimer();
1281 //env->sender.updatePosition(env->getSrcPe());
1282         if(env->TN == obj->mlogData->tProcessed+1){
1283                 //the message that needs to be processed now
1284                 DEBUG(printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1285                 // once we find a message that we can process we put back all the messages in the out of order queue
1286                 // back into the main scheduler queue. 
1287                 if(env->sender.guessPE() == CkMyPe()){
1288                         *logEntryPointer = env->localMlogEntry;
1289                 }
1290         DEBUG(CmiMemoryCheck());
1291                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1292                         void *qMsgPtr;
1293                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1294                         envelope *qEnv = (envelope *)qMsgPtr;
1295                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());                       
1296         DEBUG(CmiMemoryCheck());
1297                 }
1298 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1299                 //TODO: this might be a problem.. change made for leanMD
1300 //              CpvAccess(_currentObj) = obj;
1301         DEBUG(CmiMemoryCheck());
1302                 return 1;
1303         }
1304         if(env->TN <= obj->mlogData->tProcessed){
1305                 //message already processed
1306                 DEBUG(printf("[%d] Message SN %d TN %d for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,recver.toString(recverString),obj->mlogData->tProcessed));
1307 //              traceUserBracketEvent(26,_startTime,CkWallTimer());
1308         DEBUG(CmiMemoryCheck());
1309                 return 0;
1310         }
1311         //message that needs to be processed in the future
1312
1313 //      DEBUG(printf("[%d] Early Message SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1314         //the message cant be processed now put it back in the out of order message Q, 
1315         //It will be transferred to the main queue later
1316         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1317 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1318         DEBUG(CmiMemoryCheck());
1319         
1320         return 0;
1321 }
1322
1323 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry){
1324         char senderString[100];
1325         if(obj){
1326                 if(sender.guessPE() == CkMyPe()){
1327                         if(entry != NULL){
1328                                 entry->env = NULL;
1329                         }
1330                 }
1331                 obj->mlogData->tProcessed++;
1332 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1333                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1334 //              CpvAccess(_currentObj)= NULL;
1335         }
1336         DEBUG(CmiMemoryCheck());
1337 }
1338
1339 /***
1340         Helpers for the handlers and message logging methods
1341 ***/
1342
1343 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx){
1344 //      double _startTime = CkWallTimer();
1345         if(env->recver.type != TypeNodeGroup){
1346         //This repeats a step performed in skipCldEnq for messages sent to
1347         //other processors. I do this here so that messages to local processors
1348         //undergo the same transformation.. It lets the resend be uniform for 
1349         //all messages
1350 //              CmiSetXHandler(env,CmiGetHandler(env));
1351                 _skipCldEnqueue(destPE,env,_infoIdx);
1352         }else{
1353                 _noCldNodeEnqueue(destPE,env);
1354         }
1355 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1356 }
1357 //extern "C" int CmiGetNonLocalLength();
1358 /** This method is used to retry the ticket requests
1359  * that had been queued up earlier
1360  * */
1361
1362 int calledRetryTicketRequest=0;
1363
1364 void retryTicketRequestTimer(void *_dummy,double _time){
1365                 calledRetryTicketRequest=0;
1366                 retryTicketRequest(_dummy,_time);
1367 }
1368
1369 void retryTicketRequest(void *_dummy,double curWallTime){       
1370         double start = CkWallTimer();
1371         DEBUG(CmiMemoryCheck());
1372         int length = CpvAccess(_delayedTicketRequests)->length();
1373         for(int i=0;i<length;i++){
1374                 TicketRequest *ticketRequest = CpvAccess(_delayedTicketRequests)->deq();
1375                 if(ticketRequest){
1376                         char senderString[100],recverString[100];
1377                         DEBUGRESTART(printf("[%d] RetryTicketRequest for ticket %p sender %s recver %s SN %d at %.6lf \n",CkMyPe(),ticketRequest,ticketRequest->sender.toString(senderString),ticketRequest->recver.toString(recverString), ticketRequest->SN, CmiWallTimer()));
1378                         DEBUG(CmiMemoryCheck());
1379                         _processTicketRequest(ticketRequest);
1380                   CmiFree(ticketRequest);
1381                         DEBUG(CmiMemoryCheck());
1382                 }       
1383         }       
1384         for(int i=0;i<CpvAccess(_delayedLocalTicketRequests)->length();i++){
1385                 MlogEntry *entry = CpvAccess(_delayedLocalTicketRequests)->deq();
1386                 ticketLogLocalMessage(entry);
1387         }
1388         int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue));
1389 //      int converse_qLength = CmiGetNonLocalLength();
1390         
1391 //      DEBUG(printf("[%d] Total RetryTicketRequest took %.6lf scheduler queue length %d converse queue length %d \n",CkMyPe(),CkWallTimer()-start,qLength,converse_qLength));
1392
1393 /*      PingMsg pingMsg;
1394         pingMsg.PE = CkMyPe();
1395         CmiSetHandler(&pingMsg,_pingHandlerIdx);
1396         if(CkMyPe() == 0 || CkMyPe() == CkNumPes() -1){
1397                 for(int i=0;i<CkNumPes();i++){
1398                         if(i != CkMyPe()){
1399                                 CmiSyncSend(i,sizeof(PingMsg),(char *)&pingMsg);
1400                         }
1401                 }
1402         }*/     
1403         //TODO: change this back to 100
1404         if(calledRetryTicketRequest == 0){
1405                 CcdCallFnAfter(retryTicketRequestTimer,NULL,500);       
1406                 calledRetryTicketRequest =1;
1407         }
1408         DEBUG(CmiMemoryCheck());
1409 }
1410
1411 void _pingHandler(CkPingMsg *msg){
1412         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1413         CmiFree(msg);
1414 }
1415
1416
1417 /*****************************************************************************
1418         Checkpointing methods..
1419                 Pack all the data on a processor and send it to the buddy periodically
1420                 Also used to throw away message logs
1421 *****************************************************************************/
1422 CkVec<TProcessedLog> processedTicketLog;
1423 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1424 void clearUpMigratedRetainedLists(int PE);
1425
1426 void checkpointAlarm(void *_dummy,double curWallTime){
1427         double diff = curWallTime-lastCompletedAlarm;
1428         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1429 /*      if(CkWallTimer()-lastRestart < 50){
1430                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1431                 return;
1432         }*/
1433         if(diff < ((chkptPeriod) - 2)){
1434                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1435                 return;
1436         }
1437         CheckpointRequest request;
1438         request.PE = CkMyPe();
1439         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1440         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1441 };
1442
1443 void _checkpointRequestHandler(CheckpointRequest *request){
1444         startMlogCheckpoint(NULL,CmiWallTimer());       
1445 }
1446
/**
 * Take a local checkpoint and send it to this PE's checkpoint buddy.
 *
 * Serializes, in this order: checkpointCount, readonly data, group data,
 * node-group data and all array elements. The result goes into one message
 * laid out as |CheckPointDataMsg|data (dataSize bytes)|, sent to
 * getCheckPointPE() with handler index _storeCheckpointHandlerIdx. Afterwards,
 * the current tProcessed of every local chare is recorded in
 * processedTicketLog.
 *
 * @param _dummy      unused
 * @param curWallTime on PE 0, when no load balancing is in progress, stored as
 *                    lastCompletedAlarm before the checkpoint alarm is re-armed
 */
void startMlogCheckpoint(void *_dummy,double curWallTime){
        double _startTime = CkWallTimer();
        checkpointCount++;
/*      if(checkpointCount == 3 && CmiMyPe() == 4 && restarted == 0){
                kill(getpid(),SIGKILL);
        }*/
        // Progress output only from PE 0 on runs with 256 or more PEs.
        if(CmiNumPes() < 256 || CmiMyPe() == 0){
                printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
        }
        // Pass 1: measure the serialized size. The sequence of pup calls here
        // must match the packing pass below exactly.
        PUP::sizer psizer;
        DEBUG(CmiMemoryCheck());

        psizer | checkpointCount;
        
        CkPupROData(psizer);
        DEBUG(CmiMemoryCheck());
        CkPupGroupData(psizer);
        DEBUG(CmiMemoryCheck());
        CkPupNodeGroupData(psizer);
        DEBUG(CmiMemoryCheck());
        pupArrayElementsSkip(psizer,NULL);
        DEBUG(CmiMemoryCheck());

        // Message layout: |CheckPointDataMsg header|checkpoint data|
        int dataSize = psizer.size();
        int totalSize = sizeof(CheckPointDataMsg)+dataSize;
        char *msg = (char *)CmiAlloc(totalSize);
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
        chkMsg->PE = CkMyPe();
        chkMsg->dataSize = dataSize;

        
        // Pass 2: pack into the space right after the header, in the same
        // order as the sizing pass.
        char *buf = &msg[sizeof(CheckPointDataMsg)];
        PUP::toMem pBuf(buf);   

        pBuf | checkpointCount;
        
        CkPupROData(pBuf);
        CkPupGroupData(pBuf);
        CkPupNodeGroupData(pBuf);
        pupArrayElementsSkip(pBuf,NULL);

        // Reset to 0 by _checkpointAckHandler when the buddy acknowledges.
        unAckedCheckpoint=1;
        CmiSetHandler(msg,_storeCheckpointHandlerIdx);
        // SendAndFree hands msg to the runtime; it must not be touched after this.
        CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
        
        /*
                Store the highest Ticket number processed for each chare on this processor
        */
        processedTicketLog.removeAll();
        forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
        if(CmiNumPes() < 256 || CmiMyPe() == 0){
                printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
        }

        // Only PE 0 drives the periodic alarm, and not during load balancing.
        if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
                lastCompletedAlarm = curWallTime;
                CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
        }
        traceUserBracketEvent(28,_startTime,CkWallTimer());
};
1507
1508 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData){
1509         CkVec<TProcessedLog> *log = (   CkVec<TProcessedLog> *)data;
1510         TProcessedLog logEntry;
1511         logEntry.recver = mlogData->objID;
1512         logEntry.tProcessed = mlogData->tProcessed;
1513         log->push_back(logEntry);
1514         char objString[100];
1515         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1516 }
1517
1518 class ElementPacker : public CkLocIterator {
1519 private:
1520         CkLocMgr *locMgr;
1521         PUP::er &p;
1522 public:
1523                 ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1524                 void addLocation(CkLocation &loc) {
1525                         CkArrayIndexMax idx=loc.getIndex();
1526                         CkGroupID gID = locMgr->ckGetGroupID();
1527                         p|gID;      // store loc mgr's GID as well for easier restore
1528                         p|idx;
1529                         p|loc;
1530     }
1531 };
1532
1533
/**
 * Size, pack, or unpack all array elements of this PE through p.
 *
 * Sizing/packing (p not unpacking): writes numElements (from
 * CkCountArrayElements()), then one (group ID, index, location) record per
 * location of every CkLocMgr, using ElementPacker.
 * Unpacking: reads numElements and the same records back. Each element is
 * recreated with CkLocMgr::resume(idx, p, flag). flag is 1 when the element
 * matches an entry of listToSkip with ackFrom == 0 && ackTo == 1. For such
 * elements the element's home PE is also told its location is
 * listToSkip[..].toPE, via informLocationHome().
 *
 * @param p          PUP::er used for sizing, packing or unpacking
 * @param listToSkip migration records of elements to skip on restore. The
 *                   two-argument calls in this file pass NULL; listsize then
 *                   comes from a default declared elsewhere (presumably 0, confirm)
 * @param listsize   number of entries in listToSkip
 */
void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listsize){
        int numElements,i;
  // NOTE(review): numGroups is not used below.
  int numGroups = CkpvAccess(_groupIDTable)->size();    
        if(!p.isUnpacking()){
                numElements = CkCountArrayElements();
        }       
        // When unpacking, numElements is read here (it is not initialized above).
        p | numElements;
        DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
        if(!p.isUnpacking()){
                CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
        }else{
                //Flush all recs of all LocMgrs before putting in new elements
//              CKLOCMGR_LOOP(mgr->flushAllRecs(););
                // Report the elements that will be skipped.
                for(int j=0;j<listsize;j++){
                        if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
                                printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
                                listToSkip[j].idx.print();
                        }
                }
                
 // NOTE(review): unconditional debugging output.
 printf("numElements = %d\n",numElements);
        
          for (int i=0; i<numElements; i++) {
                        // Record layout written by ElementPacker: gID, idx, location.
                        CkGroupID gID;
                        CkArrayIndexMax idx;
                        p|gID;
            p|idx;
                        // flag == 1 means this element matches a skip-list entry.
                        int flag=0;
                        int matchedIdx=0;
                        for(int j=0;j<listsize;j++){
                                if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
                                        if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
                                                matchedIdx = j;
                                                flag = 1;
                                                break;
                                        }
                                }
                        }
                        if(flag == 1){
                                printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
                        }else{
                                printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
                        }
                                
                        // resume() is still called for skipped elements, with flag=1.
                        // Presumably this consumes the location data from p without
                        // recreating the element; confirm in CkLocMgr::resume.
                        CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
                        mgr->resume(idx,p,flag);
                        if(flag == 1){
                                // Tell the home PE where the skipped element now lives.
                                int homePE = mgr->homePe(idx);
                                informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
                        }
          }
        }
};
1587
1588
1589 void writeCheckpointToDisk(int size,char *chkpt){
1590         char fNameTemp[100];
1591         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1592         int fd = creat(fNameTemp,S_IRWXU);
1593         int ret = write(fd,chkpt,size);
1594         CkAssert(ret == size);
1595         close(fd);
1596         
1597         char fName[100];
1598         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1599         unlink(fName);
1600
1601         rename(fNameTemp,fName);
1602         
1603 }
1604
/**
 * Handler that receives a checkpoint message from the buddy PE, stores it,
 * and acknowledges it.
 *
 * The message (|CheckPointDataMsg|data|) is retained: _storedCheckpointData->buf
 * points at the data part and replaces (and frees) the previously stored message.
 * With CHECKPOINT_DISK the data is written to disk instead and the message freed.
 * Migration notices from the sender are marked ackFrom = 1, and notices
 * acknowledged on both sides are dropped. A CheckPointAck is then sent back to
 * the sender.
 *
 * @param msg received checkpoint message; ownership is kept in _storedCheckpointData
 */
void _storeCheckpointHandler(char *msg){
        
        double _startTime=CkWallTimer();
                
        CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
        DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
        
        char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
        
        // The stored buf points just past the header of the message that carried
        // it, so step back to the message start in order to free it.
        char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
        if(oldChkpt != NULL){
                char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
                CmiFree(oldmsg);
        }
        // NOTE(review): stale comment? The checkpoint is stored below:
        //turning off storing checkpoints
        
        int sendingPE = chkMsg->PE;
        
        CpvAccess(_storedCheckpointData)->buf = chkpt;
        CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
        CpvAccess(_storedCheckpointData)->PE = sendingPE;

#ifdef CHECKPOINT_DISK
        //store the checkpoint on disk
        writeCheckpointToDisk(chkMsg->dataSize,chkpt);
        CpvAccess(_storedCheckpointData)->buf = NULL;
        // chkMsg/chkpt must not be used after this; sendingPE was copied above.
        CmiFree(msg);
#endif

        int count=0;
        for(int j=migratedNoticeList.size()-1;j>=0;j--){
                if(migratedNoticeList[j].fromPE == sendingPE){
                        migratedNoticeList[j].ackFrom = 1;
                }else{
                        // NOTE(review): CmiAssert on a string literal always passes, so
                        // this branch is a no-op; entries from other PEs are silently kept.
                        CmiAssert("migratedNoticeList entry for processor other than buddy");
                }
                if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
                        migratedNoticeList.remove(j);
                        count++;
                }
                
        }
        DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
        
        // Stack message is fine: CmiSyncSend copies it before returning.
        CheckPointAck ackMsg;
        ackMsg.PE = CkMyPe();
        ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
        CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
        CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
        
        
        
        traceUserBracketEvent(29,_startTime,CkWallTimer());
};
1661
1662
1663 void sendRemoveLogRequests(){
1664         double _startTime = CkWallTimer();      
1665         //send out the messages asking senders to throw away message logs below a certain ticket number
1666         /*
1667                 The remove log request message looks like
1668                 |RemoveLogRequest||List of TProcessedLog|
1669         */
1670         int totalSize = sizeof(RemoveLogRequest)+processedTicketLog.size()*sizeof(TProcessedLog);
1671         char *requestMsg = (char *)CmiAlloc(totalSize);
1672         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1673         request->PE = CkMyPe();
1674         request->numberObjects = processedTicketLog.size();
1675         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1676         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1677         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1678         
1679         DEBUG(CmiMemoryCheck());
1680         for(int i=0;i<CkNumPes();i++){
1681                 CmiSyncSend(i,totalSize,requestMsg);
1682         }
1683         CmiFree(requestMsg);
1684
1685         clearUpMigratedRetainedLists(CmiMyPe());
1686         //TODO: clear ticketTable
1687         
1688         traceUserBracketEvent(30,_startTime,CkWallTimer());
1689         DEBUG(CmiMemoryCheck());
1690 }
1691
1692
1693 void _checkpointAckHandler(CheckPointAck *ackMsg){
1694         DEBUG(CmiMemoryCheck());
1695         unAckedCheckpoint=0;
1696         DEBUG(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1697         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1698         if(onGoingLoadBalancing){
1699                 onGoingLoadBalancing = 0;
1700                 finishedCheckpointLoadBalancing();
1701         }else{
1702                 sendRemoveLogRequests();
1703         }
1704         CmiFree(ackMsg);
1705         
1706 };
1707
1708 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1709         DEBUG(CmiMemoryCheck());
1710         CmiMemoryCheck();
1711         char *data = (char *)_data;
1712         RemoveLogRequest *request = (RemoveLogRequest *)data;
1713         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1714         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1715
1716         int count=0;
1717         for(int i=0;i<mlog->length();i++){
1718                 MlogEntry *logEntry = mlog->deq();
1719                 int match=0;
1720                 for(int j=0;j<request->numberObjects;j++){
1721                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed && logEntry->unackedLocal != 1)){
1722                                 //this log Entry should be removed
1723                                 match = 1;
1724                                 break;
1725                         }
1726                 }
1727                 char senderString[100],recverString[100];
1728 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1729                 if(match){
1730                         count++;
1731                         delete logEntry;
1732                 }else{
1733                         mlog->enq(logEntry);
1734                 }
1735         }
1736         if(count > 0){
1737                 char nameString[100];
1738                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1739         }
1740         DEBUG(CmiMemoryCheck());
1741         CmiMemoryCheck();
1742 }
1743
1744 void _removeProcessedLogHandler(char *requestMsg){
1745         double start = CkWallTimer();
1746         forAllCharesDo(removeProcessedLogs,requestMsg);
1747         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1748         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1749         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1750         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1751         if(request->PE == getCheckPointPE()){
1752                 TProcessedLog *list = (TProcessedLog *)(&requestMsg[sizeof(RemoveLogRequest)]);
1753                 CkQ<LocalMessageLog> *localQ = CpvAccess(_localMessageLog);
1754                 CkQ<LocalMessageLog> *tempQ = new CkQ<LocalMessageLog>;
1755                 int count=0;
1756 /*              DEBUG(for(int j=0;j<request->numberObjects;j++){)
1757                 DEBUG(char nameString[100];)
1758                         DEBUG(printf("[%d] Remove local message logs for %s with TN less than %d\n",CkMyPe(),list[j].recver.toString(nameString),list[j].tProcessed));
1759                 DEBUG(})*/
1760                 for(int i=0;i<localQ->length();i++){
1761                         LocalMessageLog localLogEntry = (*localQ)[i];
1762                         if(!fault_aware(localLogEntry.recver)){
1763                                 CmiAbort("Non fault aware logEntry recver found while clearing old local logs");
1764                         }
1765                         bool keep = true;
1766                         for(int j=0;j<request->numberObjects;j++){                              
1767                                 if(localLogEntry.recver == list[j].recver && localLogEntry.TN > 0 && localLogEntry.TN < list[j].tProcessed){
1768                                         keep = false;
1769                                         break;
1770                                 }
1771                         }       
1772                         if(keep){
1773                                 tempQ->enq(localLogEntry);
1774                         }else{
1775                                 count++;
1776                         }
1777                 }
1778                 delete localQ;
1779                 CpvAccess(_localMessageLog) = tempQ;
1780                 DEBUG(printf("[%d] %d Local logs for proc %d deleted on buddy \n",CkMyPe(),count,request->PE));
1781         }
1782
1783         /*
1784                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1785         */
1786         CmiMemoryCheck();
1787         clearUpMigratedRetainedLists(request->PE);
1788         
1789         traceUserBracketEvent(20,start,CkWallTimer());
1790         CmiFree(requestMsg);    
1791 };
1792
1793
1794 void clearUpMigratedRetainedLists(int PE){
1795         int count=0;
1796         CmiMemoryCheck();
1797         
1798         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1799                 if(migratedNoticeList[j].toPE == PE){
1800                         migratedNoticeList[j].ackTo = 1;
1801                 }
1802                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1803                         migratedNoticeList.remove(j);
1804                         count++;
1805                 }
1806         }
1807         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1808         
1809         for(int j=retainedObjectList.size()-1;j>=0;j--){
1810                 if(retainedObjectList[j]->migRecord.toPE == PE){
1811                         RetainedMigratedObject *obj = retainedObjectList[j];
1812                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1813                         retainedObjectList.remove(j);
1814                         if(obj->msg != NULL){
1815                                 CmiMemoryCheck();
1816                                 CmiFree(obj->msg);
1817                         }
1818                         delete obj;
1819                 }
1820         }
1821 }
1822
1823 /***************************************************************
1824         Restart Methods and handlers
1825 ***************************************************************/        
1826
1827 /**
1828  * Function for restarting the crashed processor.
1829  * It sets the restart flag and contacts the buddy
1830  * processor to get the latest checkpoint.
1831  */
1832 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1833         RestartRequest msg;
1834
1835         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1836
1837         // setting the restart flag
1838         _restartFlag = 1;
1839
1840         // if we are using group-based message logging, all members of the group have to be restarted
1841         if(TEAM_SIZE_MLOG > 1){
1842                 for(int i=(CkMyPe()/TEAM_SIZE_MLOG)*TEAM_SIZE_MLOG; i<((CkMyPe()/TEAM_SIZE_MLOG)+1)*TEAM_SIZE_MLOG; i++){
1843                         if(i != CkMyPe() && i < CkNumPes()){
1844                                 // sending a message to the group member
1845                                 msg.PE = CkMyPe();
1846                             CmiSetHandler(&msg,_restartHandlerIdx);
1847                             //HERE CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1848                         }
1849                 }
1850         }
1851
1852         // requesting the latest checkpoint from its buddy
1853         msg.PE = CkMyPe();
1854         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1855         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1856 };
1857
1858 /**
1859  * Function to restart this processor.
1860  * The handler is invoked by a member of its same group in message logging.
1861  */
1862 void _restartHandler(RestartRequest *restartMsg){
1863         int i;
1864         int numGroups = CkpvAccess(_groupIDTable)->size();
1865         RestartRequest msg;
1866         
1867         fprintf(stderr,"[%d] Restart-group started at %.6lf \n",CkMyPe(),CmiWallTimer());
1868
1869     // setting the restart flag
1870         _restartFlag = 1;
1871
1872         // flushing all buffers 
1873 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1874         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1875         for(int i=0;i<numGroups;i++){
1876         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1877                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1878                 obj->flushStates();
1879                 obj->ckJustMigrated();
1880         }*/
1881
1882     // requesting the latest checkpoint from its buddy
1883         msg.PE = CkMyPe();
1884         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1885         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1886 }
1887
1888
1889 /**
1890  * Gets the stored checkpoint but calls another function in the sender.
1891  */
1892 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1893
1894         // retrieving the stored checkpoint
1895         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1896
1897         // making sure it is its buddy who is requesting the checkpoint
1898         CkAssert(restartMsg->PE == storedChkpt->PE);
1899
1900         storedRequest = restartMsg;
1901         verifyAckTotal = 0;
1902
1903         for(int i=0;i<migratedNoticeList.size();i++){
1904                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1905 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1906                         if(migratedNoticeList[i].ackFrom == 0){
1907                                 //need to verify if the object actually exists .. it might not
1908                                 //have been acked but it might exist on it
1909                                 VerifyAckMsg msg;
1910                                 msg.migRecord = migratedNoticeList[i];
1911                                 msg.index = i;
1912                                 msg.fromPE = CmiMyPe();
1913                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1914                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1915                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1916                                 verifyAckTotal++;
1917                         }
1918                 }
1919         }
1920
1921         // sending the checkpoint back to its buddy     
1922         if(verifyAckTotal == 0){
1923                 sendCheckpointData(MLOG_RESTARTED);
1924         }
1925         verifyAckCount = 0;
1926 }
1927
1928 /**
1929  * Receives the checkpoint coming from its buddy.
1930  */
1931 void _recvRestartCheckpointHandler(char *_restartData){
1932         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1933         MigrationRecord *migratedAwayElements;
1934
1935         globalLBID = restartData->lbGroupID;
1936         
1937         restartData->restartWallTime *= 1000;
1938         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1939         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1940         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1941
1942         
1943         printf("[%d] Group Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1944         char *buf = &_restartData[sizeof(RestartProcessorData)];
1945         
1946         if(restartData->numMigratedAwayElements != 0){
1947                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1948                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1949                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1950                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1951         }
1952         
1953         PUP::fromMem pBuf(buf);
1954
1955         pBuf | checkpointCount;
1956
1957         CkPupROData(pBuf);
1958         CkPupGroupData(pBuf);
1959         CkPupNodeGroupData(pBuf);
1960 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
1961         pupArrayElementsSkip(pBuf,NULL);
1962         CkAssert(pBuf.size() == restartData->checkPointSize);
1963         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1964         
1965         forAllCharesDo(initializeRestart,NULL);
1966         
1967         //store the restored local message log in a vector
1968         buf = &buf[restartData->checkPointSize];        
1969         for(int i=0;i<restartData->numLocalMessages;i++){
1970                 LocalMessageLog logEntry;
1971                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
1972                 
1973                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
1974                 if(recverObj!=NULL){
1975                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
1976                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
1977                         char senderString[100];
1978                         char recverString[100];
1979                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
1980                 }else{
1981 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
1982                 }
1983                 buf = &buf[sizeof(LocalMessageLog)];
1984         }
1985
1986         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
1987         CmiFree(_restartData);  
1988         /*HERE _initDone();
1989
1990         getGlobalStep(globalLBID);
1991         
1992         countUpdateHomeAcks = 0;
1993         RestartRequest updateHomeRequest;
1994         updateHomeRequest.PE = CmiMyPe();
1995         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1996         for(int i=0;i<CmiNumPes();i++){
1997                 if(i != CmiMyPe()){
1998                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1999                 }
2000         }
2001 */
2002
2003
2004         // Send out the request to resend logged messages to all other processors
2005         CkVec<TProcessedLog> objectVec;
2006         forAllCharesDo(createObjIDList, (void *)&objectVec);
2007         int numberObjects = objectVec.size();
2008         
2009         /*
2010                 resendMsg layout |ResendRequest|Array of TProcessedLog|
2011         */
2012         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2013         char *resendMsg = (char *)CmiAlloc(totalSize);
2014         
2015
2016         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2017         resendReq->PE =CkMyPe(); 
2018         resendReq->numberObjects = numberObjects;
2019         char *objList = &resendMsg[sizeof(ResendRequest)];
2020         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
2021         
2022
2023         /* test for parallel restart migrate away object**/
2024         if(parallelRestart){
2025                 distributeRestartedObjects();
2026                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2027         }
2028         
2029         /*      To make restart work for load balancing.. should only
2030         be used when checkpoint happens along with load balancing
2031         **/
2032 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2033
2034         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2035         CpvAccess(_currentObj) = lb;
2036         lb->ReceiveDummyMigration(restartDecisionNumber);
2037
2038         sleep(10);
2039         
2040         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2041         for(int i=0;i<CkNumPes();i++){
2042                 if(i != CkMyPe()){
2043                         CmiSyncSend(i,totalSize,resendMsg);
2044                 }       
2045         }
2046         _resendMessagesHandler(resendMsg);
2047
2048
2049 }
2050
2051
2052 void CkMlogRestartDouble(void *,double){
2053         CkMlogRestart(NULL,NULL);
2054 };
2055
2056 //TML: restarting from local (group) failure
2057 void CkMlogRestartLocal(){
2058     CkMlogRestart(NULL,NULL);
2059 };
2060
2061
2062 void readCheckpointFromDisk(int size,char *buf){
2063         char fName[100];
2064         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
2065
2066         int fd = open(fName,O_RDONLY);
2067         int count=0;
2068         while(count < size){
2069                 count += read(fd,&buf[count],size-count);
2070         }
2071         close(fd);
2072         
2073 };
2074
2075
2076
2077 /**
2078  * Gets the stored checkpoint for its buddy processor.
2079  */
2080 void _getCheckpointHandler(RestartRequest *restartMsg){
2081
2082         // retrieving the stored checkpoint
2083         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
2084
2085         // making sure it is its buddy who is requesting the checkpoint
2086         CkAssert(restartMsg->PE == storedChkpt->PE);
2087
2088         storedRequest = restartMsg;
2089         verifyAckTotal = 0;
2090
2091         for(int i=0;i<migratedNoticeList.size();i++){
2092                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
2093 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
2094                         if(migratedNoticeList[i].ackFrom == 0){
2095                                 //need to verify if the object actually exists .. it might not
2096                                 //have been acked but it might exist on it
2097                                 VerifyAckMsg msg;
2098                                 msg.migRecord = migratedNoticeList[i];
2099                                 msg.index = i;
2100                                 msg.fromPE = CmiMyPe();
2101                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
2102                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
2103                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
2104                                 verifyAckTotal++;
2105                         }
2106                 }
2107         }
2108
2109         // sending the checkpoint back to its buddy     
2110         if(verifyAckTotal == 0){
2111                 sendCheckpointData(MLOG_CRASHED);
2112         }
2113         verifyAckCount = 0;
2114 }
2115
2116
2117 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
2118         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
2119         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
2120         if(rec != NULL && rec->type() == CkLocRec::local){
2121                         //this location exists on this processor
2122                         //and needs to be removed       
2123                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
2124                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
2125                         
2126                         int localIdx = localRec->getLocalIndex();
2127                         LBDatabase *lbdb = localRec->getLBDB();
2128                         LDObjHandle ldHandle = localRec->getLdHandle();
2129                                 
2130                         locMgr->setDuringMigration(true);
2131                         
2132                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
2133                         lbdb->UnregisterObj(ldHandle);
2134                         
2135                         locMgr->setDuringMigration(false);
2136                         
2137                         verifyAckedRequests++;
2138
2139         }
2140         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
2141         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
2142 };
2143
2144
2145 void _verifyAckHandler(VerifyAckMsg *verifyReply){
2146         int index =     verifyReply->index;
2147         migratedNoticeList[index] = verifyReply->migRecord;
2148         verifyAckCount++;
2149         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
2150         if(verifyAckCount == verifyAckTotal){
2151                 sendCheckpointData(MLOG_CRASHED);
2152         }
2153 }
2154
2155
2156
2157 void sendCheckpointData(int mode){      
2158         RestartRequest *restartMsg = storedRequest;
2159         StoredCheckpoint *storedChkpt =         CpvAccess(_storedCheckpointData);
2160         int numMigratedAwayElements = migratedNoticeList.size();
2161         if(migratedNoticeList.size() != 0){
2162                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2163 //                      CkAssert(migratedNoticeList.size() == 0);
2164         }
2165         
2166         
2167         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2168         
2169         DEBUGRESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2170         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2171         
2172         CkQ<LocalMessageLog > *localMsgQ = CpvAccess(_localMessageLog);
2173         totalSize += localMsgQ->length()*sizeof(LocalMessageLog);
2174         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2175         
2176         char *msg = (char *)CmiAlloc(totalSize);
2177         
2178         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2179         dataMsg->PE = CkMyPe();
2180         dataMsg->restartWallTime = CmiTimer();
2181         dataMsg->checkPointSize = storedChkpt->bufSize;
2182         
2183         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2184 //      dataMsg->numMigratedAwayElements = 0;
2185         
2186         dataMsg->numMigratedInElements = 0;
2187         dataMsg->migratedElementSize = 0;
2188         dataMsg->lbGroupID = globalLBID;
2189         /*msg layout 
2190                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2191                 Local MessageLog|
2192         */
2193         //store checkpoint data
2194         char *buf = &msg[sizeof(RestartProcessorData)];
2195
2196         if(dataMsg->numMigratedAwayElements != 0){
2197                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2198                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2199         }
2200         
2201
2202 #ifdef CHECKPOINT_DISK
2203         readCheckpointFromDisk(storedChkpt->bufSize,buf);
2204 #else   
2205         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2206 #endif
2207         buf = &buf[storedChkpt->bufSize];
2208
2209
2210         //store localmessage Log
2211         dataMsg->numLocalMessages = localMsgQ->length();
2212         for(int i=0;i<localMsgQ->length();i++){
2213                 if(!fault_aware(((*localMsgQ)[i]).recver )){
2214                         CmiAbort("Non fault aware localMsgQ");
2215                 }
2216                 memcpy(buf,&(*localMsgQ)[i],sizeof(LocalMessageLog));
2217                 buf = &buf[sizeof(LocalMessageLog)];
2218         }
2219         
2220         if(mode == MLOG_RESTARTED){
2221                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2222                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2223                 CmiFree(restartMsg);
2224         }else{
2225                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2226                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2227                 CmiFree(restartMsg);
2228         }
2229 };
2230
2231
2232 // this list is used to create a vector of the object ids of all
2233 //the chares on this processor currently and the highest TN processed by them 
2234 //the first argument is actually a CkVec<TProcessedLog> *
2235 void createObjIDList(void *data,ChareMlogData *mlogData){
2236         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2237         TProcessedLog entry;
2238         entry.recver = mlogData->objID;
2239         entry.tProcessed = mlogData->tProcessed;
2240         list->push_back(entry);
2241         char objString[100];
2242         DEBUG(printf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2243 }
2244
2245
2246 /**
2247  * Receives the checkpoint data from its buddy, restores the state of all the objects
2248  * and asks everyone else to update its home.
2249  */
2250 void _recvCheckpointHandler(char *_restartData){
2251         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2252         MigrationRecord *migratedAwayElements;
2253
2254         globalLBID = restartData->lbGroupID;
2255         
2256         restartData->restartWallTime *= 1000;
2257         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2258         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2259         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2260
2261         
2262         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2263         char *buf = &_restartData[sizeof(RestartProcessorData)];
2264         
2265         if(restartData->numMigratedAwayElements != 0){
2266                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2267                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2268                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2269                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2270         }
2271         
2272         PUP::fromMem pBuf(buf);
2273
2274         pBuf | checkpointCount;
2275
2276         CkPupROData(pBuf);
2277         CkPupGroupData(pBuf);
2278         CkPupNodeGroupData(pBuf);
2279 //      pupArrayElementsSkip(pBuf,migratedAwayElements,restartData->numMigratedAwayElements);
2280         pupArrayElementsSkip(pBuf,NULL);
2281         CkAssert(pBuf.size() == restartData->checkPointSize);
2282         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2283         
2284         forAllCharesDo(initializeRestart,NULL);
2285         
2286         //store the restored local message log in a vector
2287         buf = &buf[restartData->checkPointSize];        
2288         for(int i=0;i<restartData->numLocalMessages;i++){
2289                 LocalMessageLog logEntry;
2290                 memcpy(&logEntry,buf,sizeof(LocalMessageLog));
2291                 
2292                 Chare *recverObj = (Chare *)logEntry.recver.getObject();
2293                 if(recverObj!=NULL){
2294                         recverObj->mlogData->addToRestoredLocalQ(&logEntry);
2295                         recverObj->mlogData->receivedTNs->push_back(logEntry.TN);
2296                         char senderString[100];
2297                         char recverString[100];
2298                         DEBUGRESTART(printf("[%d] Received local message log sender %s recver %s SN %d  TN %d\n",CkMyPe(),logEntry.sender.toString(senderString),logEntry.recver.toString(recverString),logEntry.SN,logEntry.TN));
2299                 }else{
2300 //                      DEBUGRESTART(printf("Object receiving local message doesnt exist on restarted processor .. ignoring it"));
2301                 }
2302                 buf = &buf[sizeof(LocalMessageLog)];
2303         }
2304
2305         forAllCharesDo(sortRestoredLocalMsgLog,NULL);
2306
2307         CmiFree(_restartData);
2308         
2309         
2310         _initDone();
2311
2312         getGlobalStep(globalLBID);
2313         
2314         countUpdateHomeAcks = 0;
2315         RestartRequest updateHomeRequest;
2316         updateHomeRequest.PE = CmiMyPe();
2317         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
2318         for(int i=0;i<CmiNumPes();i++){
2319                 if(i != CmiMyPe()){
2320                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
2321                 }
2322         }
2323
2324 }
2325
2326 /**
2327  * Receives the updateHome ACKs from all other processors. Once everybody
2328  * has replied, it sends a request to resend the logged messages.
2329  */
2330 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2331         countUpdateHomeAcks++;
2332         CmiFree(updateHomeAck);
2333         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2334         if(countUpdateHomeAcks != CmiNumPes()){
2335                 return;
2336         }
2337
2338         // Send out the request to resend logged messages to all other processors
2339         CkVec<TProcessedLog> objectVec;
2340         forAllCharesDo(createObjIDList, (void *)&objectVec);
2341         int numberObjects = objectVec.size();
2342         
2343         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2344         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2345         char *resendMsg = (char *)CmiAlloc(totalSize);  
2346
2347         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2348         resendReq->PE =CkMyPe(); 
2349         resendReq->numberObjects = numberObjects;
2350         char *objList = &resendMsg[sizeof(ResendRequest)];
2351         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2352
2353         /* test for parallel restart migrate away object**/
2354         if(parallelRestart){
2355                 distributeRestartedObjects();
2356                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2357         }
2358         
2359         /*      To make restart work for load balancing.. should only
2360         be used when checkpoint happens along with load balancing
2361         **/
2362 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2363
2364         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2365         CpvAccess(_currentObj) = lb;
2366         lb->ReceiveDummyMigration(restartDecisionNumber);
2367
2368         sleep(10);
2369         
2370         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2371         for(int i=0;i<CkNumPes();i++){
2372                 if(i != CkMyPe()){
2373                         CmiSyncSend(i,totalSize,resendMsg);
2374                 }       
2375         }
2376         _resendMessagesHandler(resendMsg);
2377         CmiFree(resendMsg);
2378 };
2379
2380 /**
2381  * @brief Initializes variables and flags for restarting procedure.
2382  */
2383 void initializeRestart(void *data, ChareMlogData *mlogData){
2384         mlogData->resendReplyRecvd = 0;
2385         mlogData->receivedTNs = new CkVec<MCount>;
2386         mlogData->restartFlag = 1;
2387         mlogData->restoredLocalMsgLog.removeAll();
2388         mlogData->mapTable.empty();
2389 };
2390
2391 /**
2392  * Updates the homePe of chare array elements.
2393  */
2394 void updateHomePE(void *data,ChareMlogData *mlogData){
2395         RestartRequest *updateRequest = (RestartRequest *)data;
2396         int PE = updateRequest->PE; //restarted PE
2397         //if this object is an array Element and its home is the restarted processor
2398         // the home processor needs to know its current location
2399         if(mlogData->objID.type == TypeArray){
2400                 //it is an array element
2401                 CkGroupID myGID = mlogData->objID.data.array.id;
2402                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asMax();
2403                 CkArrayID aid(mlogData->objID.data.array.id);           
2404                 //check if the restarted processor is the home processor for this object
2405                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2406                 if(locMgr->homePe(myIdx) == PE){
2407                         DEBUGRESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2408                         DEBUGRESTART(myIdx.print());
2409                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2410                 }
2411         }
2412 };
2413
2414
2415 /**
2416  * Updates the homePe for all chares in this processor.
2417  */
2418 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2419         int sender = updateRequest->PE;
2420         
2421         forAllCharesDo(updateHomePE,updateRequest);
2422         
2423         updateRequest->PE = CmiMyPe();
2424         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2425         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2426         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2427                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2428                 checkpointCount--;
2429                 startMlogCheckpoint(NULL,0);
2430         }
2431         if(sender == getCheckPointPE()){
2432                 for(int i=0;i<retainedObjectList.size();i++){
2433                         if(retainedObjectList[i]->acked == 0){
2434                                 MigrationNotice migMsg;
2435                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2436                                 migMsg.record = retainedObjectList[i];
2437                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2438                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2439                         }
2440                 }
2441         }
2442 }
2443
2444 /**
2445  * @brief Fills up the ticket vector for each chare.
2446  */
2447 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2448         ResendData *resendData = (ResendData *)data;
2449         int PE = resendData->PE; //restarted PE
2450         int count=0;
2451         CkHashtableIterator *iterator;
2452         void *objp;
2453         void *objkey;
2454         CkObjID *objID;
2455         SNToTicket *snToTicket;
2456         Ticket ticket;
2457         
2458         // traversing the team table looking up for the maximum TN received     
2459         iterator = mlogData->teamTable.iterator();
2460         while( (objp = iterator->next(&objkey)) != NULL ){
2461                 objID = (CkObjID *)objkey;
2462         
2463                 // traversing the resendData structure to add ticket numbers
2464                 for(int j=0;j<resendData->numberObjects;j++){
2465                         if((*objID) == (resendData->listObjects)[j].recver){
2466 char name[100];
2467                                 snToTicket = *(SNToTicket **)objp;
2468 CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2469                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2470 CkPrintf("[%d] ---> Zero\n",CkMyPe());
2471                                         ticket = snToTicket->get(snIndex);      
2472                                         if(ticket.TN > resendData->maxTickets[j]){
2473 CkPrintf("[%d] ---> One\n",CkMyPe());
2474                                                 resendData->maxTickets[j] = ticket.TN;
2475                                         }
2476                                         if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2477 CkPrintf("[%d] ---> Two\n",CkMyPe());
2478                                                 //store the TNs that have been since the recver last checkpointed
2479                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2480                                         }
2481                                 }
2482                         }
2483                 }
2484         }
2485
2486         //releasing the memory for the iterator
2487         delete iterator;
2488 }
2489
/**
 * Replays this chare's message log toward a restarted processor.
 *
 * forAllCharesDo callback used by _resendMessagesHandler, except when the
 * requester is a different processor on this processor's team.
 *
 * @param data     a ResendData * holding the array of objects on the
 *                 restarted processor (listObjects), its PE, and the
 *                 maxTickets / ticketVecs arrays this method accumulates into
 * @param mlogData message-logging state of the chare whose log is replayed
 *
 * For every log entry that still has an envelope:
 *   - unackedLocal entries are re-sent with sendLocalMessageCopy;
 *   - entries without a ticket yet (TN <= 0) get their ticket request re-sent;
 *   - entries addressed to an object j in the request that have a valid
 *     TN > 0 update maxTickets[j]. If TN >= that object's tProcessed, the TN
 *     is also appended to ticketVecs[j] and the message is re-delivered:
 *     node-send for node groups, a regular send with the handler restored via
 *     CmiGetXHandler otherwise, or a local enqueue of a copy when the
 *     restarted PE is this PE.
 */
void resendMessageForChare(void *data,ChareMlogData *mlogData){
        char nameString[100];
        ResendData *resendData = (ResendData *)data;
        int PE = resendData->PE; //restarted PE
        DEBUGRESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
        int count=0;          // messages re-sent (debug statistics only)
        int ticketRequests=0; // only incremented by the disabled code below
        CkQ<MlogEntry *> *log = mlogData->getMlog();
        
        for(int i=0;i<log->length();i++){
                MlogEntry *logEntry = (*log)[i];
                
                // if we sent out the logs of a local message to buddy and he crashed
                //before acking
                envelope *env = logEntry->env;
                if(env == NULL){
                        // entry without an envelope: nothing to resend
                        continue;
                }
                if(logEntry->unackedLocal){
                        char recverString[100];
                        DEBUGRESTART(printf("[%d] Resend Local unacked message from %s to %s SN %d TN %d \n",CkMyPe(),env->sender.toString(nameString),env->recver.toString(recverString),env->SN,env->TN);)
                        sendLocalMessageCopy(logEntry);
                }
                //looks like near a crash messages between uninvolved processors can also get lost. Resend ticket requests as a result
                if(env->TN <= 0){
                        //ticket not yet replied send it out again
                        sendTicketRequest(env->sender,env->recver,logEntry->destPE,logEntry,env->SN,1);
                }
                
                if(env->recver.type != TypeInvalid){
                        int flag = 0;//marks if any of the restarted objects matched this log entry
                        for(int j=0;j<resendData->numberObjects;j++){
                                if(env->recver == (resendData->listObjects)[j].recver){
                                        flag = 1;
                                        //message has a valid TN
                                        if(env->TN > 0){
                                                //store maxTicket
                                                if(env->TN > resendData->maxTickets[j]){
                                                        resendData->maxTickets[j] = env->TN;
                                                }
                                                //if the TN for this entry is more than the TN processed, send the message out
                                                if(env->TN >= (resendData->listObjects)[j].tProcessed){
                                                        //store the TNs that have been since the recver last checkpointed
                                                        resendData->ticketVecs[j].push_back(env->TN);
                                                        
                                                        if(PE != CkMyPe()){
                                                                if(env->recver.type == TypeNodeGroup){
                                                                        CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
                                                                }else{
                                                                        // restore the original handler before sending the logged envelope
                                                                        CmiSetHandler(env,CmiGetXHandler(env));
                                                                        CmiSyncSend(PE,env->getTotalsize(),(char *)env);
                                                                }
                                                        }else{
                                                                // restarted PE is this PE: enqueue a copy so the log keeps its envelope
                                                                envelope *copyEnv = copyEnvelope(env);
                                                                CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
                                                        }
                                                        char senderString[100];
                                                        DEBUGRESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
                                                        count++;
                                                }       
                                        }else{
/*                                      //the message didnt get a ticket the last time and needs to start with a ticket request
                                                DEBUGRESTART(printf("[%d] Resent ticket request SN %d to %s needs ticket at %d in logQ \n",CkMyPe(),env->SN,env->recver.toString(nameString),i));
                                                //generateCommonTicketRequest(env->recver,env,PE,logEntry->_infoIdx);                                           
                                                CkAssert(logEntry->destPE != CkMyPe());
                                                
                                                sendTicketRequest(env->sender,env->recver,PE,logEntry,env->SN,1);
                                                
                                                ticketRequests++;*/
                                        }
                                }
                        }//end of for loop of objects
                        
                }       
        }
        DEBUGRESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)       
}
2572
2573 /**
2574  * Resends the messages since the last checkpoint to the list of objects included in the 
2575  * request.
2576  */
2577 void _resendMessagesHandler(char *msg){
2578         ResendData d;
2579         ResendRequest *resendReq = (ResendRequest *)msg;
2580
2581         // building the reply message
2582         char *listObjects = &msg[sizeof(ResendRequest)];
2583         d.numberObjects = resendReq->numberObjects;
2584         d.PE = resendReq->PE;
2585         d.listObjects = (TProcessedLog *)listObjects;
2586         d.maxTickets = new MCount[d.numberObjects];
2587         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2588         for(int i=0;i<d.numberObjects;i++){
2589                 d.maxTickets[i] = 0;
2590         }
2591
2592         //Check if any of the retained objects need to be recreated
2593         //If they have not been recreated on the restarted processor
2594         //they need to be recreated on this processor
2595         int count=0;
2596         for(int i=0;i<retainedObjectList.size();i++){
2597                 if(retainedObjectList[i]->migRecord.toPE == d.PE){
2598                         count++;
2599                         int recreate=1;
2600                         for(int j=0;j<d.numberObjects;j++){
2601                                 if(d.listObjects[j].recver.type != TypeArray ){
2602                                         continue;
2603                                 }
2604                                 CkArrayID aid(d.listObjects[j].recver.data.array.id);           
2605                                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2606                                 if(retainedObjectList[i]->migRecord.gID == locMgr->getGroupID()){
2607                                         if(retainedObjectList[i]->migRecord.idx == d.listObjects[j].recver.data.array.idx.asMax()){
2608                                                 recreate = 0;
2609                                                 break;
2610                                         }
2611                                 }
2612                         }
2613                         CmiPrintf("[%d] Object migrated away but did not checkpoint recreate %d locmgrid %d idx %s\n",CmiMyPe(),recreate,retainedObjectList[i]->migRecord.gID.idx,idx2str(retainedObjectList[i]->migRecord.idx));
2614                         if(recreate){
2615                                 donotCountMigration=1;
2616                                 _receiveMlogLocationHandler(retainedObjectList[i]->msg);
2617                                 donotCountMigration=0;
2618                                 CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(retainedObjectList[i]->migRecord.gID).getObj();
2619                                 int homePE = locMgr->homePe(retainedObjectList[i]->migRecord.idx);
2620                                 informLocationHome(retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,homePE,CmiMyPe());
2621                                 sendDummyMigration(d.PE,globalLBID,retainedObjectList[i]->migRecord.gID,retainedObjectList[i]->migRecord.idx,CmiMyPe());
2622                                 CkLocRec *rec = locMgr->elementRec(retainedObjectList[i]->migRecord.idx);
2623                                 CmiAssert(rec->type() == CkLocRec::local);
2624                                 CkVec<CkMigratable *> eltList;
2625                                 locMgr->migratableList((CkLocRec_local *)rec,eltList);
2626                                 for(int j=0;j<eltList.size();j++){
2627                                         if(eltList[j]->mlogData->toResumeOrNot == 1 && eltList[j]->mlogData->resumeCount < globalResumeCount){
2628                                                 CpvAccess(_currentObj) = eltList[j];
2629                                                 eltList[j]->ResumeFromSync();
2630                                         }
2631                                 }
2632                                 retainedObjectList[i]->msg=NULL;        
2633                         }
2634                 }
2635         }
2636         
2637         if(count > 0){
2638 //              CmiAbort("retainedObjectList for restarted processor not empty");
2639         }
2640         
2641         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2642
2643
2644         //TML: examines the origin processor to determine if it belongs to the same group.
2645         // In that case, it only returns the maximum ticket received for each object in the list.
2646         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2647                 forAllCharesDo(fillTicketForChare,&d);
2648         else
2649                 forAllCharesDo(resendMessageForChare,&d);
2650
2651         //send back the maximum ticket number for a message sent to each object on the 
2652         //restarted processor
2653         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2654         
2655         int totalTNStored=0;
2656         for(int i=0;i<d.numberObjects;i++){
2657                 totalTNStored += d.ticketVecs[i].size();
2658         }
2659         
2660         int totalSize = sizeof(ResendRequest)+d.numberObjects*(sizeof(CkObjID)+sizeof(int)) + totalTNStored*sizeof(MCount);
2661         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2662         
2663         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2664         resendReply->PE = CkMyPe();
2665         resendReply->numberObjects = d.numberObjects;
2666         
2667         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2668         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2669         for(int i=0;i<d.numberObjects;i++){
2670                 replyObjects[i] = d.listObjects[i].recver;
2671         }
2672         
2673         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2674         for(int i=0;i<d.numberObjects;i++){
2675                 int vecsize = d.ticketVecs[i].size();
2676                 memcpy(ticketList,&vecsize,sizeof(int));
2677                 ticketList = &ticketList[sizeof(int)];
2678                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2679                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2680         }       
2681
2682         CmiSetHandler(resendReplyMsg,_resendReplyHandlerIdx);
2683         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2684         
2685 /*      
2686         if(verifyAckRequestsUnacked){
2687                 CmiPrintf("[%d] verifyAckRequestsUnacked %d call dummy migrates\n",CmiMyPe(),verifyAckRequestsUnacked);
2688                 for(int i=0;i<verifyAckRequestsUnacked;i++){
2689                         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2690                         LDObjHandle h;
2691                         lb->Migrated(h,1);
2692                 }
2693         }
2694         
2695         verifyAckRequestsUnacked=0;*/
2696         
2697         delete [] d.maxTickets;
2698         delete [] d.ticketVecs;
2699         if(resendReq->PE != CkMyPe()){
2700                 CmiFree(msg);
2701         }       
2702 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2703         lastRestart = CmiWallTimer();
2704 }
2705
2706 void sortVec(CkVec<MCount> *TNvec);
2707 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2708
2709 /**
2710  * @brief Receives the tickets assigned to message to other objects.
2711  */
2712 void _resendReplyHandler(char *msg){    
2713         /**
2714                 need to rewrite this method to deal with parallel restart
2715         */
2716         ResendRequest *resendReply = (ResendRequest *)msg;
2717         CkObjID *listObjects = (CkObjID *)( &msg[sizeof(ResendRequest)]);
2718
2719         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2720         
2721         DEBUGRESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2722         for(int i =0; i< resendReply->numberObjects;i++){       
2723                 Chare *obj = (Chare *)listObjects[i].getObject();
2724                 
2725                 int vecsize;
2726                 memcpy(&vecsize,listTickets,sizeof(int));
2727                 listTickets = &listTickets[sizeof(int)];
2728                 MCount *listTNs = (MCount *)listTickets;        
2729                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2730                 
2731                 if(obj != NULL){
2732                         //the object was restarted on the processor on which it existed
2733                         processReceivedTN(obj,vecsize,listTNs);
2734                 }else{
2735                 //pack up objID vecsize and listTNs and send it to the correct processor
2736                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2737                         char *TNMsg = (char *)CmiAlloc(totalSize);
2738                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2739                         receivedTNData->recver = listObjects[i];
2740                         receivedTNData->numTNs = vecsize;
2741                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2742                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2743
2744                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2745                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2746                 }
2747                 
2748         }
2749 };
2750
2751 void _receivedTNDataHandler(ReceivedTNData *msg){
2752         char objName[100];
2753         Chare *obj = (Chare *) msg->recver.getObject();
2754         if(obj){                
2755                 char *_msg = (char *)msg;
2756                 DEBUGRESTART(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2757                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2758                 processReceivedTN(obj,msg->numTNs,listTNs);
2759         }else{
2760                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2761                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2762         }
2763 };
2764
2765 /**
2766  * @brief Processes the received list of tickets from a particular PE.
2767  */
2768 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2769         // increases the number of resendReply received
2770         obj->mlogData->resendReplyRecvd++;
2771         
2772         // includes the tickets into the receivedTN structure
2773         for(int j=0;j<listSize;j++){
2774                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2775         }
2776         
2777         //if this object has received all the replies find the ticket numbers
2778         //that senders know about. Those less than the ticket number processed 
2779         //by the receiver can be thrown away. The rest need not be consecutive
2780         // ie there can be holes in the list of ticket numbers seen by senders
2781         if(obj->mlogData->resendReplyRecvd == CkNumPes()){
2782                 obj->mlogData->resendReplyRecvd = 0;
2783                 //sort the received TNS
2784                 sortVec(obj->mlogData->receivedTNs);
2785         
2786                 //after all the received tickets are in we need to sort them and then 
2787                 // calculate the holes  
2788                 if(obj->mlogData->receivedTNs->size() > 0){
2789                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2790                         int vecsize = obj->mlogData->receivedTNs->size();
2791                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2792                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2793                         if(numberHoles == 0){
2794                         }else{
2795                                 char objName[100];                                      
2796                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2797                                 obj->mlogData->numberHoles = numberHoles;
2798                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2799                                 int countHoles=0;
2800                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2801                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2802                                                 //the TNs are not consecutive at this point
2803                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2804                                                         printf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]);
2805                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2806                                                         countHoles++;
2807                                                 }       
2808                                         }
2809                                 }
2810                                 //Holes have been given new TN
2811                                 if(countHoles != numberHoles){
2812                                         char str[100];
2813                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2814                                 }
2815                                 CkAssert(countHoles == numberHoles);                                    
2816                                 obj->mlogData->currentHoles = numberHoles;
2817                         }
2818                 }
2819         
2820                 // cleaning up structures and getting ready to continue execution       
2821                 delete obj->mlogData->receivedTNs;
2822                 obj->mlogData->receivedTNs = NULL;
2823                 obj->mlogData->restartFlag = 0;
2824
2825                 DEBUGRESTART(char objString[100]);
2826                 DEBUGRESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2827         }
2828
2829 }
2830
2831
2832 void sortVec(CkVec<MCount> *TNvec){
2833         //sort it ->its bloddy bubble sort
2834         //TODO: use quicksort
2835         for(int i=0;i<TNvec->size();i++){
2836                 for(int j=i+1;j<TNvec->size();j++){
2837                         if((*TNvec)[j] < (*TNvec)[i]){
2838                                 MCount temp;
2839                                 temp = (*TNvec)[i];
2840                                 (*TNvec)[i] = (*TNvec)[j];
2841                                 (*TNvec)[j] = temp;
2842                         }
2843                 }
2844         }
2845         //make it unique .. since its sorted all equal units will be consecutive
2846         MCount *tempArray = new MCount[TNvec->size()];
2847         int     uniqueCount=-1;
2848         for(int i=0;i<TNvec->size();i++){
2849                 tempArray[i] = 0;
2850                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2851                         uniqueCount++;
2852                         tempArray[uniqueCount] = (*TNvec)[i];
2853                 }
2854         }
2855         uniqueCount++;
2856         TNvec->removeAll();
2857         for(int i=0;i<uniqueCount;i++){
2858                 TNvec->push_back(tempArray[i]);
2859         }
2860         delete [] tempArray;
2861 }       
2862
2863 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2864         if(TNVec->size() == 0){
2865                 return -1; //not found in an empty vec
2866         }
2867         //binary search to find 
2868         int left=0;
2869         int right = TNVec->size();
2870         int mid = (left +right)/2;
2871         while(searchTN != (*TNVec)[mid] && left < right){
2872                 if((*TNVec)[mid] > searchTN){
2873                         right = mid-1;
2874                 }else{
2875                         left = mid+1;
2876                 }
2877                 mid = (left + right)/2;
2878         }
2879         if(left < right){
2880                 //mid is the element to be returned
2881                 return mid;
2882         }else{
2883                 if(mid < TNVec->size() && mid >=0){
2884                         if((*TNVec)[mid] == searchTN){
2885                                 return mid;
2886                         }else{
2887                                 return -1;
2888                         }
2889                 }else{
2890                         return -1;
2891                 }
2892         }
2893 };
2894
2895
/*
	Support for parallel restart: distribute some of the array elements of this
	processor to other processors. Charm++ entry methods cannot be used to do this
	migration, because they would go through the message-logging protocol that is
	itself being restarted; raw Converse messages are used instead (received by
	_distributedLocationHandler).
*/

// Location iterator that hands out the locations it visits round-robin over all
// PEs, using *targetPE (shared with the caller) as the cursor.
class ElementDistributor: public CkLocIterator{
	CkLocMgr *locMgr;
	int *targetPE; // round-robin cursor owned by the caller
	// Serializes |locMgr GID|index|location|, the order _distributedLocationHandler reads back.
	void pupLocation(CkLocation &loc,PUP::er &p){
		CkArrayIndexMax idx=loc.getIndex();
		CkGroupID gID = locMgr->ckGetGroupID();
		p|gID;      // store loc mgr's GID as well for easier restore
		p|idx;
		p|loc;
	};
	public:
		ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
		// Called for each location visited by locMgr->iterate(). A location whose
		// turn falls on this PE stays here. Any other location is packed, its local
		// record deleted, and it is sent to *targetPE. The cursor advances either way.
		void addLocation(CkLocation &loc){
			if(*targetPE == CkMyPe()){
				*targetPE = (*targetPE +1)%CkNumPes();				
				return;
			}
			
			CkArrayIndexMax idx=loc.getIndex();
			CkLocRec_local *rec = loc.getLocalRecord();
			
			CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
			idx.print();
			

			//TODO: an element that is being moved should leave some trace behind so that
			// the arraybroadcaster can forward messages to it
			
			//pack up this location and send it across: size it first, then pack
			//into a Converse message right after the header
			PUP::sizer psizer;
			pupLocation(loc,psizer);
			int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
			char *msg = (char *)CmiAlloc(totalSize);
			char *buf = &msg[CmiMsgHeaderSizeBytes];
			PUP::toMem pmem(buf);
			// presumably lets the packed objects release their state as they leave -- confirm in PUP::er
			pmem.becomeDeleting();
			pupLocation(loc,pmem);
			
			// drop the local record while the manager is flagged as migrating
			// NOTE(review): this deletes a record while locMgr->iterate() is still
			// walking locations -- confirm iterate tolerates removal.
			locMgr->setDuringMigration(CmiTrue);			
			delete rec;
			locMgr->setDuringMigration(CmiFalse);			
			locMgr->inform(idx,*targetPE);

			CmiSetHandler(msg,_distributedLocationHandlerIdx);
			CmiSyncSendAndFree(*targetPE,totalSize,msg);

			CmiAssert(locMgr->lastKnown(idx) == *targetPE);
			//decide on the target processor for the next object
			*targetPE = (*targetPE +1)%CkNumPes();
		}
		
};
2954
// Spreads the array elements of every location manager on this PE over all PEs
// with an ElementDistributor. The shared cursor starts at CkMyPe(), so the first
// location visited stays here and later ones go to CkMyPe()+1, +2, ... (wrapping).
void distributeRestartedObjects(){
	// numGroups and i look unused, but are presumably referenced by the
	// CKLOCMGR_LOOP macro expansion -- do not remove.
	int numGroups = CkpvAccess(_groupIDTable)->size();	
	int i;
	int targetPE=CkMyPe();
	CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
2961
2962 void _distributedLocationHandler(char *receivedMsg){
2963         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2964         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2965         PUP::fromMem pmem(buf);
2966         CkGroupID gID;
2967         CkArrayIndexMax idx;
2968         pmem |gID;
2969         pmem |idx;
2970         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2971         donotCountMigration=1;
2972         mgr->resume(idx,pmem);
2973         donotCountMigration=0;
2974         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2975         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2976         idx.print();
2977
2978         CkLocRec *rec = mgr->elementRec(idx);
2979         CmiAssert(rec->type() == CkLocRec::local);
2980         
2981         CkVec<CkMigratable *> eltList;
2982         mgr->migratableList((CkLocRec_local *)rec,eltList);
2983         for(int i=0;i<eltList.size();i++){
2984                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2985                         CpvAccess(_currentObj) = eltList[i];
2986                         eltList[i]->ResumeFromSync();
2987                 }
2988         }
2989         
2990         
2991 }
2992
2993
2994 /** this method is used to send messages to a restarted processor to tell
2995  * it that a particular expected object is not going to get to it */
2996 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2997         DummyMigrationMsg buf;
2998         buf.flag = MLOG_OBJECT;
2999         buf.lbID = lbID;
3000         buf.mgrID = locMgrID;
3001         buf.idx = idx;
3002         buf.locationPE = locationPE;
3003         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3004         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3005 };
3006
3007
3008 /**this method is used by a restarted processor to tell other processors
3009  * that they are not going to receive these many objects.. just the count
3010  * not the objects themselves ***/
3011
3012 void sendDummyMigrationCounts(int *dummyCounts){
3013         DummyMigrationMsg buf;
3014         buf.flag = MLOG_COUNT;
3015         buf.lbID = globalLBID;
3016         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3017         for(int i=0;i<CmiNumPes();i++){
3018                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3019                         buf.count = dummyCounts[i];
3020                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3021                 }
3022         }
3023 }
3024
3025
3026 /** this handler is used to process a dummy migration msg.
3027  * it looks up the load balancer and calls migrated for it */
3028
3029 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3030         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3031         if(msg->flag == MLOG_OBJECT){
3032                 DEBUGRESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3033                 LDObjHandle h;
3034                 lb->Migrated(h,1);
3035         }
3036         if(msg->flag == MLOG_COUNT){
3037                 DEBUGRESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3038                 msg->count -= verifyAckedRequests;
3039                 for(int i=0;i<msg->count;i++){
3040                         LDObjHandle h;
3041                         lb->Migrated(h,1);
3042                 }
3043         }
3044         verifyAckedRequests=0;
3045         CmiFree(msg);
3046 };
3047
3048
3049
3050
3051
3052
3053
3054 /*****************************************************
3055         Implementation of a method that can be used to call
3056         any method on the ChareMlogData of all the chares on
3057         a processor currently
3058 ******************************************************/
3059
3060
3061 class ElementCaller :  public CkLocIterator {
3062 private:
3063         CkLocMgr *locMgr;
3064         MlogFn fnPointer;
3065         void *data;
3066 public:
3067         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3068                 locMgr = _locMgr;
3069                 fnPointer = _fnPointer;
3070                 data = _data;
3071         };
3072         void addLocation(CkLocation &loc){
3073                 CkVec<CkMigratable *> list;
3074                 CkLocRec_local *local = loc.getLocalRecord();
3075                 locMgr->migratableList (local,list);
3076                 for(int i=0;i<list.size();i++){
3077                         CkMigratable *migratableElement = list[i];
3078                         fnPointer(data,migratableElement->mlogData);
3079                 }
3080         }
3081 };
3082
3083 void forAllCharesDo(MlogFn fnPointer,void *data){
3084         int numGroups = CkpvAccess(_groupIDTable)->size();
3085         for(int i=0;i<numGroups;i++){
3086                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3087                 fnPointer(data,obj->mlogData);
3088         }
3089         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3090         for(int i=0;i<numNodeGroups;i++){
3091                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3092                 fnPointer(data,obj->mlogData);
3093         }
3094         int i;
3095         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3096         
3097         
3098 };
3099
3100
3101 /******************************************************************
3102  Load Balancing
3103 ******************************************************************/
3104
3105 void initMlogLBStep(CkGroupID gid){
3106         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3107         countLBMigratedAway = 0;
3108         countLBToMigrate=0;
3109         onGoingLoadBalancing=1;
3110         migrationDoneCalled=0;
3111         checkpointBarrierCount=0;
3112         if(globalLBID.idx != 0){
3113                 CmiAssert(globalLBID.idx == gid.idx);
3114         }
3115         globalLBID = gid;
3116 }
3117
3118 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3119         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3120         
3121         resumeLbFnPtr = _fnPtr;
3122         centralLb = _centralLb;
3123         migrationDoneCalled = 1;
3124         if(countLBToMigrate == countLBMigratedAway){
3125                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3126                 startMlogCheckpoint(NULL,CmiWallTimer());       
3127         }
3128 };
3129
3130 void finishedCheckpointLoadBalancing(){
3131         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3132         CheckpointBarrierMsg msg;
3133         msg.fromPE = CmiMyPe();
3134         msg.checkpointCount = checkpointCount;
3135
3136         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3137         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3138         
3139 };
3140
3141
3142 void sendMlogLocation(int targetPE,envelope *env){
3143         void *_msg = EnvToUsr(env);
3144         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3145
3146
3147         int existing = 0;
3148         //if this object is already in the retainedobjectlust destined for this
3149         //processor it should not be sent
3150         
3151         for(int i=0;i<retainedObjectList.size();i++){
3152                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3153                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3154                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3155                         existing = 1;
3156                         break;
3157                 }
3158         }
3159
3160         if(existing){
3161                 return;
3162         }
3163         
3164         
3165         countLBToMigrate++;
3166         
3167         MigrationNotice migMsg;
3168         migMsg.migRecord.gID = msg->gid;
3169         migMsg.migRecord.idx = msg->idx;
3170         migMsg.migRecord.fromPE = CkMyPe();
3171         migMsg.migRecord.toPE =  targetPE;
3172         
3173         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3174         
3175         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3176         retainedObject->migRecord = migMsg.migRecord;
3177         retainedObject->acked  = 0;
3178         
3179         CkPackMessage(&env);
3180         
3181         migMsg.record = retainedObject;
3182         retainedObject->msg = env;
3183         int size = retainedObject->size = env->getTotalsize();
3184         
3185         retainedObjectList.push_back(retainedObject);
3186         
3187         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3188         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3189         
3190         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3191
3192 }
3193
3194 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3195         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3196         migratedNoticeList.push_back(msg->migRecord);
3197
3198         MigrationNoticeAck buf;
3199         buf.record = msg->record;
3200         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3201         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3202 }
3203
3204 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3205         
3206         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3207         retainedObject->acked = 1;
3208
3209         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3210         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3211
3212         //inform home about the new location of this object
3213         CkGroupID gID = retainedObject->migRecord.gID ;
3214         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3215         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3216         
3217         countLBMigratedAway++;
3218         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3219                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3220                 startMlogCheckpoint(NULL,CmiWallTimer());
3221         }
3222 };
3223
3224 void _receiveMlogLocationHandler(void *buf){
3225         envelope *env = (envelope *)buf;
3226         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3227         CkUnpackMessage(&env);
3228         void *_msg = EnvToUsr(env);
3229         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3230         CkGroupID gID= msg->gid;
3231         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3232         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3233         CpvAccess(_currentObj)=mgr;
3234         mgr->immigrate(msg);
3235 };
3236
3237
// MlogFn-shaped callback (data, mlogData) meant to resume array elements after a
// restart. It is currently a no-op: the intended body is kept below, commented out.
void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
/*	if(mlogData->objID.type == TypeArray){
		CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
	//	TODO: make sure later that atSync has been called and it needs 
	//	to be resumed from sync
	//
		CpvAccess(_currentObj) = elt;
		elt->ResumeFromSync();
	}*/
}
3248
3249 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3250         if(checkpointBarrierCount == CmiNumPes()){
3251                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3252                 for(int i=0;i<CmiNumPes();i++){
3253                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3254                 }
3255         }
3256 }
3257
3258 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3259         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3260         if(msg->checkpointCount == checkpointCount){
3261                 checkpointBarrierCount++;
3262                 checkAndSendCheckpointBarrierAcks(msg);
3263         }else{
3264                 if(msg->checkpointCount-1 == checkpointCount){
3265                         checkpointBarrierCount++;
3266                         checkAndSendCheckpointBarrierAcks(msg);
3267                 }else{
3268                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3269                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3270                 }
3271         }
3272         CmiFree(msg);
3273 }
3274
3275 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3276         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3277         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3278         sendRemoveLogRequests();
3279         (*resumeLbFnPtr)(centralLb);
3280         CmiFree(msg);
3281 }
3282
3283 /**
3284         method that informs an array elements home processor of its current location
3285         It is a converse method to bypass the charm++ message logging framework
3286 */
3287
3288 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3289         double _startTime = CmiWallTimer();
3290         CurrentLocationMsg msg;
3291         msg.mgrID = locMgrID;
3292         msg.idx = idx;
3293         msg.locationPE = currentPE;
3294         msg.fromPE = CkMyPe();
3295
3296         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3297         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3298         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3299         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3300 }
3301
3302
3303 void _receiveLocationHandler(CurrentLocationMsg *data){
3304         double _startTime = CmiWallTimer();
3305         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3306         if(mgr == NULL){
3307                 CmiFree(data);
3308                 return;
3309         }
3310         CkLocRec *rec = mgr->elementNrec(data->idx);
3311         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3312         if(rec != NULL){
3313                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3314                         if(data->fromPE == data->locationPE){
3315                                 CmiAbort("Another processor has the same object");
3316                         }
3317                 }
3318         }
3319         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3320                 int targetPE = data->fromPE;
3321                 data->fromPE = CmiMyPe();
3322                 data->locationPE = CmiMyPe();
3323                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3324                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3325         }else{
3326                 mgr->inform(data->idx,data->locationPE);
3327         }
3328         CmiFree(data);
3329         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3330 }
3331
3332
3333
3334 void getGlobalStep(CkGroupID gID){
3335         LBStepMsg msg;
3336         int destPE = 0;
3337         msg.lbID = gID;
3338         msg.fromPE = CmiMyPe();
3339         msg.step = -1;
3340         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3341         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3342 };
3343
3344 void _getGlobalStepHandler(LBStepMsg *msg){
3345         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3346         msg->step = lb->step();
3347         CmiAssert(msg->fromPE != CmiMyPe());
3348         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3349         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3350         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3351 };
3352
3353 void _recvGlobalStepHandler(LBStepMsg *msg){
3354         
3355         restartDecisionNumber=msg->step;
3356         RestartRequest *dummyAck = (RestartRequest *)CmiAlloc(sizeof(RestartRequest));
3357         _updateHomeAckHandler(dummyAck);
3358 };
3359
3360 /**
3361  * @brief Function to wrap up performance information.
3362  */
3363 void _messageLoggingExit(){
3364 /*      if(CkMyPe() == 0){
3365                 if(countBuffered != 0){
3366                         printf("[%d] countLocal %d countBuffered %d countPiggy %d Effeciency blocking %.2lf \n",CkMyPe(),countLocal,countBuffered,countPiggy,countLocal/(double )(countBuffered*_maxBufferedMessages));
3367                 }
3368
3369 //              printf("[%d] totalSearchRestoredTime = %.6lf totalSearchRestoredCount %.1lf \n",CkMyPe(),totalSearchRestoredTime,totalSearchRestoredCount);     
3370         }
3371         printf("[%d] countHashCollisions %d countHashRefs %d \n",CkMyPe(),countHashCollisions,countHashRefs);*/
3372         printf("[%d] _messageLoggingExit \n",CmiMyPe());
3373
3374         //TML: printing some statistics for group approach
3375         //if(TEAM_SIZE_MLOG > 1)
3376                 CkPrintf("[%d] Logged messages = %.0f, log size =  %.2f MB\n",CkMyPe(),MLOGFT_totalMessages,MLOGFT_totalLogSize/(float)MEGABYTE);
3377
3378 }
3379
3380 /**
3381         The method for returning the actual object pointed to by an id
3382         If the object doesnot exist on the processor it returns NULL
3383 **/
3384
3385 void* CkObjID::getObject(){
3386         
3387                 switch(type){
3388                         case TypeChare:
3389         
3390                                 return CkLocalChare(&data.chare.id);
3391                         case TypeGroup:
3392         
3393                                 CkAssert(data.group.onPE == CkMyPe());
3394                                 return CkLocalBranch(data.group.id);
3395                         case TypeNodeGroup:
3396                                 CkAssert(data.group.onPE == CkMyNode());
3397                                 //CkLocalNodeBranch(data.group.id);
3398                                 {
3399                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3400                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3401                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3402         
3403                                         return retval;
3404                                 }       
3405                         case TypeArray:
3406                                 {
3407         
3408         
3409                                         CkArrayID aid(data.array.id);
3410         
3411                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3412         
3413                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asMax());
3414         
3415                                         return aProxy.ckLocal();
3416                                 }
3417                         default:
3418                                 CkAssert(0);
3419                 }
3420 }
3421
3422
3423 int CkObjID::guessPE(){
3424                 switch(type){
3425                         case TypeChare:
3426                                 return data.chare.id.onPE;
3427                         case TypeGroup:
3428                         case TypeNodeGroup:
3429                                 return data.group.onPE;
3430                         case TypeArray:
3431                                 {
3432                                         CkArrayID aid(data.array.id);
3433                                         if(aid.ckLocalBranch() == NULL){
3434                                                 return -1;
3435                                         }
3436                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asMax());
3437                                 }
3438                         default:
3439                                 CkAssert(0);
3440                 }
3441 };
3442
3443 char *CkObjID::toString(char *buf) const {
3444         
3445         switch(type){
3446                 case TypeChare:
3447                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3448                         break;
3449                 case TypeGroup:
3450                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
3451                         break;
3452                 case TypeNodeGroup:
3453                         sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
3454                         break;
3455                 case TypeArray:
3456                         {
3457                                 const CkArrayIndexMax &idx = data.array.idx.asMax();
3458                                 const int *indexData = idx.data();
3459                                 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
3460                                 break;
3461                         }
3462                 default:
3463                         CkAssert(0);
3464         }
3465         
3466         return buf;
3467 };
3468
3469 void CkObjID::updatePosition(int PE){
3470         if(guessPE() == PE){
3471                 return;
3472         }
3473         switch(type){
3474                 case TypeArray:
3475                         {
3476                                         CkArrayID aid(data.array.id);
3477                                         if(aid.ckLocalBranch() == NULL){
3478                                                 
3479                                         }else{
3480                                                 char str[100];
3481                                                 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
3482 //                                              CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
3483                                                 CkLocRec *rec = mgr->elementNrec(data.array.idx.asMax());
3484                                                 if(rec != NULL){
3485                                                         if(rec->type() == CkLocRec::local){
3486                                                                 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
3487                                                                 return;
3488                                                         }
3489                                                 }
3490                                                 mgr->inform(data.array.idx.asMax(),PE);
3491                                         }       
3492                                 }
3493
3494                         break;
3495                 case TypeChare:
3496                         CkAssert(data.chare.id.onPE == PE);
3497                         break;
3498                 case TypeGroup:
3499                 case TypeNodeGroup:
3500                         CkAssert(data.group.onPE == PE);
3501                         break;
3502                 default:
3503                         CkAssert(0);
3504         }
3505 }
3506
3507
3508
3509 void MlogEntry::pup(PUP::er &p){
3510       &