efdf53c465b1a19557be486348155580936ba92a
[charm.git] / src / ck-core / ckmessagelogging.C
/**
 * Fast Message Logging Fault Tolerance Protocol.
 * Features:
 *   - Reduces the latency overhead of the causal message-logging approach; it does NOT use determinants at all.
 *   - Supports multiple concurrent failures.
 *   - Assumes the application is iterative and structured in its communication.
 */
8
9 #include "charm.h"
10 #include "ck.h"
11 #include "ckmessagelogging.h"
12 #include "queueing.h"
13 #include <sys/types.h>
14 #include <signal.h>
15 #include "CentralLB.h"
16
17 #ifdef _FAULT_MLOG_
18
// Collects some statistics about message logging. 
// Compile-time switches (0 = off, non-zero = on):
//   COLLECT_STATS_MSGS        count/size of remote messages sent, per destination PE
//   COLLECT_STATS_MSGS_TOTAL  with COLLECT_STATS_MSGS, keep a single aggregate instead
//   COLLECT_STATS_MEMORY      running byte total of messages added to the message log
//   COLLECT_STATS_TEAM        update MLOGFT_totalMessages / MLOGFT_totalLogSize
// NOTE(review): COLLECT_STATS_MSG_COUNT is not referenced in the code visible here.
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_TEAM 0

#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug-print wrappers. A macro whose replacement text is "// x" expands to
// nothing (comments are stripped before expansion), so the wrapped statement
// is compiled out; change the definition to "x" to enable it.
// DEBUG_NOW and DEBUG_PE_NOW are always enabled. DEBUG_PE_NOW expands to an
// unbraced "if", so do not follow a use of it with an "else".
// DEBUG_CHECKPOINT guards the checkpoint-start message printed on PE 0.
#define DEBUG_MEM(x)  //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x)  //x
#define DEBUGLB(x)   // x
#define DEBUG_TEAM(x)  // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x
40
// Helpers defined in other translation units.
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Forward declarations for functions defined later in this file (or elsewhere).
bool fault_aware(CkObjID &recver);
void sendCheckpointData();
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(CkObjID *log);

int _restartFlag=0;
int _numRestartResponses=0;

// NOTE(review): a string literal bound to a non-const char* is deprecated
// (ill-formed since C++11); left unchanged because other translation units
// may declare this as "extern char *".
char *checkpointDirectory=".";
int unAckedCheckpoint=0;
int countUpdateHomeAcks=0;

// Configuration defined elsewhere.
// teamSize: PEs are grouped in teams of teamSize consecutive ranks (see isTeamLocal).
// chkptPeriod: checkpoint period in seconds (checkpointAlarm scales it by 1000
// for the millisecond timer).
extern int teamSize;
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault-injection support for testing.
// killFile: text file of "<pe> <seconds>" lines read by readKillFile.
// faultFile: text file with one "<pe> <seconds>" line read by readFaultFile.
// killTime: wall-clock time at/after which killLocal terminates this PE.
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;
double faultMean;
// Incremented at the start of every local checkpoint.
int checkpointCount=0;
74
/***** VARIABLES FOR MESSAGE LOGGING *****/
// stores the current object, i.e. the sender of any message it sends;
// NULL when no object is executing
CpvDeclare(Chare *,_currentObj);
// stores checkpoint from buddy
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
// stores the incarnation number of every processor (one char per PE);
// messages stamped with an older incarnation than the sender's entry are discarded
CpvDeclare(char *, _incarnation);
// stores messages received before the receiving object gets created
CpvDeclare(Queue, _outOfOrderMessageQueue);
/***** *****/

/***** VARIABLES FOR PARALLEL RECOVERY *****/
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
/***** *****/

// Statistics counters, compiled in only when the matching switch is on.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif
103
/***** IDS FOR MESSAGE LOGGING HANDLERS *****/
// Converse handler indices, filled in by _messageLoggingInit.
// NOTE(review): _updateHomeRequestHandlerIdx, _updateHomeAckHandlerIdx and
// _receivedDetDataHandlerIdx are not assigned in _messageLoggingInit; confirm
// they are unused or registered elsewhere.
int _pingHandlerIdx;
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;
int     _getGlobalStepHandlerIdx;
int     _recvGlobalStepHandlerIdx;
int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _sendBackLocationHandlerIdx;

void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
int verifyAckTotal;
int verifyAckCount;
int verifyAckedRequests=0;
RestartRequest *storedRequest;
int _falseRestart =0; /**
	For testing on clusters we might carry out restarts on
	a processor without actually restarting it.
	1 -> false restart
	0 -> restart after an actual crash
*/
135
//Load balancing globals
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
// Handler indices registered in _messageLoggingInit.
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;
CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;
// Wall time of the last completed checkpoint alarm (initialized at startup);
// checkpointAlarm measures the elapsed period from it.
double lastCompletedAlarm=0;
double lastRestart=0;

//update location globals
int _receiveLocationHandlerIdx;
160
#if CMK_CONVERSE_MPI
// Failure detection for the MPI machine layer: each PE pings its checkpoint
// buddy periodically, and the buddy checks how long ago the last ping arrived.
static int heartBeatHandlerIdx;
static int heartBeatCheckHandlerIdx;
static int partnerFailureHandlerIdx;
// Wall time of the last heartbeat received; -1 means none received yet
// (heartBeatCheckHandler only reports a failure once this is positive).
static double lastPingTime = -1;

extern "C" void mpi_restart_crashed(int pe, int rank);
extern "C" int  find_spare_mpirank(int pe);

void heartBeatPartner();
void heartBeatHandler(void *msg);
void heartBeatCheckHandler();
void partnerFailureHandler(char *msg);
int getReverseCheckPointPE();
#endif
176
177 /***** *****/
178
179 /** 
180  * @brief Initialize message logging data structures and register handlers
181  */
182 void _messageLoggingInit(){
183         if(CkMyPe() == 0)
184                 CkPrintf("[%d] Fast Message Logging Support \n",CkMyPe());
185
186         //current object
187         CpvInitialize(Chare *,_currentObj);
188         
189         //registering handlers for message logging
190         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
191                 
192         //handlers for checkpointing
193         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
194         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
195         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
196
197         //handlers for restart
198         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
199         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
200         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
201         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
202         _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
203         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
204         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
205         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
206
207         //handlers for load balancing
208         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
209         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
210         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
211         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
212         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
213         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
214         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
215         
216         //handlers for updating locations
217         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
218
219         // handlers for failure detection in MPI layer
220 #if CMK_CONVERSE_MPI
221         heartBeatHandlerIdx = CkRegisterHandler((CmiHandler)heartBeatHandler);
222         heartBeatCheckHandlerIdx = CkRegisterHandler((CmiHandler)heartBeatCheckHandler);
223         partnerFailureHandlerIdx = CkRegisterHandler((CmiHandler)partnerFailureHandler);
224 #endif
225         
226         //Cpv variables for message logging
227         CpvInitialize(Queue, _outOfOrderMessageQueue);
228         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
229         
230         // Cpv variables for causal protocol
231         CpvInitialize(char *, _incarnation);
232         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
233         for(int i=0; i<CmiNumPes(); i++){
234                 CpvAccess(_incarnation)[i] = 0;
235         }
236
237         // Cpv variables for parallel recovery
238         CpvInitialize(int, _numEmigrantRecObjs);
239     CpvAccess(_numEmigrantRecObjs) = 0;
240     CpvInitialize(int, _numImmigrantRecObjs);
241     CpvAccess(_numImmigrantRecObjs) = 0;
242
243     CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
244     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
245
246         //Cpv variables for checkpoint
247         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
248         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
249
250         // registering user events for projections      
251         traceRegisterUserEvent("Remove Logs", 20);
252         traceRegisterUserEvent("Ticket Request Handler", 21);
253         traceRegisterUserEvent("Ticket Handler", 22);
254         traceRegisterUserEvent("Local Message Copy Handler", 23);
255         traceRegisterUserEvent("Local Message Ack Handler", 24);        
256         traceRegisterUserEvent("Preprocess current message",25);
257         traceRegisterUserEvent("Preprocess past message",26);
258         traceRegisterUserEvent("Preprocess future message",27);
259         traceRegisterUserEvent("Checkpoint",28);
260         traceRegisterUserEvent("Checkpoint Store",29);
261         traceRegisterUserEvent("Checkpoint Ack",30);
262         traceRegisterUserEvent("Send Ticket Request",31);
263         traceRegisterUserEvent("Generalticketrequest1",32);
264         traceRegisterUserEvent("TicketLogLocal",33);
265         traceRegisterUserEvent("next_ticket and SN",34);
266         traceRegisterUserEvent("Timeout for buffered remote messages",35);
267         traceRegisterUserEvent("Timeout for buffered local messages",36);
268         traceRegisterUserEvent("Inform Location Home",37);
269         traceRegisterUserEvent("Receive Location Handler",38);
270         
271         lastCompletedAlarm=CmiWallTimer();
272         lastRestart = CmiWallTimer();
273
274         // fault detection for MPI layer
275 #if CMK_CONVERSE_MPI
276   void heartBeatPartner();
277   void heartBeatCheckHandler();
278   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
279   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
280 #endif
281
282 #if COLLECT_STATS_MSGS
283 #if COLLECT_STATS_MSGS_TOTAL
284         totalMsgsTarget = 0;
285         totalMsgsSize = 0.0;
286 #else
287         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
288         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
289         for(int i=0; i<CmiNumPes(); i++){
290                 numMsgsTarget[i] = 0;
291                 sizeMsgsTarget[i] = 0;
292         }
293 #endif
294 #endif
295 #if COLLECT_STATS_MEMORY
296         msgLogSize = 0;
297         bufferedDetsSize = 0;
298         storedDetsSize = 0;
299 #endif
300
301 }
302
303 #if CMK_CONVERSE_MPI
304
305 /**
306  * Receives the notification of a failure and updates pe-to-rank mapping.
307  */
308 void partnerFailureHandler(char *msg)
309 {
310    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
311
312    // send message to crash pe to let it restart
313    int newrank = find_spare_mpirank(diepe);
314    int buddy = getCheckPointPE();
315    if (buddy == diepe)  {
316      mpi_restart_crashed(diepe, newrank);
317      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
318    }
319 }
320
321 /**
322  * Registers last time it knew about the PE that checkpoints on it.
323  */
324 void heartBeatHandler(void *msg)
325 {
326         lastPingTime = CmiWallTimer();
327         CmiFree(msg);
328 }
329
330 /**
331  * Checks whether the PE that checkpoints on it is still alive.
332  */
333 void heartBeatCheckHandler()
334 {
335         double now = CmiWallTimer();
336         if (lastPingTime > 0 && now - lastPingTime > 4) {
337                 int i, pe, buddy;
338                 // tell everyone that PE is down
339                 buddy = getReverseCheckPointPE();
340                 CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
341                 
342                 for (int pe = 0; pe < CmiNumPes(); pe++) {
343                         if (pe == buddy) continue;
344                         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
345                         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
346                         CmiSetHandler(msg, partnerFailureHandlerIdx);
347                         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
348                 }
349         }
350         else 
351                 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
352 }
353
354 /**
355  * Pings buddy to let it know this PE is alive. Used for failure detection.
356  */
357 void heartBeatPartner()
358 {
359         int buddy = getCheckPointPE();
360         //printf("[%d] heartBeatPartner %d\n", CmiMyPe(), buddy);
361         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
362         *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
363         CmiSetHandler(msg, heartBeatHandlerIdx);
364         CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
365         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
366 }
367 #endif
368
369 void killLocal(void *_dummy,double curWallTime);        
370
371 void readKillFile(){
372         FILE *fp=fopen(killFile,"r");
373         if(!fp){
374                 return;
375         }
376         int proc;
377         double sec;
378         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
379                 if(proc == CkMyPe()){
380                         killTime = CmiWallTimer()+sec;
381                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
382                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
383                 }
384         }
385         fclose(fp);
386 }
387
388 /**
389  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
390  * We assume an exponential distribution for the mean-time-between-failures.
391  */
392 void readFaultFile(){
393         FILE *fp=fopen(faultFile,"r");
394         if(!fp){
395                 return;
396         }
397         int proc;
398         double sec;
399         fscanf(fp,"%d %lf",&proc,&sec);
400         faultMean = sec;
401         if(proc == CkMyPe()){
402                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
403                 CcdCallFnAfter(killLocal,NULL,sec*1000);
404         }
405         fclose(fp);
406 }
407
408 void killLocal(void *_dummy,double curWallTime){
409         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
410         if(CmiWallTimer()<killTime-1){
411                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
412         }else{
413 #if CMK_CONVERSE_MPI
414                 CkDieNow();
415 #else
416                 kill(getpid(),SIGKILL);
417 #endif
418         }
419 }
420
421 #if ! CMK_CONVERSE_MPI
422 void CkDieNow()
423 {
424         // kill -9 for non-mpi version
425         CmiPrintf("[%d] die now.\n", CmiMyPe());
426         killTime = CmiWallTimer()+0.001;
427         CcdCallFnAfter(killLocal,NULL,1);
428 }
429 #endif
430
431
432 /*** Auxiliary Functions ***/
433
434 /************************ Message logging methods ****************/
435
436 /**
437  * Sends a group message that might be a broadcast.
438  */
439 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
440         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
441                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
442                 void *origMsg = EnvToUsr(env);
443                 for(int i=0;i<CmiNumPes();i++){
444                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
445                                 void *copyMsg = CkCopyMsg(&origMsg);
446                                 envelope *copyEnv = UsrToEnv(copyMsg);
447                                 copyEnv->SN=0;
448                                 copyEnv->sender.type = TypeInvalid;
449                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
450                                 sendGroupMsg(copyEnv,i,_infoIdx);
451                         }
452                 }
453                 return;
454         }
455
456         // initializing values of envelope
457         env->SN=0;
458         env->sender.type = TypeInvalid;
459
460         CkObjID recver;
461         recver.type = TypeGroup;
462         recver.data.group.id = env->getGroupNum();
463         recver.data.group.onPE = destPE;
464         sendCommonMsg(recver,env,destPE,_infoIdx);
465 }
466
467 /**
468  * Sends a nodegroup message that might be a broadcast.
469  */
470 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
471         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
472                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
473                 void *origMsg = EnvToUsr(env);
474                 for(int i=0;i<CmiNumNodes();i++){
475                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
476                                 void *copyMsg = CkCopyMsg(&origMsg);
477                                 envelope *copyEnv = UsrToEnv(copyMsg);
478                                 copyEnv->SN=0;
479                                 copyEnv->sender.type = TypeInvalid;
480                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
481                         }
482                 }
483                 return;
484         }
485
486         // initializing values of envelope
487         env->SN=0;
488         env->sender.type = TypeInvalid;
489
490         CkObjID recver;
491         recver.type = TypeNodeGroup;
492         recver.data.group.id = env->getGroupNum();
493         recver.data.group.onPE = destNode;
494         sendCommonMsg(recver,env,destNode,_infoIdx);
495 }
496
497 /**
498  * Sends a message to an array element.
499  */
500 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
501         CkObjID recver;
502         recver.type = TypeArray;
503         recver.data.array.id = env->getsetArrayMgr();
504         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
505
506         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
507                 char recverString[100],senderString[100];
508                 
509                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
510         }
511
512         // initializing values of envelope
513         env->SN = 0;
514
515         sendCommonMsg(recver,env,destPE,_infoIdx);
516 };
517
518 /**
519  * Sends a message to a singleton chare.
520  */
521 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
522         CkObjID recver;
523         recver.type = TypeChare;
524         recver.data.chare.id = *pCid;
525
526         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
527                 char recverString[100],senderString[100];
528                 
529                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
530         }
531
532         // initializing values of envelope
533         env->SN = 0;
534
535         sendCommonMsg(recver,env,destPE,_infoIdx);
536 };
537
538 /**
539  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
540  */
541 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
542         envelope *env = _env;
543         int resend=0; //is it a resend
544         DEBUG(char recverName[100]);
545         DEBUG(char senderString[100]);
546         
547         DEBUG_MEM(CmiMemoryCheck());
548
549         if(CpvAccess(_currentObj) == NULL){
550 //              CkAssert(0);
551                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
552                 generalCldEnqueue(destPE,env,_infoIdx);
553                 return;
554         }
555
556         // checking if this message should bypass determinants in message-logging
557         if(env->flags & CK_BYPASS_DET_MLOG){
558                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
559                 env->recver = recver;
560                 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
561                 generalCldEnqueue(destPE,env,_infoIdx);
562                 return;
563         }
564         
565         // setting message logging data in the envelope
566         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
567         env->sender = CpvAccess(_currentObj)->mlogData->objID;
568         env->SN = 0;
569         
570         DEBUG_MEM(CmiMemoryCheck());
571
572         CkObjID &sender = env->sender;
573         env->recver = recver;
574
575         Chare *obj = (Chare *)env->sender.getObject();
576           
577         if(env->SN == 0){
578                 DEBUG_MEM(CmiMemoryCheck());
579                 env->SN = obj->mlogData->nextSN(recver);
580         }else{
581                 resend = 1;
582         }
583
584         // uses the proper sending mechanism for local or remote messages
585         if(isLocal(destPE)){
586                 sendLocalMsg(env, _infoIdx);
587         }else{
588                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
589                 sendRemoteMsg(sender,recver,destPE,mEntry,env->SN,resend);
590         }
591 }
592
593 /**
594  * Determines if the message is local or not. A message is local if:
595  * 1) Both the destination and origin are the same PE.
596  */
597 inline bool isLocal(int destPE){
598         // both the destination and the origin are the same PE
599         if(destPE == CkMyPe())
600                 return true;
601
602         return false;
603 }
604
605 /**
606  * Determines if the message is group local or not. A message is group local if:
607  * 1) They belong to the same team in the team-based message logging.
608  */
609 inline bool isTeamLocal(int destPE){
610
611         // they belong to the same group
612         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
613                 return true;
614
615         return false;
616 }
617
618 /**
619  * Method that does the actual send by creating a ticket request filling it up and sending it.
620  */
621 void sendRemoteMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
622         DEBUG_NOW(char recverString[100]);
623         DEBUG_NOW(char senderString[100]);
624
625         int totalSize;
626
627         envelope *env = entry->env;
628         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
629
630         // setting all the information
631         Chare *obj = (Chare *)entry->env->sender.getObject();
632         entry->env->recver = recver;
633         entry->env->SN = SN;
634         if(!resend){
635                 obj->mlogData->addLogEntry(entry);
636 #if COLLECT_STATS_TEAM
637                 MLOGFT_totalMessages += 1.0;
638                 MLOGFT_totalLogSize += entry->env->getTotalsize();
639 #endif
640         }
641
642         // sending the message
643         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
644
645         DEBUG_MEM(CmiMemoryCheck());
646 #if COLLECT_STATS_MSGS
647 #if COLLECT_STATS_MSGS_TOTAL
648         totalMsgsTarget++;
649         totalMsgsSize += (float)env->getTotalsize();
650 #else
651         numMsgsTarget[destPE]++;
652         sizeMsgsTarget[destPE] += env->getTotalsize();
653 #endif
654 #endif
655 #if COLLECT_STATS_MEMORY
656         msgLogSize += env->getTotalsize();
657 #endif
658 };
659
660
661 /**
662  * @brief Function to send a local message. It first gets a ticket and
663  * then enqueues the message. If we are recovering, then the message 
664  * is enqueued in a delay queue.
665  */
666 void sendLocalMsg(envelope *env, int _infoIdx){
667         DEBUG_PERF(double _startTime=CkWallTimer());
668         DEBUG_MEM(CmiMemoryCheck());
669         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
670         DEBUG(char senderString[100]);
671         DEBUG(char recverString[100]);
672
673         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
674
675         // getting the receiver local object
676         Chare *recverObj = (Chare *)env->recver.getObject();
677
678         // if receiver object is not NULL, we will ask it for a ticket
679         if(recverObj){
680
681                 // sends the local message
682                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
683
684                 DEBUG_MEM(CmiMemoryCheck());
685         }else{
686                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
687         }
688 };
689
690 /****
691         The handler functions
692 *****/
693
694 bool fault_aware(CkObjID &recver){
695         switch(recver.type){
696                 case TypeChare:
697                         return true;
698                 case TypeMainChare:
699                         return false;
700                 case TypeGroup:
701                 case TypeNodeGroup:
702                 case TypeArray:
703                         return true;
704                 default:
705                         return false;
706         }
707 };
708
709 /* Preprocesses a received message */
710 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
711         DEBUG_NOW(char recverString[100]);
712         DEBUG_NOW(char senderString[100]);
713         DEBUG_MEM(CmiMemoryCheck());
714         int flag;
715         bool ticketSuccess;
716
717         // getting the receiver object
718         CkObjID recver = env->recver;
719
720         // checking for determinants bypass in message logging
721         if(env->flags & CK_BYPASS_DET_MLOG){
722                 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
723                 return 1;       
724         }
725
726         // checking if receiver is fault aware
727         if(!fault_aware(recver)){
728                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
729                 return 1;
730         }
731
732         Chare *obj = (Chare *)recver.getObject();
733         *objPointer = obj;
734         if(obj == NULL){
735                 int possiblePE = recver.guessPE();
736                 if(possiblePE != CkMyPe()){
737                         int totalSize = env->getTotalsize();
738                         CmiSyncSend(possiblePE,totalSize,(char *)env);
739                         
740                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
741                 }else{
742                         // this is the case where a message is received and the object has not been initialized
743                         // we delayed the delivery of the message
744                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
745                         
746                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
747                 }
748                 return 0;
749         }
750
751         // checking if message comes from an old incarnation, message must be discarded
752         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
753                 CmiFree(env);
754                 return 0;
755         }
756
757         DEBUG_MEM(CmiMemoryCheck());
758         DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
759
760         // checking if message has already been processed, message must be discarded    
761         if(obj->mlogData->checkAndStoreSsn(env->sender,env->SN)){
762                 DEBUG(printf("[%d] Message SN %d sender %s for recver %s being ignored\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
763                 CmiFree(env);
764                 return 0;
765         }
766
767         // message can be processed at this point
768         DEBUG(printf("[%d] Message SN %d sender %s for recver %s being delivered\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
769         return 1;
770 }
771
772 /**
773  * @brief Updates a few variables once a message has been processed.
774  */
775 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
776 }
777
778 /***
779         Helpers for the handlers and message logging methods
780 ***/
781
782 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
783 //      double _startTime = CkWallTimer();
784         if(env->recver.type != TypeNodeGroup){
785         //This repeats a step performed in skipCldEnq for messages sent to
786         //other processors. I do this here so that messages to local processors
787         //undergo the same transformation.. It lets the resend be uniform for 
788         //all messages
789 //              CmiSetXHandler(env,CmiGetHandler(env));
790                 _skipCldEnqueue(destPE,env,_infoIdx);
791         }else{
792                 _noCldNodeEnqueue(destPE,env);
793         }
794 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
795 }
796 //extern "C" int CmiGetNonLocalLength();
797
798 void _pingHandler(CkPingMsg *msg){
799         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
800         CmiFree(msg);
801 }
802
803
804 /*****************************************************************************
805         Checkpointing methods..
806                 Pack all the data on a processor and send it to the buddy periodically
807                 Also used to throw away message logs
808 *****************************************************************************/
809 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
810 void clearUpMigratedRetainedLists(int PE);
811
812 void checkpointAlarm(void *_dummy,double curWallTime){
813         double diff = curWallTime-lastCompletedAlarm;
814         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
815 /*      if(CkWallTimer()-lastRestart < 50){
816                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
817                 return;
818         }*/
819         if(diff < ((chkptPeriod) - 2)){
820                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
821                 return;
822         }
823         CheckpointRequest request;
824         request.PE = CkMyPe();
825         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
826         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
827 };
828
829 void _checkpointRequestHandler(CheckpointRequest *request){
830         startMlogCheckpoint(NULL,CmiWallTimer());
831 }
832
833 /**
834  * @brief Starts the checkpoint phase after migration.
835  */
836 void startMlogCheckpoint(void *_dummy, double curWallTime){
837         double _startTime = CkWallTimer();
838
839         // increasing the checkpoint counter
840         checkpointCount++;
841         
842 #if DEBUG_CHECKPOINT
843         if(CmiMyPe() == 0){
844                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
845         }
846 #endif
847
848         DEBUG_MEM(CmiMemoryCheck());
849
850         PUP::sizer psizer;
851         psizer | checkpointCount;
852         for(int i=0; i<CmiNumPes(); i++){
853                 psizer | CpvAccess(_incarnation)[i];
854         }
855         CkPupROData(psizer);
856         DEBUG_MEM(CmiMemoryCheck());
857         CkPupGroupData(psizer,CmiTrue);
858         DEBUG_MEM(CmiMemoryCheck());
859         CkPupNodeGroupData(psizer,CmiTrue);
860         DEBUG_MEM(CmiMemoryCheck());
861         pupArrayElementsSkip(psizer,CmiTrue,NULL);
862         DEBUG_MEM(CmiMemoryCheck());
863
864         int dataSize = psizer.size();
865         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
866         char *msg = (char *)CmiAlloc(totalSize);
867         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
868         chkMsg->PE = CkMyPe();
869         chkMsg->dataSize = dataSize;
870         
871         char *buf = &msg[sizeof(CheckPointDataMsg)];
872         PUP::toMem pBuf(buf);
873
874         pBuf | checkpointCount;
875         for(int i=0; i<CmiNumPes(); i++){
876                 pBuf | CpvAccess(_incarnation)[i];
877         }
878         CkPupROData(pBuf);
879         CkPupGroupData(pBuf,CmiTrue);
880         CkPupNodeGroupData(pBuf,CmiTrue);
881         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
882
883         unAckedCheckpoint=1;
884         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
885         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
886
887 #if DEBUG_CHECKPOINT
888         if(CmiMyPe() == 0){
889                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
890         }
891 #endif
892
893 #if COLLECT_STATS_MEMORY
894         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
895         msgLogSize = 0;
896         bufferedDetsSize = 0;
897         storedDetsSize = 0;
898 #endif
899
900         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
901                 lastCompletedAlarm = curWallTime;
902                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
903         }
904         traceUserBracketEvent(28,_startTime,CkWallTimer());
905 };
906
907
908 class ElementPacker : public CkLocIterator {
909 private:
910         CkLocMgr *locMgr;
911         PUP::er &p;
912 public:
913         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
914         void addLocation(CkLocation &loc) {
915                 CkArrayIndexMax idx=loc.getIndex();
916                 CkGroupID gID = locMgr->ckGetGroupID();
917                 p|gID;      // store loc mgr's GID as well for easier restore
918                 p|idx;
919                 p|loc;
920         }
921 };
922
923 /**
924  * Pups all the array elements in this processor.
925  */
926 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
927         int numElements,i;
928         int numGroups = CkpvAccess(_groupIDTable)->size();      
929         if(!p.isUnpacking()){
930                 numElements = CkCountArrayElements();
931         }       
932         p | numElements;
933         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
934         if(!p.isUnpacking()){
935                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
936         }else{
937                 //Flush all recs of all LocMgrs before putting in new elements
938 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
939                 for(int j=0;j<listsize;j++){
940                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
941                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
942                                 listToSkip[j].idx.print();
943                         }
944                 }
945                 
946                 printf("numElements = %d\n",numElements);
947         
948                 for (int i=0; i<numElements; i++) {
949                         CkGroupID gID;
950                         CkArrayIndexMax idx;
951                         p|gID;
952                 p|idx;
953                         int flag=0;
954                         int matchedIdx=0;
955                         for(int j=0;j<listsize;j++){
956                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
957                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
958                                                 matchedIdx = j;
959                                                 flag = 1;
960                                                 break;
961                                         }
962                                 }
963                         }
964                         if(flag == 1){
965                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
966                         }else{
967                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
968                         }
969                                 
970                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
971                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
972                         mgr->resume(idx,p,create,flag);
973                         if(flag == 1){
974                                 int homePE = mgr->homePe(idx);
975                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
976                         }
977                 }
978         }
979 };
980
981
982 void writeCheckpointToDisk(int size,char *chkpt){
983         char fNameTemp[100];
984         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
985         int fd = creat(fNameTemp,S_IRWXU);
986         int ret = write(fd,chkpt,size);
987         CkAssert(ret == size);
988         close(fd);
989         
990         char fName[100];
991         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
992         unlink(fName);
993
994         rename(fNameTemp,fName);
995 }
996
997 //handler that receives the checkpoint from a processor
998 //it stores it and acks it
999 void _storeCheckpointHandler(char *msg){
1000         double _startTime=CkWallTimer();
1001                 
1002         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1003         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1004         
1005         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1006         
1007         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1008         if(oldChkpt != NULL){
1009                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1010                 CmiFree(oldmsg);
1011         }
1012         //turning off storing checkpoints
1013         
1014         int sendingPE = chkMsg->PE;
1015         
1016         CpvAccess(_storedCheckpointData)->buf = chkpt;
1017         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1018         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1019
1020         int count=0;
1021         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1022                 if(migratedNoticeList[j].fromPE == sendingPE){
1023                         migratedNoticeList[j].ackFrom = 1;
1024                 }else{
1025                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1026                 }
1027                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1028                         migratedNoticeList.remove(j);
1029                         count++;
1030                 }
1031                 
1032         }
1033         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1034         
1035         CheckPointAck ackMsg;
1036         ackMsg.PE = CkMyPe();
1037         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1038         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1039         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1040         
1041         traceUserBracketEvent(29,_startTime,CkWallTimer());
1042 };
1043
1044
1045 void _checkpointAckHandler(CheckPointAck *ackMsg){
1046         DEBUG_MEM(CmiMemoryCheck());
1047         unAckedCheckpoint=0;
1048         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1049         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1050         if(onGoingLoadBalancing){
1051                 onGoingLoadBalancing = 0;
1052                 finishedCheckpointLoadBalancing();
1053         }
1054         CmiFree(ackMsg);
1055 };
1056
1057 void clearUpMigratedRetainedLists(int PE){
1058         int count=0;
1059         CmiMemoryCheck();
1060         
1061         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1062                 if(migratedNoticeList[j].toPE == PE){
1063                         migratedNoticeList[j].ackTo = 1;
1064                 }
1065                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1066                         migratedNoticeList.remove(j);
1067                         count++;
1068                 }
1069         }
1070         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1071         
1072         for(int j=retainedObjectList.size()-1;j>=0;j--){
1073                 if(retainedObjectList[j]->migRecord.toPE == PE){
1074                         RetainedMigratedObject *obj = retainedObjectList[j];
1075                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1076                         retainedObjectList.remove(j);
1077                         if(obj->msg != NULL){
1078                                 CmiMemoryCheck();
1079                                 CmiFree(obj->msg);
1080                         }
1081                         delete obj;
1082                 }
1083         }
1084 }
1085
1086 /***************************************************************
1087         Restart Methods and handlers
1088 ***************************************************************/        
1089
1090 /**
1091  * Function for restarting the crashed processor.
1092  * It sets the restart flag and contacts the buddy
1093  * processor to get the latest checkpoint.
1094  */
1095 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1096         RestartRequest msg;
1097
1098         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1099
1100         // setting the restart flag
1101         _restartFlag = 1;
1102         _numRestartResponses = 0;
1103
1104         // requesting the latest checkpoint from its buddy
1105         msg.PE = CkMyPe();
1106         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1107         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1108 };
1109
1110 void CkMlogRestartDouble(void *,double){
1111         CkMlogRestart(NULL,NULL);
1112 };
1113
1114 /**
1115  * Gets the stored checkpoint for its buddy processor.
1116  */
1117 void _getCheckpointHandler(RestartRequest *restartMsg){
1118
1119         // retrieving the stored checkpoint
1120         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1121
1122         // making sure it is its buddy who is requesting the checkpoint
1123         CkAssert(restartMsg->PE == storedChkpt->PE);
1124
1125         storedRequest = restartMsg;
1126         verifyAckTotal = 0;
1127
1128         for(int i=0;i<migratedNoticeList.size();i++){
1129                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1130 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1131                         if(migratedNoticeList[i].ackFrom == 0){
1132                                 //need to verify if the object actually exists .. it might not
1133                                 //have been acked but it might exist on it
1134                                 VerifyAckMsg msg;
1135                                 msg.migRecord = migratedNoticeList[i];
1136                                 msg.index = i;
1137                                 msg.fromPE = CmiMyPe();
1138                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1139                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1140                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1141                                 verifyAckTotal++;
1142                         }
1143                 }
1144         }
1145
1146         // sending the checkpoint back to its buddy     
1147         if(verifyAckTotal == 0){
1148                 sendCheckpointData();
1149         }
1150         verifyAckCount = 0;
1151 }
1152
1153
1154 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1155         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1156         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1157         if(rec != NULL && rec->type() == CkLocRec::local){
1158                         //this location exists on this processor
1159                         //and needs to be removed       
1160                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1161                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1162                         
1163                         int localIdx = localRec->getLocalIndex();
1164                         LBDatabase *lbdb = localRec->getLBDB();
1165                         LDObjHandle ldHandle = localRec->getLdHandle();
1166                                 
1167                         locMgr->setDuringMigration(true);
1168                         
1169                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1170                         lbdb->UnregisterObj(ldHandle);
1171                         
1172                         locMgr->setDuringMigration(false);
1173                         
1174                         verifyAckedRequests++;
1175
1176         }
1177         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1178         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1179 };
1180
1181
1182 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1183         int index =     verifyReply->index;
1184         migratedNoticeList[index] = verifyReply->migRecord;
1185         verifyAckCount++;
1186         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1187         if(verifyAckCount == verifyAckTotal){
1188                 sendCheckpointData();
1189         }
1190 }
1191
1192
1193 /**
1194  * Sends the checkpoint to its buddy. 
1195  */
1196 void sendCheckpointData(){      
1197         RestartRequest *restartMsg = storedRequest;
1198         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1199         int numMigratedAwayElements = migratedNoticeList.size();
1200         if(migratedNoticeList.size() != 0){
1201                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1202 //                      CkAssert(migratedNoticeList.size() == 0);
1203         }
1204         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1205         
1206         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1207         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1208         
1209         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1210         
1211         char *msg = (char *)CmiAlloc(totalSize);
1212         
1213         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1214         dataMsg->PE = CkMyPe();
1215         dataMsg->restartWallTime = CmiTimer();
1216         dataMsg->checkPointSize = storedChkpt->bufSize;
1217         
1218         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1219 //      dataMsg->numMigratedAwayElements = 0;
1220         
1221         dataMsg->numMigratedInElements = 0;
1222         dataMsg->migratedElementSize = 0;
1223         dataMsg->lbGroupID = globalLBID;
1224         /*msg layout 
1225                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1226                 Local MessageLog|
1227         */
1228         //store checkpoint data
1229         char *buf = &msg[sizeof(RestartProcessorData)];
1230
1231         if(dataMsg->numMigratedAwayElements != 0){
1232                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1233                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1234         }
1235         
1236
1237         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1238         buf = &buf[storedChkpt->bufSize];
1239
1240         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1241         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1242         CmiFree(restartMsg);
1243
1244 };
1245
1246
1247 // this list is used to create a vector of the object ids of all
1248 //the chares on this processor currently and the highest TN processed by them 
1249 //the first argument is actually a CkVec<TProcessedLog> *
1250 void createObjIDList(void *data, ChareMlogData *mlogData){
1251         CkVec<CkObjID> *list = (CkVec<CkObjID> *)data;
1252         CkObjID entry;
1253         entry = mlogData->objID;
1254         list->push_back(entry);
1255         DEBUG_RECOVERY(printLog(&entry));
1256 }
1257
1258
1259 /**
1260  * Receives the checkpoint data from its buddy, restores the state of all the objects
1261  * and asks everyone else to update its home.
1262  */
1263 void _recvCheckpointHandler(char *_restartData){
1264         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1265         MigrationRecord *migratedAwayElements;
1266
1267         globalLBID = restartData->lbGroupID;
1268         
1269         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1270         char *buf = &_restartData[sizeof(RestartProcessorData)];
1271         
1272         if(restartData->numMigratedAwayElements != 0){
1273                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1274                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1275                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1276                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1277         }
1278         
1279         PUP::fromMem pBuf(buf);
1280
1281         pBuf | checkpointCount;
1282         for(int i=0; i<CmiNumPes(); i++){
1283                 pBuf | CpvAccess(_incarnation)[i];
1284         }
1285         CkPupROData(pBuf);
1286         CkPupGroupData(pBuf,CmiTrue);
1287         CkPupNodeGroupData(pBuf,CmiTrue);
1288         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1289         CkAssert(pBuf.size() == restartData->checkPointSize);
1290         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1291
1292         // increases the incarnation number
1293         CpvAccess(_incarnation)[CmiMyPe()]++;
1294         
1295         forAllCharesDo(initializeRestart,NULL);
1296         
1297         CmiFree(_restartData);
1298         
1299         _initDone();
1300
1301         getGlobalStep(globalLBID);
1302
1303         
1304 }
1305
1306 /**
1307  * @brief Initializes variables and flags for restarting procedure.
1308  */
1309 void initializeRestart(void *data, ChareMlogData *mlogData){
1310         mlogData->resendReplyRecvd = 0;
1311         mlogData->restartFlag = 1;
1312 };
1313
1314 /**
1315  * Updates the homePe of chare array elements.
1316  */
1317 void updateHomePE(void *data,ChareMlogData *mlogData){
1318         RestartRequest *updateRequest = (RestartRequest *)data;
1319         int PE = updateRequest->PE; //restarted PE
1320         //if this object is an array Element and its home is the restarted processor
1321         // the home processor needs to know its current location
1322         if(mlogData->objID.type == TypeArray){
1323                 //it is an array element
1324                 CkGroupID myGID = mlogData->objID.data.array.id;
1325                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
1326                 CkArrayID aid(mlogData->objID.data.array.id);           
1327                 //check if the restarted processor is the home processor for this object
1328                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
1329                 if(locMgr->homePe(myIdx) == PE){
1330                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
1331                         DEBUG_RESTART(myIdx.print());
1332                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
1333                 }
1334         }
1335 };
1336
1337 /**
1338  * Prints a processed log.
1339  */
1340 void printLog(CkObjID &recver){
1341         char recverString[100];
1342         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" \n",CkMyPe(),recver.toString(recverString));
1343 }
1344
1345 /**
1346  * Prints information about a message.
1347  */
1348 void printMsg(envelope *env, const char* par){
1349         char senderString[100];
1350         char recverString[100];
1351         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
1352 }
1353
1354 /**
1355  * @brief Resends all the logged messages to a particular chare list.
1356  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
1357  * @param mlogData a particular chare living in this processor.
1358  */
1359 void resendMessageForChare(void *data, ChareMlogData *mlogData){
1360         DEBUG_RESTART(char nameString[100]);
1361         DEBUG_RESTART(char recverString[100]);
1362         DEBUG_RESTART(char senderString[100]);
1363
1364         ResendData *resendData = (ResendData *)data;
1365         int PE = resendData->PE; //restarted PE
1366         int count=0;
1367         int ticketRequests=0;
1368         CkQ<MlogEntry *> *log = mlogData->getMlog();
1369
1370         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
1371
1372         // traversing the message log to see if we must resend a message        
1373         for(int i=0;i<log->length();i++){
1374                 MlogEntry *logEntry = (*log)[i];
1375                 
1376                 // if we sent out the logs of a local message to buddy and it crashed
1377                 //before acknowledging 
1378                 envelope *env = logEntry->env;
1379                 if(env == NULL){
1380                         continue;
1381                 }
1382         
1383                 // resend if type is not invalid        
1384                 if(env->recver.type != TypeInvalid){
1385                         for(int j=0;j<resendData->numberObjects;j++){
1386                                 if(env->recver == (resendData->listObjects)[j]){
1387                                         if(PE != CkMyPe()){
1388                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
1389                                                 if(env->recver.type == TypeNodeGroup){
1390                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
1391                                                 }else{
1392                                                         CmiSetHandler(env,CmiGetXHandler(env));
1393                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
1394                                                 }
1395                                         }else{
1396                                                 envelope *copyEnv = copyEnvelope(env);
1397                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
1398                                         }
1399                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
1400                                         count++;
1401                                 }
1402                         }//end of for loop of objects
1403                         
1404                 }       
1405         }
1406         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
1407 }
1408
1409 /**
1410  * Resends messages since last checkpoint to the list of objects included in the 
1411  * request. It also sends stored remote determinants to the particular failed PE.
1412  */
1413 void _resendMessagesHandler(char *msg){
1414         ResendData d;
1415         ResendRequest *resendReq = (ResendRequest *)msg;
1416
1417         // building the reply message
1418         char *listObjects = &msg[sizeof(ResendRequest)];
1419         d.numberObjects = resendReq->numberObjects;
1420         d.PE = resendReq->PE;
1421         d.listObjects = (CkObjID *)listObjects;
1422         
1423         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
1424
1425         // resends messages for the list of objects
1426         forAllCharesDo(resendMessageForChare,&d);
1427
1428         DEBUG_MEM(CmiMemoryCheck());
1429
1430         if(resendReq->PE != CkMyPe()){
1431                 CmiFree(msg);
1432         }       
1433 }
1434
1435 /*
1436         Method to do parallel restart. Distribute some of the array elements to other processors.
	The difficulty is that Charm++ entry methods cannot be used for this migration,
	because they would get stuck in the message-logging protocol that is being restarted.
1439         Note: in order to avoid interference between the objects being recovered, the current PE
1440     will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
1441     objects end up stepping into each other's shoes (interference).
1442 */
1443
1444 class ElementDistributor: public CkLocIterator{
1445         CkLocMgr *locMgr;
1446         int *targetPE;
1447
1448         void pupLocation(CkLocation &loc,PUP::er &p){
1449                 CkArrayIndexMax idx=loc.getIndex();
1450                 CkGroupID gID = locMgr->ckGetGroupID();
1451                 p|gID;      // store loc mgr's GID as well for easier restore
1452                 p|idx;
1453                 p|loc;
1454         };
1455 public:
1456         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
1457
1458         void addLocation(CkLocation &loc){
1459
1460                 // leaving object on this PE
1461                 if(*targetPE == CkMyPe()){
1462                         *targetPE = (*targetPE +1)%CkNumPes();
1463                         return;
1464                 }
1465                         
1466                 CkArrayIndexMax idx = loc.getIndex();
1467                 CkLocRec_local *rec = loc.getLocalRecord();
1468                 CkLocMgr *locMgr = loc.getManager();
1469                 CkVec<CkMigratable *> eltList;
1470                         
1471                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
1472                 idx.print();
1473
1474                 // incrementing number of emigrant objects
1475                 CpvAccess(_numEmigrantRecObjs)++;
1476         locMgr->migratableList((CkLocRec_local *)rec,eltList);
1477                 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
1478                 
1479                 // let everybody else know the object is leaving
1480                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
1481                 reductionMgr->incNumEmigrantRecObjs();
1482         
1483                 //pack up this location and send it across
1484                 PUP::sizer psizer;
1485                 pupLocation(loc,psizer);
1486                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
1487                 char *msg = (char *)CmiAlloc(totalSize);
1488                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
1489                 distributeMsg->PE = CkMyPe();
1490                 char *buf = &msg[sizeof(DistributeObjectMsg)];
1491                 PUP::toMem pmem(buf);
1492                 pmem.becomeDeleting();
1493                 pupLocation(loc,pmem);
1494                         
1495                 locMgr->setDuringMigration(CmiTrue);
1496                 delete rec;
1497                 locMgr->setDuringMigration(CmiFalse);
1498                 locMgr->inform(idx,*targetPE);
1499
1500                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
1501                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
1502
1503                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
1504
1505                 //decide on the target processor for the next object
1506                 *targetPE = *targetPE + 1;
1507                 if(*targetPE > (CkMyPe() + parallelRecovery)){
1508                         *targetPE = CkMyPe() + 1;
1509                 }
1510         }
1511
1512 };
1513
1514 /**
1515  * Distributes objects to accelerate recovery after a failure.
1516  */
1517 void distributeRestartedObjects(){
1518         int numGroups = CkpvAccess(_groupIDTable)->size();      
1519         int i;
1520         int targetPE=CkMyPe()+1;
1521         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
1522 };
1523
1524 /**
1525  * Handler to receive back a location.
1526  */
1527 void _sendBackLocationHandler(char *receivedMsg){
1528         printf("Array element received at processor %d after recovery\n",CkMyPe());
1529         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
1530         int sourcePE = distributeMsg->PE;
1531         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
1532         PUP::fromMem pmem(buf);
1533         CkGroupID gID;
1534         CkArrayIndexMax idx;
1535         pmem |gID;
1536         pmem |idx;
1537         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1538         donotCountMigration=1;
1539         mgr->resume(idx,pmem,CmiTrue);
1540         donotCountMigration=0;
1541         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
1542         printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
1543         idx.print();
1544
1545         // decrementing number of emigrant objects at reduction manager
1546         CkVec<CkMigratable *> eltList;
1547         CkLocRec *rec = mgr->elementRec(idx);
1548         mgr->migratableList((CkLocRec_local *)rec,eltList);
1549         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
1550         reductionMgr->decNumEmigrantRecObjs();
1551         reductionMgr->decGCount();
1552
1553         // checking if it has received all emigrant recovering objects
1554         CpvAccess(_numEmigrantRecObjs)--;
1555         if(CpvAccess(_numEmigrantRecObjs) == 0){
1556                 (*resumeLbFnPtr)(centralLb);
1557         }
1558
1559 }
1560
1561 /**
1562  * Handler to update information about an object just received.
1563  */
1564 void _distributedLocationHandler(char *receivedMsg){
1565         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
1566         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
1567         int sourcePE = distributeMsg->PE;
1568         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
1569         PUP::fromMem pmem(buf);
1570         CkGroupID gID;
1571         CkArrayIndexMax idx;
1572         pmem |gID;
1573         pmem |idx;
1574         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1575         donotCountMigration=1;
1576         mgr->resume(idx,pmem,CmiTrue);
1577         donotCountMigration=0;
1578         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
1579         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
1580         idx.print();
1581
1582         CkLocRec *rec = mgr->elementRec(idx);
1583         CmiAssert(rec->type() == CkLocRec::local);
1584
1585         // adding object to the list of immigrant recovery objects
1586         CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
1587         CpvAccess(_numImmigrantRecObjs)++;
1588         
1589         CkVec<CkMigratable *> eltList;
1590         mgr->migratableList((CkLocRec_local *)rec,eltList);
1591         for(int i=0;i<eltList.size();i++){
1592                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
1593                         CpvAccess(_currentObj) = eltList[i];
1594                         eltList[i]->mlogData->immigrantRecFlag = 1;
1595                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
1596
1597                         // incrementing immigrant counter at reduction manager
1598                         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
1599                         reductionMgr->incNumImmigrantRecObjs();
1600                         reductionMgr->decGCount();
1601
1602                         eltList[i]->ResumeFromSync();
1603                 }
1604         }
1605 }
1606
1607
1608 /** this method is used to send messages to a restarted processor to tell
1609  * it that a particular expected object is not going to get to it */
1610 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
1611         DummyMigrationMsg buf;
1612         buf.flag = MLOG_OBJECT;
1613         buf.lbID = lbID;
1614         buf.mgrID = locMgrID;
1615         buf.idx = idx;
1616         buf.locationPE = locationPE;
1617         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
1618         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
1619 };
1620
1621
1622 /**this method is used by a restarted processor to tell other processors
1623  * that they are not going to receive these many objects.. just the count
1624  * not the objects themselves ***/
1625
1626 void sendDummyMigrationCounts(int *dummyCounts){
1627         DummyMigrationMsg buf;
1628         buf.flag = MLOG_COUNT;
1629         buf.lbID = globalLBID;
1630         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
1631         for(int i=0;i<CmiNumPes();i++){
1632                 if(i != CmiMyPe() && dummyCounts[i] != 0){
1633                         buf.count = dummyCounts[i];
1634                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
1635                 }
1636         }
1637 }
1638
1639
1640 /** this handler is used to process a dummy migration msg.
1641  * it looks up the load balancer and calls migrated for it */
1642
1643 void _dummyMigrationHandler(DummyMigrationMsg *msg){
1644         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
1645         if(msg->flag == MLOG_OBJECT){
1646                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
1647                 LDObjHandle h;
1648                 lb->Migrated(h,1);
1649         }
1650         if(msg->flag == MLOG_COUNT){
1651                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
1652                 msg->count -= verifyAckedRequests;
1653                 for(int i=0;i<msg->count;i++){
1654                         LDObjHandle h;
1655                         lb->Migrated(h,1);
1656                 }
1657         }
1658         verifyAckedRequests=0;
1659         CmiFree(msg);
1660 };
1661
1662 /*****************************************************
1663         Implementation of a method that can be used to call
1664         any method on the ChareMlogData of all the chares on
1665         a processor currently
1666 ******************************************************/
1667
1668
1669 class ElementCaller :  public CkLocIterator {
1670 private:
1671         CkLocMgr *locMgr;
1672         MlogFn fnPointer;
1673         void *data;
1674 public:
1675         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
1676                 locMgr = _locMgr;
1677                 fnPointer = _fnPointer;
1678                 data = _data;
1679         };
1680         void addLocation(CkLocation &loc){
1681                 CkVec<CkMigratable *> list;
1682                 CkLocRec_local *local = loc.getLocalRecord();
1683                 locMgr->migratableList (local,list);
1684                 for(int i=0;i<list.size();i++){
1685                         CkMigratable *migratableElement = list[i];
1686                         fnPointer(data,migratableElement->mlogData);
1687                 }
1688         }
1689 };
1690
1691 /**
1692  * Map function pointed by fnPointer over all the chares living in this processor.
1693  */
1694 void forAllCharesDo(MlogFn fnPointer, void *data){
1695         int numGroups = CkpvAccess(_groupIDTable)->size();
1696         for(int i=0;i<numGroups;i++){
1697                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1698                 fnPointer(data,obj->mlogData);
1699         }
1700         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1701         for(int i=0;i<numNodeGroups;i++){
1702                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
1703                 fnPointer(data,obj->mlogData);
1704         }
1705         int i;
1706         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
1707 };
1708
1709
1710 /******************************************************************
1711  Load Balancing
1712 ******************************************************************/
1713
1714 /**
1715  * This is the first time Converse is called after AtSync method has been called by every local object.
1716  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
1717  * message logging, we can take advantage of this situation and garbage collect at this point.
1718  */
1719 void initMlogLBStep(CkGroupID gid){
1720         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
1721         countLBMigratedAway = 0;
1722         countLBToMigrate=0;
1723         onGoingLoadBalancing=1;
1724         migrationDoneCalled=0;
1725         checkpointBarrierCount=0;
1726         if(globalLBID.idx != 0){
1727                 CmiAssert(globalLBID.idx == gid.idx);
1728         }
1729         globalLBID = gid;
1730 #if SYNCHRONIZED_CHECKPOINT
1731         garbageCollectMlog();
1732 #endif
1733 }
1734
1735 /**
1736  * Pups a location
1737  */
1738 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
1739         CkArrayIndexMax idx = loc->getIndex();
1740         CkGroupID gID = locMgr->ckGetGroupID();
1741         p|gID;      // store loc mgr's GID as well for easier restore
1742         p|idx;
1743         p|*loc;
1744 };
1745
1746 /**
1747  * Sends back the immigrant recovering object to their origin PE.
1748  */
1749 void sendBackImmigrantRecObjs(){
1750         CkLocation *loc;
1751         CkLocMgr *locMgr;
1752         CkArrayIndexMax idx;
1753         CkLocRec_local *rec;
1754         PUP::sizer psizer;
1755         int targetPE;
1756         CkVec<CkMigratable *> eltList;
1757         CkReductionMgr *reductionMgr;
1758  
1759         // looping through all elements in immigrant recovery objects vector
1760         for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
1761
1762                 // getting the components of each location
1763                 loc = (*CpvAccess(_immigrantRecObjs))[i];
1764                 idx = loc->getIndex();
1765                 rec = loc->getLocalRecord();
1766                 locMgr = loc->getManager();
1767         locMgr->migratableList((CkLocRec_local *)rec,eltList);
1768                 targetPE = eltList[i]->mlogData->immigrantSourcePE;
1769
1770                 // decrement counter at array manager
1771                 reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
1772                 reductionMgr->decNumImmigrantRecObjs();
1773
1774                 CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
1775                 idx.print();
1776
1777                 // let everybody else know the object is leaving
1778                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
1779                         
1780                 //pack up this location and send it across
1781                 pupLocation(loc,locMgr,psizer);
1782                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
1783                 char *msg = (char *)CmiAlloc(totalSize);
1784                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
1785                 distributeMsg->PE = CkMyPe();
1786                 char *buf = &msg[sizeof(DistributeObjectMsg)];
1787                 PUP::toMem pmem(buf);
1788                 pmem.becomeDeleting();
1789                 pupLocation(loc,locMgr,pmem);
1790                 
1791                 locMgr->setDuringMigration(CmiTrue);
1792                 delete rec;
1793                 locMgr->setDuringMigration(CmiFalse);
1794                 locMgr->inform(idx,targetPE);
1795
1796                 // sending the object
1797                 CmiSetHandler(msg,_sendBackLocationHandlerIdx);
1798                 CmiSyncSendAndFree(targetPE,totalSize,msg);
1799
1800                 // freeing memory
1801                 delete loc;
1802
1803                 CmiAssert(locMgr->lastKnown(idx) == targetPE);
1804                 
1805         }
1806
1807         // cleaning up all data structures
1808         CpvAccess(_immigrantRecObjs)->removeAll();
1809         CpvAccess(_numImmigrantRecObjs) = 0;
1810
1811 }
1812
1813 /**
1814  * Restores objects after parallel recovery, either by sending back the immigrant objects or 
1815  * by waiting for all emigrant objects to be back.
1816  */
1817 void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
1818         resumeLbFnPtr = _fnPtr;
1819         centralLb = _centralLb;
1820
1821         // sending back the immigrant recovering objects
1822         if(CpvAccess(_numImmigrantRecObjs) > 0){
1823                 sendBackImmigrantRecObjs();     
1824         }
1825
1826         // checking whether it needs to wait for emigrant recovery objects
1827         if(CpvAccess(_numEmigrantRecObjs) > 0)
1828                 return;
1829
1830         // otherwise, load balancing process is finished
1831         (*resumeLbFnPtr)(centralLb);
1832 }
1833
1834 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
1835         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
1836         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
1837
1838         resumeLbFnPtr = _fnPtr;
1839         centralLb = _centralLb;
1840         migrationDoneCalled = 1;
1841         if(countLBToMigrate == countLBMigratedAway){
1842                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
1843                 startMlogCheckpoint(NULL,CmiWallTimer());
1844         }
1845 };
1846
1847 void finishedCheckpointLoadBalancing(){
1848         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
1849         CheckpointBarrierMsg msg;
1850         msg.fromPE = CmiMyPe();
1851         msg.checkpointCount = checkpointCount;
1852
1853         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
1854         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
1855         
1856 };
1857
1858 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
1859         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
1860         migratedNoticeList.push_back(msg->migRecord);
1861
1862         MigrationNoticeAck buf;
1863         buf.record = msg->record;
1864         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
1865         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
1866 }
1867
1868 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
1869         
1870         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
1871         retainedObject->acked = 1;
1872
1873         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
1874         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
1875
1876         //inform home about the new location of this object
1877         CkGroupID gID = retainedObject->migRecord.gID ;
1878         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1879         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
1880         
1881         countLBMigratedAway++;
1882         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
1883                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
1884                 startMlogCheckpoint(NULL,CmiWallTimer());
1885         }
1886 };
1887
1888 void _receiveMlogLocationHandler(void *buf){
1889         envelope *env = (envelope *)buf;
1890         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
1891         CkUnpackMessage(&env);
1892         void *_msg = EnvToUsr(env);
1893         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
1894         CkGroupID gID= msg->gid;
1895         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
1896         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1897         CpvAccess(_currentObj)=mgr;
1898         mgr->immigrate(msg);
1899 };
1900
1901 /**
1902  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
1903  */
1904 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
1905         if(checkpointBarrierCount == CmiNumPes()){
1906                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
1907                 for(int i=0;i<CmiNumPes();i++){
1908                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
1909                 }
1910         }
1911 }
1912
1913 /**
1914  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
1915  */ 
1916 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
1917         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
1918         if(msg->checkpointCount == checkpointCount){
1919                 checkpointBarrierCount++;
1920                 checkAndSendCheckpointBarrierAcks(msg);
1921         }else{
1922                 if(msg->checkpointCount-1 == checkpointCount){
1923                         checkpointBarrierCount++;
1924                         checkAndSendCheckpointBarrierAcks(msg);
1925                 }else{
1926                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
1927                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
1928                 }
1929         }
1930
1931         // deleting the received message
1932         CmiFree(msg);
1933 }
1934
1935 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
1936         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
1937         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
1938
1939         // resuming LB function pointer
1940         (*resumeLbFnPtr)(centralLb);
1941
1942         // deleting message
1943         CmiFree(msg);
1944 }
1945
1946 /**
1947  * @brief Function to remove all messages in the message log of a particular chare.
1948  */
1949 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
1950         int total;
1951         MlogEntry *logEntry;
1952         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1953
1954         // traversing the whole message log and removing all elements
1955         total = mlog->length();
1956         for(int i=0; i<total; i++){
1957                 logEntry = mlog->deq();
1958                 delete logEntry;
1959         }
1960
1961 }
1962
1963 /**
1964  * @brief Garbage collects the message log and other data structures.
1965  */
1966 void garbageCollectMlog(){
1967         DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
1968
1969         // removing all messages in message log for every chare
1970         forAllCharesDo(garbageCollectMlogForChare, NULL);
1971 }
1972
1973 /**
1974         method that informs an array elements home processor of its current location
1975         It is a converse method to bypass the charm++ message logging framework
1976 */
1977
1978 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
1979         double _startTime = CmiWallTimer();
1980         CurrentLocationMsg msg;
1981         msg.mgrID = locMgrID;
1982         msg.idx = idx;
1983         msg.locationPE = currentPE;
1984         msg.fromPE = CkMyPe();
1985
1986         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
1987         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
1988         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
1989         traceUserBracketEvent(37,_startTime,CmiWallTimer());
1990 }
1991
1992
1993 void _receiveLocationHandler(CurrentLocationMsg *data){
1994         double _startTime = CmiWallTimer();
1995         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
1996         if(mgr == NULL){
1997                 CmiFree(data);
1998                 return;
1999         }
2000         CkLocRec *rec = mgr->elementNrec(data->idx);
2001         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
2002         if(rec != NULL){
2003                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
2004                         if(data->fromPE == data->locationPE){
2005                                 CmiAbort("Another processor has the same object");
2006                         }
2007                 }
2008         }
2009         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
2010                 int targetPE = data->fromPE;
2011                 data->fromPE = CmiMyPe();
2012                 data->locationPE = CmiMyPe();
2013                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
2014                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
2015         }else{
2016                 mgr->inform(data->idx,data->locationPE);
2017         }
2018         CmiFree(data);
2019         traceUserBracketEvent(38,_startTime,CmiWallTimer());
2020 }
2021
2022
2023
2024 void getGlobalStep(CkGroupID gID){
2025         LBStepMsg msg;
2026         int destPE = 0;
2027         msg.lbID = gID;
2028         msg.fromPE = CmiMyPe();
2029         msg.step = -1;
2030         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
2031         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
2032 };
2033
2034 void _getGlobalStepHandler(LBStepMsg *msg){
2035         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2036         msg->step = lb->step();
2037         CmiAssert(msg->fromPE != CmiMyPe());
2038         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
2039         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
2040         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
2041 };
2042
2043 /**
2044  * @brief Receives the global step handler from PE 0
2045  */
2046 void _recvGlobalStepHandler(LBStepMsg *msg){
2047         
2048         // updating restart decision number
2049         restartDecisionNumber = msg->step;
2050         CmiFree(msg);
2051
2052         CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
2053
2054         // continuing with restart process; send out the request to resend logged messages to all other processors
2055         CkVec<CkObjID> objectVec;
2056         forAllCharesDo(createObjIDList, (void *)&objectVec);
2057         int numberObjects = objectVec.size();
2058         
2059         //      resendMsg layout: |ResendRequest|Array of CkObjID|
2060         int totalSize = sizeof(ResendRequest) + numberObjects * sizeof(CkObjID);
2061         char *resendMsg = (char *)CmiAlloc(totalSize);  
2062
2063         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2064         resendReq->PE = CkMyPe(); 
2065         resendReq->numberObjects = numberObjects;
2066         char *objList = &resendMsg[sizeof(ResendRequest)];
2067         memcpy(objList,objectVec.getVec(),numberObjects * sizeof(CkObjID));     
2068
2069         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2070         CpvAccess(_currentObj) = lb;
2071         lb->ReceiveDummyMigration(restartDecisionNumber);
2072
2073         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2074         for(int i=0;i<CkNumPes();i++){
2075                 if(i != CkMyPe()){
2076                         CmiSyncSend(i,totalSize,resendMsg);
2077                 }
2078         }
2079         _resendMessagesHandler(resendMsg);
2080         CmiFree(resendMsg);
2081
2082         /* test for parallel restart migrate away object**/
2083         if(fastRecovery){
2084                 distributeRestartedObjects();
2085                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2086         }
2087
2088 };
2089
2090 /**
2091  * @brief Function to wrap up performance information.
2092  */
2093 void _messageLoggingExit(){
2094         
2095         // printing the signature for causal message logging
2096         if(CkMyPe() == 0)
2097                 printf("[%d] FastMessageLoggingExit \n",CmiMyPe());
2098
2099 #if COLLECT_STATS_MSGS
2100 #if COLLECT_STATS_MSGS_TOTAL
2101         printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
2102         printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
2103 #else
2104         printf("[%d] TARGETS: ",CmiMyPe());
2105         for(int i=0; i<CmiNumPes(); i++){
2106 #if COLLECT_STATS_MSG_COUNT
2107                 printf("%d ",numMsgsTarget[i]);
2108 #else
2109                 printf("%d ",sizeMsgsTarget[i]);
2110 #endif
2111         }
2112         printf("\n");
2113 #endif
2114 #endif
2115
2116
2117 }
2118
2119 /**
2120         The method for returning the actual object pointed to by an id
2121         If the object doesnot exist on the processor it returns NULL
2122 **/
2123
2124 void* CkObjID::getObject(){
2125         
2126                 switch(type){
2127                         case TypeChare: 
2128                                 return CkLocalChare(&data.chare.id);
2129                         case TypeMainChare:
2130                                 return CkLocalChare(&data.chare.id);
2131                         case TypeGroup:
2132         
2133                                 CkAssert(data.group.onPE == CkMyPe());
2134                                 return CkLocalBranch(data.group.id);
2135                         case TypeNodeGroup:
2136                                 CkAssert(data.group.onPE == CkMyNode());
2137                                 //CkLocalNodeBranch(data.group.id);
2138                                 {
2139                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
2140                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
2141                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
2142         
2143                                         return retval;
2144                                 }       
2145                         case TypeArray:
2146                                 {
2147         
2148         
2149                                         CkArrayID aid(data.array.id);
2150         
2151                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
2152         
2153                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asChild());
2154         
2155                                         return aProxy.ckLocal();
2156                                 }
2157                         default:
2158                                 CkAssert(0);
2159                 }
2160 }
2161
2162
2163 int CkObjID::guessPE(){
2164                 switch(type){
2165                         case TypeChare:
2166                         case TypeMainChare:
2167                                 return data.chare.id.onPE;
2168                         case TypeGroup:
2169                         case TypeNodeGroup:
2170                                 return data.group.onPE;
2171                         case TypeArray:
2172                                 {
2173                                         CkArrayID aid(data.array.id);
2174                                         if(aid.ckLocalBranch() == NULL){
2175                                                 return -1;
2176                                         }
2177                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asChild());
2178                                 }
2179                         default:
2180                                 CkAssert(0);
2181                 }
2182 };
2183
/**
 * Writes a human-readable description of this id into buf and returns buf.
 * Chares print their object pointer and PE, groups and node groups their group index and
 * PE/node, and array elements the first three ints of the index plus the array's group index.
 * NOTE(review): buf is written with an unbounded sprintf; callers must supply a large enough
 * buffer (size not checked here). The trailing "\0" in each format string is redundant,
 * since sprintf stops at the first NUL of the format and NUL-terminates on its own.
 * Array indices always print three ints, whatever the index dimension.
 */
char *CkObjID::toString(char *buf) const {
	
	switch(type){
		case TypeChare:
			sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
			break;
		case TypeMainChare:
			sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);	
			break;
		case TypeGroup:
			sprintf(buf,"Group %d	PE %d \0",data.group.id.idx,data.group.onPE);
			break;
		case TypeNodeGroup:
			sprintf(buf,"NodeGroup %d	Node %d \0",data.group.id.idx,data.group.onPE);
			break;
		case TypeArray:
			{
				const CkArrayIndexMax &idx = data.array.idx.asChild();
				const int *indexData = idx.data();
				sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
				break;
			}
		default:
			CkAssert(0);
	}
	
	return buf;
};
2212
2213 void CkObjID::updatePosition(int PE){
2214         if(guessPE() == PE){
2215                 return;
2216         }
2217         switch(type){
2218                 case TypeArray:
2219                         {
2220                                         CkArrayID aid(data.array.id);
2221                                         if(aid.ckLocalBranch() == NULL){
2222                                                 
2223                                         }else{
2224                                                 char str[100];
2225                                                 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
2226 //                                              CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
2227                                                 CkLocRec *rec = mgr->elementNrec(data.array.idx.asChild());
2228                                                 if(rec != NULL){
2229                                                         if(rec->type() == CkLocRec::local){
2230                                                                 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
2231                                                                 return;
2232                                                         }
2233                                                 }
2234                                                 mgr->inform(data.array.idx.asChild(),PE);
2235                                         }       
2236                                 }
2237
2238                         break;
2239                 case TypeChare:
2240                 case TypeMainChare:
2241                         CkAssert(data.chare.id.onPE == PE);
2242                         break;
2243                 case TypeGroup:
2244                 case TypeNodeGroup:
2245                         CkAssert(data.group.onPE == PE);
2246                         break;
2247                 default:
2248                         CkAssert(0);
2249         }
2250 }
2251
2252 void MlogEntry::pup(PUP::er &p){
2253         p | destPE;
2254         p | _infoIdx;
2255         int size;
2256         if(!p.isUnpacking()){
2257 /*              CkAssert(env);
2258                 if(!env->isPacked()){
2259                         CkPackMessage(&env);
2260                 }*/
2261                 if(env == NULL){
2262                         //message was probably local and has been removed from logs
2263                         size = 0;
2264                 }else{
2265                         size = env->getTotalsize();
2266                 }       
2267         }
2268         p | size;
2269         if(p.isUnpacking()){
2270                 if(size > 0){
2271                         env = (envelope *)_allocEnv(ForChareMsg,size);
2272                 }else{
2273                         env = NULL;
2274                 }
2275         }
2276         if(size > 0){
2277                 p((char *)env,size);
2278         }
2279 };
2280
2281
2282 /**********************************
2283         * The methods of the message logging
2284         * data structure stored in each chare
2285         ********************************/
2286
2287 MCount ChareMlogData::nextSN(const CkObjID &recver){
2288         MCount *SN = ssnTable.getPointer(recver);
2289         if(SN==NULL){
2290                 ssnTable.put(recver) = 1;
2291                 return 1;
2292         }else{
2293                 (*SN)++;
2294                 return *SN;
2295         }
2296 };
2297  
2298 /**
2299  * Adds an entry into the message log.
2300  */
2301 void ChareMlogData::addLogEntry(MlogEntry *entry){
2302         DEBUG(char nameString[100]);
2303         DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
2304         DEBUG_MEM(CmiMemoryCheck());
2305
2306         // enqueuing the entry in the message log
2307         mlog.enq(entry);
2308 };
2309
2310 /**
2311  * Checks whether a ssn has been already received. The collateral effect is the ssn get added to the list.
2312  */
2313 int ChareMlogData::checkAndStoreSsn(const CkObjID &sender, MCount ssn){
2314         RSSN *rssn;
2315         rssn = receivedSsnTable.get(sender);
2316         if(rssn == NULL){
2317                 rssn = new RSSN();
2318                 receivedSsnTable.put(sender) = rssn;
2319         }
2320         return rssn->checkAndStore(ssn);
2321 }
2322
2323 /**
2324  * Pup method for the metadata.
2325  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
2326  * Then, we only support one failure at a time. Read Sayantan's thesis, sections 4.2 and 4.3 for more details.
2327  */
2328 void ChareMlogData::pup(PUP::er &p){
2329         int startSize=0;
2330         char nameStr[100];
2331         if(p.isSizing()){
2332                 PUP::sizer *sizep = (PUP::sizer *)&p;
2333                 startSize = sizep->size();
2334         }
2335         p | objID;
2336         if(p.isUnpacking()){
2337                 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
2338         }
2339         p | toResumeOrNot;
2340         p | resumeCount;
2341         DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
2342         
2343         ssnTable.pup(p);
2344         
2345         // pupping receivedSsnTable
2346         int rssnTableSize;
2347         if(!p.isUnpacking()){
2348                 rssnTableSize = receivedSsnTable.numObjects();
2349         }
2350         p | rssnTableSize;
2351         if(!p.isUnpacking()){
2352                 CkHashtableIterator *iter = receivedSsnTable.iterator();
2353                 while(iter->hasNext()){
2354                         CkObjID *objID;
2355                         RSSN **row = (RSSN **)iter->next((void **)&objID);
2356                         p | (*objID);
2357                         (*row)->pup(p);
2358                 }
2359                 delete iter;
2360         }else{
2361                 for(int i=0; i<rssnTableSize; i++){
2362                         CkObjID objID;
2363                         p | objID;
2364                         RSSN *row = new RSSN;
2365                         row->pup(p);
2366                         receivedSsnTable.put(objID) = row;
2367                 }
2368         }
2369         
2370         p | resendReplyRecvd;
2371         p | restartFlag;
2372
2373         if(p.isSizing()){
2374                 PUP::sizer *sizep = (PUP::sizer *)&p;
2375                 int pupSize = sizep->size()-startSize;
2376                 DEBUG(char name[40]);
2377                 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
2378         //      CkAssert(pupSize <100000000);
2379         }
2380         
2381         double _finTime = CkWallTimer();
2382         DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
2383 };
2384
2385 /****************
2386 *****************/
2387
2388 /**
2389  * Getting the pe number of the current processor's buddy.
2390  * In the team-based approach each processor might checkpoint in the next team, but currently
2391  * teams are only meant to reduce memory overhead.
2392  * Note: function getReverseCheckPointPE performs the reverse map. It must be changed accordingly.
2393  */
2394 int getCheckPointPE(){
2395         return (CmiMyPe() + 1) % CmiNumPes();
2396 }
2397
2398 /**
2399  * Getting the pe that checkpoints on this pe.
2400  */
2401 int getReverseCheckPointPE(){
2402         return (CmiMyPe() - 1 + CmiNumPes()) % CmiNumPes();
2403 }
2404
2405 //assume it is a packed envelope
2406 envelope *copyEnvelope(envelope *env){
2407         envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
2408         memcpy(newEnv,env,env->getTotalsize());
2409         return newEnv;
2410 }
2411
2412 #endif