Removing message forwarding at the message-logging layer.
[charm.git] / src / ck-core / ckmessagelogging.C
1 /**
2   * Fast Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of causal message-logging approach. It does NOT use determinants at all.
5         * Supports multiple concurrent failures.
6         * Assumes the application is iterative and structured in communication.
7   */
8
9 #include "charm.h"
10 #include "ck.h"
11 #include "ckmessagelogging.h"
12 #include "queueing.h"
13 #include <sys/types.h>
14 #include <signal.h>
15 #include "CentralLB.h"
16
17 #ifdef _FAULT_MLOG_
18
// Collects some statistics about message logging. 
// Each flag below is tested with #if; 0 compiles the corresponding code out.
#define COLLECT_STATS_MSGS 0        // count messages/bytes sent to each destination PE
#define COLLECT_STATS_MSGS_TOTAL 0  // with COLLECT_STATS_MSGS: keep only global totals instead of per-PE arrays
#define COLLECT_STATS_MSG_COUNT 0   // NOTE(review): not tested in the visible part of this file
#define COLLECT_STATS_MEMORY 0      // track bytes appended to the message log
#define COLLECT_STATS_TEAM 0        // accumulate MLOGFT_totalMessages / MLOGFT_totalLogSize when logging

// Labels for the two recovery phases. NOTE(review): not referenced in the visible part of this file.
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug helpers. A replacement list written as "// x" or "//x" is a comment, which is
// removed before the macro is defined, so such a macro expands to nothing.
// Remove the "//" to enable one.
#define DEBUG_MEM(x)  //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x)  //x
#define DEBUGLB(x)   // x
#define DEBUG_TEAM(x)  // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1                      // PE 0 prints a line when a checkpoint starts
#define DEBUG_NOW(x) x                          // always expands to its argument
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y  // runs y only on PE x
#define DEBUG_RECOVERY(x) //x
40
// String renderings of array indices/elements; declared extern, defined elsewhere.
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Forward declarations. fault_aware, isLocal and isTeamLocal are defined further
// down in this file; the others are presumably defined later as well.
bool fault_aware(CkObjID &recver);
void sendCheckpointData();
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(CkObjID *log);

// Restart-protocol state (not referenced in this section of the file).
int _restartFlag=0;
int _numRestartResponses=0;

// Directory for checkpoint files.
// NOTE(review): points at a string literal through a non-const char*; it must never be written through.
char *checkpointDirectory=".";
int unAckedCheckpoint=0;
int countUpdateHomeAcks=0;

// Runtime parameters defined in another translation unit
// (presumably filled from command-line options — confirm).
extern int teamSize;      // team size for team-based logging; see isTeamLocal
extern int chkptPeriod;   // checkpoint period used by checkpointAlarm
extern bool fastRecovery;
extern int parallelRecovery;

// Fault injection.
char *killFile;           // file of "<pe> <seconds>" pairs read by readKillFile
char *faultFile;          // file with one "<pe> <seconds>" pair read by readFaultFile
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;      // wall-clock time at which killLocal terminates this PE
double faultMean;         // seconds value read by readFaultFile
int checkpointCount=0;    // incremented every time a checkpoint starts
74
/***** VARIABLES FOR MESSAGE LOGGING *****/
// stores the id of current object sending a message
// (sendCommonMsg reads it to stamp the sender of outgoing messages)
CpvDeclare(Chare *,_currentObj);
// stores checkpoint from buddy
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
// stores the incarnation number from every other processor
// (one char per PE, zeroed in _messageLoggingInit; received messages stamped with an
// older incarnation of their source PE are discarded)
CpvDeclare(char *, _incarnation);
/***** *****/

/***** VARIABLES FOR PARALLEL RECOVERY *****/
// Initialized in _messageLoggingInit. NOTE(review): presumably count/track objects
// moving between PEs during parallel recovery — confirm against their users.
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
/***** *****/

// Statistics counters, compiled in only with the matching COLLECT_STATS_* flag.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;    // messages sent to each destination PE
int *sizeMsgsTarget;   // bytes sent to each destination PE
int totalMsgsTarget;   // total messages sent (COLLECT_STATS_MSGS_TOTAL)
float totalMsgsSize;   // total bytes sent (COLLECT_STATS_MSGS_TOTAL)
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;        // bytes appended to the message log
int bufferedDetsSize;
int storedDetsSize;
#endif

/***** IDS FOR MESSAGE LOGGING HANDLERS *****/
// Converse handler indices returned by CkRegisterHandler. Most are assigned in
// _messageLoggingInit. NOTE(review): _updateHomeRequestHandlerIdx,
// _updateHomeAckHandlerIdx and _receivedDetDataHandlerIdx are not registered there.
int _pingHandlerIdx;
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;
int     _getGlobalStepHandlerIdx;
int     _recvGlobalStepHandlerIdx;
int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _sendBackLocationHandlerIdx;

void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
int verifyAckTotal;
int verifyAckCount;
int verifyAckedRequests=0;
RestartRequest *storedRequest;
int _falseRestart =0; /**
	For testing on clusters we might carry out restarts on 
	a processor without actually starting it
	1 -> false restart
	0 -> restart after an actual crash
*/
133
//Load balancing globals
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
// Handler indices assigned in _messageLoggingInit.
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;
CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;
double lastCompletedAlarm=0;   // wall time checkpointAlarm measures the checkpoint period from
double lastRestart=0;          // wall time set in _messageLoggingInit

//update location globals
int _receiveLocationHandlerIdx;  // assigned in _messageLoggingInit

#if CMK_CONVERSE_MPI
// Failure detection on MPI builds: each PE pings its checkpoint buddy
// (heartBeatPartner) and periodically checks that the PE that checkpoints on it
// is still pinging (heartBeatCheckHandler).
static int heartBeatHandlerIdx;
static int heartBeatCheckHandlerIdx;
static int partnerFailureHandlerIdx;
static double lastPingTime = -1;   // wall time of the last heartbeat received; -1 until the first one

extern "C" void mpi_restart_crashed(int pe, int rank);
extern "C" int  find_spare_mpirank(int pe);

void heartBeatPartner();
void heartBeatHandler(void *msg);
void heartBeatCheckHandler();
void partnerFailureHandler(char *msg);
int getReverseCheckPointPE();
#endif

/***** *****/
176
177 /** 
178  * @brief Initialize message logging data structures and register handlers
179  */
180 void _messageLoggingInit(){
181         if(CkMyPe() == 0)
182                 CkPrintf("[%d] Fast Message Logging Support \n",CkMyPe());
183
184         //current object
185         CpvInitialize(Chare *,_currentObj);
186         
187         //registering handlers for message logging
188         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
189                 
190         //handlers for checkpointing
191         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
192         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
193         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
194
195         //handlers for restart
196         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
197         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
198         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
199         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
200         _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
201         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
202         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
203         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
204
205         //handlers for load balancing
206         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
207         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
208         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
209         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
210         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
211         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
212         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
213         
214         //handlers for updating locations
215         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
216
217         // handlers for failure detection in MPI layer
218 #if CMK_CONVERSE_MPI
219         heartBeatHandlerIdx = CkRegisterHandler((CmiHandler)heartBeatHandler);
220         heartBeatCheckHandlerIdx = CkRegisterHandler((CmiHandler)heartBeatCheckHandler);
221         partnerFailureHandlerIdx = CkRegisterHandler((CmiHandler)partnerFailureHandler);
222 #endif
223         
224         // Cpv variables for message-logging protocol
225         CpvInitialize(char *, _incarnation);
226         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
227         for(int i=0; i<CmiNumPes(); i++){
228                 CpvAccess(_incarnation)[i] = 0;
229         }
230
231         // Cpv variables for parallel recovery
232         CpvInitialize(int, _numEmigrantRecObjs);
233     CpvAccess(_numEmigrantRecObjs) = 0;
234     CpvInitialize(int, _numImmigrantRecObjs);
235     CpvAccess(_numImmigrantRecObjs) = 0;
236
237     CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
238     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
239
240         //Cpv variables for checkpoint
241         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
242         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
243
244         // registering user events for projections      
245         traceRegisterUserEvent("Remove Logs", 20);
246         traceRegisterUserEvent("Ticket Request Handler", 21);
247         traceRegisterUserEvent("Ticket Handler", 22);
248         traceRegisterUserEvent("Local Message Copy Handler", 23);
249         traceRegisterUserEvent("Local Message Ack Handler", 24);        
250         traceRegisterUserEvent("Preprocess current message",25);
251         traceRegisterUserEvent("Preprocess past message",26);
252         traceRegisterUserEvent("Preprocess future message",27);
253         traceRegisterUserEvent("Checkpoint",28);
254         traceRegisterUserEvent("Checkpoint Store",29);
255         traceRegisterUserEvent("Checkpoint Ack",30);
256         traceRegisterUserEvent("Send Ticket Request",31);
257         traceRegisterUserEvent("Generalticketrequest1",32);
258         traceRegisterUserEvent("TicketLogLocal",33);
259         traceRegisterUserEvent("next_ticket and SN",34);
260         traceRegisterUserEvent("Timeout for buffered remote messages",35);
261         traceRegisterUserEvent("Timeout for buffered local messages",36);
262         traceRegisterUserEvent("Inform Location Home",37);
263         traceRegisterUserEvent("Receive Location Handler",38);
264         
265         lastCompletedAlarm=CmiWallTimer();
266         lastRestart = CmiWallTimer();
267
268         // fault detection for MPI layer
269 #if CMK_CONVERSE_MPI
270   void heartBeatPartner();
271   void heartBeatCheckHandler();
272   CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
273   CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
274 #endif
275
276 #if COLLECT_STATS_MSGS
277 #if COLLECT_STATS_MSGS_TOTAL
278         totalMsgsTarget = 0;
279         totalMsgsSize = 0.0;
280 #else
281         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
282         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
283         for(int i=0; i<CmiNumPes(); i++){
284                 numMsgsTarget[i] = 0;
285                 sizeMsgsTarget[i] = 0;
286         }
287 #endif
288 #endif
289 #if COLLECT_STATS_MEMORY
290         msgLogSize = 0;
291         bufferedDetsSize = 0;
292         storedDetsSize = 0;
293 #endif
294
295 }
296
297 #if CMK_CONVERSE_MPI
298
299 /**
300  * Receives the notification of a failure and updates pe-to-rank mapping.
301  */
302 void partnerFailureHandler(char *msg)
303 {
304    int diepe = *(int *)(msg+CmiMsgHeaderSizeBytes);
305
306    // send message to crash pe to let it restart
307    int newrank = find_spare_mpirank(diepe);
308    int buddy = getCheckPointPE();
309    if (buddy == diepe)  {
310      mpi_restart_crashed(diepe, newrank);
311      CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
312    }
313 }
314
315 /**
316  * Registers last time it knew about the PE that checkpoints on it.
317  */
318 void heartBeatHandler(void *msg)
319 {
320         lastPingTime = CmiWallTimer();
321         CmiFree(msg);
322 }
323
324 /**
325  * Checks whether the PE that checkpoints on it is still alive.
326  */
327 void heartBeatCheckHandler()
328 {
329         double now = CmiWallTimer();
330         if (lastPingTime > 0 && now - lastPingTime > 4) {
331                 int i, pe, buddy;
332                 // tell everyone that PE is down
333                 buddy = getReverseCheckPointPE();
334                 CmiPrintf("[%d] detected buddy processor %d died %f %f. \n", CmiMyPe(), buddy, now, lastPingTime);
335                 
336                 for (int pe = 0; pe < CmiNumPes(); pe++) {
337                         if (pe == buddy) continue;
338                         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
339                         *(int *)(msg+CmiMsgHeaderSizeBytes) = buddy;
340                         CmiSetHandler(msg, partnerFailureHandlerIdx);
341                         CmiSyncSendAndFree(pe, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
342                 }
343         }
344         else 
345                 CcdCallOnCondition(CcdPERIODIC_5s,(CcdVoidFn)heartBeatCheckHandler,NULL);
346 }
347
348 /**
349  * Pings buddy to let it know this PE is alive. Used for failure detection.
350  */
351 void heartBeatPartner()
352 {
353         int buddy = getCheckPointPE();
354         //printf("[%d] heartBeatPartner %d\n", CmiMyPe(), buddy);
355         char *msg = (char*)CmiAlloc(CmiMsgHeaderSizeBytes+sizeof(int));
356         *(int *)(msg+CmiMsgHeaderSizeBytes) = CmiMyPe();
357         CmiSetHandler(msg, heartBeatHandlerIdx);
358         CmiSyncSendAndFree(buddy, CmiMsgHeaderSizeBytes+sizeof(int), (char *)msg);
359         CcdCallOnCondition(CcdPERIODIC_100ms,(CcdVoidFn)heartBeatPartner,NULL);
360 }
361 #endif
362
363 void killLocal(void *_dummy,double curWallTime);        
364
365 void readKillFile(){
366         FILE *fp=fopen(killFile,"r");
367         if(!fp){
368                 return;
369         }
370         int proc;
371         double sec;
372         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
373                 if(proc == CkMyPe()){
374                         killTime = CmiWallTimer()+sec;
375                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
376                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
377                 }
378         }
379         fclose(fp);
380 }
381
382 /**
383  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
384  * We assume an exponential distribution for the mean-time-between-failures.
385  */
386 void readFaultFile(){
387         FILE *fp=fopen(faultFile,"r");
388         if(!fp){
389                 return;
390         }
391         int proc;
392         double sec;
393         fscanf(fp,"%d %lf",&proc,&sec);
394         faultMean = sec;
395         if(proc == CkMyPe()){
396                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
397                 CcdCallFnAfter(killLocal,NULL,sec*1000);
398         }
399         fclose(fp);
400 }
401
402 void killLocal(void *_dummy,double curWallTime){
403         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
404         if(CmiWallTimer()<killTime-1){
405                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
406         }else{
407 #if CMK_CONVERSE_MPI
408                 CkDieNow();
409 #else
410                 kill(getpid(),SIGKILL);
411 #endif
412         }
413 }
414
415 #if ! CMK_CONVERSE_MPI
416 void CkDieNow()
417 {
418         // kill -9 for non-mpi version
419         CmiPrintf("[%d] die now.\n", CmiMyPe());
420         killTime = CmiWallTimer()+0.001;
421         CcdCallFnAfter(killLocal,NULL,1);
422 }
423 #endif
424
425
426 /*** Auxiliary Functions ***/
427
428 /************************ Message logging methods ****************/
429
430 /**
431  * Sends a group message that might be a broadcast.
432  */
433 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
434         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
435                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
436                 void *origMsg = EnvToUsr(env);
437                 for(int i=0;i<CmiNumPes();i++){
438                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
439                                 void *copyMsg = CkCopyMsg(&origMsg);
440                                 envelope *copyEnv = UsrToEnv(copyMsg);
441                                 copyEnv->SN=0;
442                                 copyEnv->sender.type = TypeInvalid;
443                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
444                                 sendGroupMsg(copyEnv,i,_infoIdx);
445                         }
446                 }
447                 return;
448         }
449
450         // initializing values of envelope
451         env->SN=0;
452         env->sender.type = TypeInvalid;
453
454         CkObjID recver;
455         recver.type = TypeGroup;
456         recver.data.group.id = env->getGroupNum();
457         recver.data.group.onPE = destPE;
458         sendCommonMsg(recver,env,destPE,_infoIdx);
459 }
460
461 /**
462  * Sends a nodegroup message that might be a broadcast.
463  */
464 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
465         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
466                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
467                 void *origMsg = EnvToUsr(env);
468                 for(int i=0;i<CmiNumNodes();i++){
469                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
470                                 void *copyMsg = CkCopyMsg(&origMsg);
471                                 envelope *copyEnv = UsrToEnv(copyMsg);
472                                 copyEnv->SN=0;
473                                 copyEnv->sender.type = TypeInvalid;
474                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
475                         }
476                 }
477                 return;
478         }
479
480         // initializing values of envelope
481         env->SN=0;
482         env->sender.type = TypeInvalid;
483
484         CkObjID recver;
485         recver.type = TypeNodeGroup;
486         recver.data.group.id = env->getGroupNum();
487         recver.data.group.onPE = destNode;
488         sendCommonMsg(recver,env,destNode,_infoIdx);
489 }
490
491 /**
492  * Sends a message to an array element.
493  */
494 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
495         CkObjID recver;
496         recver.type = TypeArray;
497         recver.data.array.id = env->getsetArrayMgr();
498         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
499
500         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
501                 char recverString[100],senderString[100];
502                 
503                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
504         }
505
506         // initializing values of envelope
507         env->SN = 0;
508
509         sendCommonMsg(recver,env,destPE,_infoIdx);
510 };
511
512 /**
513  * Sends a message to a singleton chare.
514  */
515 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
516         CkObjID recver;
517         recver.type = TypeChare;
518         recver.data.chare.id = *pCid;
519
520         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
521                 char recverString[100],senderString[100];
522                 
523                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
524         }
525
526         // initializing values of envelope
527         env->SN = 0;
528
529         sendCommonMsg(recver,env,destPE,_infoIdx);
530 };
531
532 /**
533  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
534  */
535 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
536         envelope *env = _env;
537         int resend=0; //is it a resend
538         DEBUG(char recverName[100]);
539         DEBUG(char senderString[100]);
540         
541         DEBUG_MEM(CmiMemoryCheck());
542
543         if(CpvAccess(_currentObj) == NULL){
544 //              CkAssert(0);
545                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
546                 generalCldEnqueue(destPE,env,_infoIdx);
547                 return;
548         }
549
550         // checking if this message should bypass determinants in message-logging
551         if(env->flags & CK_BYPASS_DET_MLOG){
552                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
553                 env->recver = recver;
554                 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
555                 generalCldEnqueue(destPE,env,_infoIdx);
556                 return;
557         }
558         
559         // setting message logging data in the envelope
560         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
561         env->sender = CpvAccess(_currentObj)->mlogData->objID;
562         env->SN = 0;
563         
564         DEBUG_MEM(CmiMemoryCheck());
565
566         CkObjID &sender = env->sender;
567         env->recver = recver;
568
569         Chare *obj = (Chare *)env->sender.getObject();
570           
571         if(env->SN == 0){
572                 DEBUG_MEM(CmiMemoryCheck());
573                 env->SN = obj->mlogData->nextSN(recver);
574         }else{
575                 resend = 1;
576         }
577
578         // uses the proper sending mechanism for local or remote messages
579         if(isLocal(destPE)){
580                 sendLocalMsg(env, _infoIdx);
581         }else{
582                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
583                 sendRemoteMsg(sender,recver,destPE,mEntry,env->SN,resend);
584         }
585 }
586
587 /**
588  * Determines if the message is local or not. A message is local if:
589  * 1) Both the destination and origin are the same PE.
590  */
591 inline bool isLocal(int destPE){
592         // both the destination and the origin are the same PE
593         if(destPE == CkMyPe())
594                 return true;
595
596         return false;
597 }
598
599 /**
600  * Determines if the message is group local or not. A message is group local if:
601  * 1) They belong to the same team in the team-based message logging.
602  */
603 inline bool isTeamLocal(int destPE){
604
605         // they belong to the same group
606         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
607                 return true;
608
609         return false;
610 }
611
612 /**
613  * Method that does the actual send by creating a ticket request filling it up and sending it.
614  */
615 void sendRemoteMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend){
616         DEBUG_NOW(char recverString[100]);
617         DEBUG_NOW(char senderString[100]);
618
619         int totalSize;
620
621         envelope *env = entry->env;
622         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
623
624         // setting all the information
625         Chare *obj = (Chare *)entry->env->sender.getObject();
626         entry->env->recver = recver;
627         entry->env->SN = SN;
628         if(!resend){
629                 obj->mlogData->addLogEntry(entry);
630 #if COLLECT_STATS_TEAM
631                 MLOGFT_totalMessages += 1.0;
632                 MLOGFT_totalLogSize += entry->env->getTotalsize();
633 #endif
634         }
635
636         // sending the message
637         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
638
639         DEBUG_MEM(CmiMemoryCheck());
640 #if COLLECT_STATS_MSGS
641 #if COLLECT_STATS_MSGS_TOTAL
642         totalMsgsTarget++;
643         totalMsgsSize += (float)env->getTotalsize();
644 #else
645         numMsgsTarget[destPE]++;
646         sizeMsgsTarget[destPE] += env->getTotalsize();
647 #endif
648 #endif
649 #if COLLECT_STATS_MEMORY
650         msgLogSize += env->getTotalsize();
651 #endif
652 };
653
654
655 /**
656  * @brief Function to send a local message. It first gets a ticket and
657  * then enqueues the message. If we are recovering, then the message 
658  * is enqueued in a delay queue.
659  */
660 void sendLocalMsg(envelope *env, int _infoIdx){
661         DEBUG_PERF(double _startTime=CkWallTimer());
662         DEBUG_MEM(CmiMemoryCheck());
663         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
664         DEBUG(char senderString[100]);
665         DEBUG(char recverString[100]);
666
667         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
668
669         // getting the receiver local object
670         Chare *recverObj = (Chare *)env->recver.getObject();
671
672         // if receiver object is not NULL, we will ask it for a ticket
673         if(recverObj){
674
675                 // sends the local message
676                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
677
678                 DEBUG_MEM(CmiMemoryCheck());
679         }else{
680                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
681         }
682 };
683
684 /****
685         The handler functions
686 *****/
687
688 bool fault_aware(CkObjID &recver){
689         switch(recver.type){
690                 case TypeChare:
691                         return true;
692                 case TypeMainChare:
693                         return false;
694                 case TypeGroup:
695                 case TypeNodeGroup:
696                 case TypeArray:
697                         return true;
698                 default:
699                         return false;
700         }
701 };
702
703 /* Preprocesses a received message */
704 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
705         DEBUG_NOW(char recverString[100]);
706         DEBUG_NOW(char senderString[100]);
707         DEBUG_MEM(CmiMemoryCheck());
708         int flag;
709         bool ticketSuccess;
710
711         // getting the receiver object
712         CkObjID recver = env->recver;
713
714         // checking for determinants bypass in message logging
715         if(env->flags & CK_BYPASS_DET_MLOG){
716                 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
717                 return 1;       
718         }
719
720         // checking if receiver is fault aware
721         if(!fault_aware(recver)){
722                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
723                 return 1;
724         }
725
726         // checking if object is NULL 
727         Chare *obj = (Chare *)recver.getObject();
728         *objPointer = obj;
729         if(obj == NULL){
730                 // the objects does not exist on this PE, we let Charm forward the message and update the location manager.
731                 DEBUG(printf("[%d] Message SN %d sender %s for NULL recver %s\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
732                 return 1;
733         }
734
735         // checking if message comes from an old incarnation, message must be discarded
736         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
737                 CmiFree(env);
738                 return 0;
739         }
740
741         DEBUG_MEM(CmiMemoryCheck());
742         DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
743
744         // checking if message has already been processed, message must be discarded    
745         if(obj->mlogData->checkAndStoreSsn(env->sender,env->SN)){
746                 DEBUG(printf("[%d] Message SN %d sender %s for recver %s being ignored\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
747                 CmiFree(env);
748                 return 0;
749         }
750
751         // message can be processed at this point
752         DEBUG(printf("[%d] Message SN %d sender %s for recver %s being delivered\n",CkMyPe(),env->SN,env->sender.toString(senderString),recver.toString(recverString)));
753         return 1;
754 }
755
756 /**
757  * @brief Updates a few variables once a message has been processed.
758  */
759 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
760 }
761
762 /***
763         Helpers for the handlers and message logging methods
764 ***/
765
766 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
767 //      double _startTime = CkWallTimer();
768         if(env->recver.type != TypeNodeGroup){
769         //This repeats a step performed in skipCldEnq for messages sent to
770         //other processors. I do this here so that messages to local processors
771         //undergo the same transformation.. It lets the resend be uniform for 
772         //all messages
773 //              CmiSetXHandler(env,CmiGetHandler(env));
774                 _skipCldEnqueue(destPE,env,_infoIdx);
775         }else{
776                 _noCldNodeEnqueue(destPE,env);
777         }
778 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
779 }
780 //extern "C" int CmiGetNonLocalLength();
781
782 void _pingHandler(CkPingMsg *msg){
783         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
784         CmiFree(msg);
785 }
786
787
788 /*****************************************************************************
789         Checkpointing methods..
790                 Pack all the data on a processor and send it to the buddy periodically
791                 Also used to throw away message logs
792 *****************************************************************************/
793 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
794 void clearUpMigratedRetainedLists(int PE);
795
796 void checkpointAlarm(void *_dummy,double curWallTime){
797         double diff = curWallTime-lastCompletedAlarm;
798         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
799 /*      if(CkWallTimer()-lastRestart < 50){
800                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
801                 return;
802         }*/
803         if(diff < ((chkptPeriod) - 2)){
804                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
805                 return;
806         }
807         CheckpointRequest request;
808         request.PE = CkMyPe();
809         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
810         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
811 };
812
813 void _checkpointRequestHandler(CheckpointRequest *request){
814         startMlogCheckpoint(NULL,CmiWallTimer());
815 }
816
817 /**
818  * @brief Starts the checkpoint phase after migration.
819  */
820 void startMlogCheckpoint(void *_dummy, double curWallTime){
821         double _startTime = CkWallTimer();
822
823         // increasing the checkpoint counter
824         checkpointCount++;
825         
826 #if DEBUG_CHECKPOINT
827         if(CmiMyPe() == 0){
828                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
829         }
830 #endif
831
832         DEBUG_MEM(CmiMemoryCheck());
833
834         PUP::sizer psizer;
835         psizer | checkpointCount;
836         for(int i=0; i<CmiNumPes(); i++){
837                 psizer | CpvAccess(_incarnation)[i];
838         }
839         CkPupROData(psizer);
840         DEBUG_MEM(CmiMemoryCheck());
841         CkPupGroupData(psizer,CmiTrue);
842         DEBUG_MEM(CmiMemoryCheck());
843         CkPupNodeGroupData(psizer,CmiTrue);
844         DEBUG_MEM(CmiMemoryCheck());
845         pupArrayElementsSkip(psizer,CmiTrue,NULL);
846         DEBUG_MEM(CmiMemoryCheck());
847
848         int dataSize = psizer.size();
849         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
850         char *msg = (char *)CmiAlloc(totalSize);
851         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
852         chkMsg->PE = CkMyPe();
853         chkMsg->dataSize = dataSize;
854         
855         char *buf = &msg[sizeof(CheckPointDataMsg)];
856         PUP::toMem pBuf(buf);
857
858         pBuf | checkpointCount;
859         for(int i=0; i<CmiNumPes(); i++){
860                 pBuf | CpvAccess(_incarnation)[i];
861         }
862         CkPupROData(pBuf);
863         CkPupGroupData(pBuf,CmiTrue);
864         CkPupNodeGroupData(pBuf,CmiTrue);
865         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
866
867         unAckedCheckpoint=1;
868         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
869         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
870
871 #if DEBUG_CHECKPOINT
872         if(CmiMyPe() == 0){
873                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
874         }
875 #endif
876
877 #if COLLECT_STATS_MEMORY
878         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
879         msgLogSize = 0;
880         bufferedDetsSize = 0;
881         storedDetsSize = 0;
882 #endif
883
884         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
885                 lastCompletedAlarm = curWallTime;
886                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
887         }
888         traceUserBracketEvent(28,_startTime,CkWallTimer());
889 };
890
891
892 class ElementPacker : public CkLocIterator {
893 private:
894         CkLocMgr *locMgr;
895         PUP::er &p;
896 public:
897         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
898         void addLocation(CkLocation &loc) {
899                 CkArrayIndexMax idx=loc.getIndex();
900                 CkGroupID gID = locMgr->ckGetGroupID();
901                 p|gID;      // store loc mgr's GID as well for easier restore
902                 p|idx;
903                 p|loc;
904         }
905 };
906
907 /**
908  * Pups all the array elements in this processor.
909  */
910 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
911         int numElements,i;
912         int numGroups = CkpvAccess(_groupIDTable)->size();      
913         if(!p.isUnpacking()){
914                 numElements = CkCountArrayElements();
915         }       
916         p | numElements;
917         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
918         if(!p.isUnpacking()){
919                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
920         }else{
921                 //Flush all recs of all LocMgrs before putting in new elements
922 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
923                 for(int j=0;j<listsize;j++){
924                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
925                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
926                                 listToSkip[j].idx.print();
927                         }
928                 }
929                 
930                 printf("numElements = %d\n",numElements);
931         
932                 for (int i=0; i<numElements; i++) {
933                         CkGroupID gID;
934                         CkArrayIndexMax idx;
935                         p|gID;
936                 p|idx;
937                         int flag=0;
938                         int matchedIdx=0;
939                         for(int j=0;j<listsize;j++){
940                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
941                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
942                                                 matchedIdx = j;
943                                                 flag = 1;
944                                                 break;
945                                         }
946                                 }
947                         }
948                         if(flag == 1){
949                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
950                         }else{
951                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
952                         }
953                                 
954                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
955                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
956                         mgr->resume(idx,p,create,flag);
957                         if(flag == 1){
958                                 int homePE = mgr->homePe(idx);
959                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
960                         }
961                 }
962         }
963 };
964
965
966 void writeCheckpointToDisk(int size,char *chkpt){
967         char fNameTemp[100];
968         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
969         int fd = creat(fNameTemp,S_IRWXU);
970         int ret = write(fd,chkpt,size);
971         CkAssert(ret == size);
972         close(fd);
973         
974         char fName[100];
975         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
976         unlink(fName);
977
978         rename(fNameTemp,fName);
979 }
980
981 //handler that receives the checkpoint from a processor
982 //it stores it and acks it
983 void _storeCheckpointHandler(char *msg){
984         double _startTime=CkWallTimer();
985                 
986         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
987         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
988         
989         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
990         
991         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
992         if(oldChkpt != NULL){
993                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
994                 CmiFree(oldmsg);
995         }
996         //turning off storing checkpoints
997         
998         int sendingPE = chkMsg->PE;
999         
1000         CpvAccess(_storedCheckpointData)->buf = chkpt;
1001         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1002         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1003
1004         int count=0;
1005         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1006                 if(migratedNoticeList[j].fromPE == sendingPE){
1007                         migratedNoticeList[j].ackFrom = 1;
1008                 }else{
1009                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1010                 }
1011                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1012                         migratedNoticeList.remove(j);
1013                         count++;
1014                 }
1015                 
1016         }
1017         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1018         
1019         CheckPointAck ackMsg;
1020         ackMsg.PE = CkMyPe();
1021         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1022         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1023         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1024         
1025         traceUserBracketEvent(29,_startTime,CkWallTimer());
1026 };
1027
1028
1029 void _checkpointAckHandler(CheckPointAck *ackMsg){
1030         DEBUG_MEM(CmiMemoryCheck());
1031         unAckedCheckpoint=0;
1032         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1033         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1034         if(onGoingLoadBalancing){
1035                 onGoingLoadBalancing = 0;
1036                 finishedCheckpointLoadBalancing();
1037         }
1038         CmiFree(ackMsg);
1039 };
1040
1041 void clearUpMigratedRetainedLists(int PE){
1042         int count=0;
1043         CmiMemoryCheck();
1044         
1045         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1046                 if(migratedNoticeList[j].toPE == PE){
1047                         migratedNoticeList[j].ackTo = 1;
1048                 }
1049                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1050                         migratedNoticeList.remove(j);
1051                         count++;
1052                 }
1053         }
1054         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1055         
1056         for(int j=retainedObjectList.size()-1;j>=0;j--){
1057                 if(retainedObjectList[j]->migRecord.toPE == PE){
1058                         RetainedMigratedObject *obj = retainedObjectList[j];
1059                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1060                         retainedObjectList.remove(j);
1061                         if(obj->msg != NULL){
1062                                 CmiMemoryCheck();
1063                                 CmiFree(obj->msg);
1064                         }
1065                         delete obj;
1066                 }
1067         }
1068 }
1069
1070 /***************************************************************
1071         Restart Methods and handlers
1072 ***************************************************************/        
1073
1074 /**
1075  * Function for restarting the crashed processor.
1076  * It sets the restart flag and contacts the buddy
1077  * processor to get the latest checkpoint.
1078  */
1079 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1080         RestartRequest msg;
1081
1082         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1083
1084         // setting the restart flag
1085         _restartFlag = 1;
1086         _numRestartResponses = 0;
1087
1088         // requesting the latest checkpoint from its buddy
1089         msg.PE = CkMyPe();
1090         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1091         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1092 };
1093
1094 void CkMlogRestartDouble(void *,double){
1095         CkMlogRestart(NULL,NULL);
1096 };
1097
1098 /**
1099  * Gets the stored checkpoint for its buddy processor.
1100  */
1101 void _getCheckpointHandler(RestartRequest *restartMsg){
1102
1103         // retrieving the stored checkpoint
1104         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1105
1106         // making sure it is its buddy who is requesting the checkpoint
1107         CkAssert(restartMsg->PE == storedChkpt->PE);
1108
1109         storedRequest = restartMsg;
1110         verifyAckTotal = 0;
1111
1112         for(int i=0;i<migratedNoticeList.size();i++){
1113                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1114 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1115                         if(migratedNoticeList[i].ackFrom == 0){
1116                                 //need to verify if the object actually exists .. it might not
1117                                 //have been acked but it might exist on it
1118                                 VerifyAckMsg msg;
1119                                 msg.migRecord = migratedNoticeList[i];
1120                                 msg.index = i;
1121                                 msg.fromPE = CmiMyPe();
1122                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1123                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1124                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1125                                 verifyAckTotal++;
1126                         }
1127                 }
1128         }
1129
1130         // sending the checkpoint back to its buddy     
1131         if(verifyAckTotal == 0){
1132                 sendCheckpointData();
1133         }
1134         verifyAckCount = 0;
1135 }
1136
1137
1138 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1139         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1140         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1141         if(rec != NULL && rec->type() == CkLocRec::local){
1142                         //this location exists on this processor
1143                         //and needs to be removed       
1144                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1145                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1146                         
1147                         int localIdx = localRec->getLocalIndex();
1148                         LBDatabase *lbdb = localRec->getLBDB();
1149                         LDObjHandle ldHandle = localRec->getLdHandle();
1150                                 
1151                         locMgr->setDuringMigration(true);
1152                         
1153                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1154                         lbdb->UnregisterObj(ldHandle);
1155                         
1156                         locMgr->setDuringMigration(false);
1157                         
1158                         verifyAckedRequests++;
1159
1160         }
1161         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1162         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1163 };
1164
1165
1166 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1167         int index =     verifyReply->index;
1168         migratedNoticeList[index] = verifyReply->migRecord;
1169         verifyAckCount++;
1170         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1171         if(verifyAckCount == verifyAckTotal){
1172                 sendCheckpointData();
1173         }
1174 }
1175
1176
1177 /**
1178  * Sends the checkpoint to its buddy. 
1179  */
1180 void sendCheckpointData(){      
1181         RestartRequest *restartMsg = storedRequest;
1182         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1183         int numMigratedAwayElements = migratedNoticeList.size();
1184         if(migratedNoticeList.size() != 0){
1185                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1186 //                      CkAssert(migratedNoticeList.size() == 0);
1187         }
1188         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1189         
1190         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1191         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1192         
1193         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1194         
1195         char *msg = (char *)CmiAlloc(totalSize);
1196         
1197         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1198         dataMsg->PE = CkMyPe();
1199         dataMsg->restartWallTime = CmiTimer();
1200         dataMsg->checkPointSize = storedChkpt->bufSize;
1201         
1202         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1203 //      dataMsg->numMigratedAwayElements = 0;
1204         
1205         dataMsg->numMigratedInElements = 0;
1206         dataMsg->migratedElementSize = 0;
1207         dataMsg->lbGroupID = globalLBID;
1208         /*msg layout 
1209                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1210                 Local MessageLog|
1211         */
1212         //store checkpoint data
1213         char *buf = &msg[sizeof(RestartProcessorData)];
1214
1215         if(dataMsg->numMigratedAwayElements != 0){
1216                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1217                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1218         }
1219         
1220
1221         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1222         buf = &buf[storedChkpt->bufSize];
1223
1224         CmiSetHandler(msg,_recvCheckpointHandlerIdx);
1225         CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1226         CmiFree(restartMsg);
1227
1228 };
1229
1230
1231 // this list is used to create a vector of the object ids of all
1232 //the chares on this processor currently and the highest TN processed by them 
1233 //the first argument is actually a CkVec<TProcessedLog> *
1234 void createObjIDList(void *data, ChareMlogData *mlogData){
1235         CkVec<CkObjID> *list = (CkVec<CkObjID> *)data;
1236         CkObjID entry;
1237         entry = mlogData->objID;
1238         list->push_back(entry);
1239         DEBUG_RECOVERY(printLog(&entry));
1240 }
1241
1242
1243 /**
1244  * Receives the checkpoint data from its buddy, restores the state of all the objects
1245  * and asks everyone else to update its home.
1246  */
1247 void _recvCheckpointHandler(char *_restartData){
1248         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1249         MigrationRecord *migratedAwayElements;
1250
1251         globalLBID = restartData->lbGroupID;
1252         
1253         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1254         char *buf = &_restartData[sizeof(RestartProcessorData)];
1255         
1256         if(restartData->numMigratedAwayElements != 0){
1257                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1258                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1259                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1260                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1261         }
1262         
1263         PUP::fromMem pBuf(buf);
1264
1265         pBuf | checkpointCount;
1266         for(int i=0; i<CmiNumPes(); i++){
1267                 pBuf | CpvAccess(_incarnation)[i];
1268         }
1269         CkPupROData(pBuf);
1270         CkPupGroupData(pBuf,CmiTrue);
1271         CkPupNodeGroupData(pBuf,CmiTrue);
1272         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1273         CkAssert(pBuf.size() == restartData->checkPointSize);
1274         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1275
1276         // increases the incarnation number
1277         CpvAccess(_incarnation)[CmiMyPe()]++;
1278         
1279         forAllCharesDo(initializeRestart,NULL);
1280         
1281         CmiFree(_restartData);
1282         
1283         _initDone();
1284
1285         getGlobalStep(globalLBID);
1286
1287         
1288 }
1289
1290 /**
1291  * @brief Initializes variables and flags for restarting procedure.
1292  */
1293 void initializeRestart(void *data, ChareMlogData *mlogData){
1294         mlogData->resendReplyRecvd = 0;
1295         mlogData->restartFlag = 1;
1296 };
1297
1298 /**
1299  * Updates the homePe of chare array elements.
1300  */
1301 void updateHomePE(void *data,ChareMlogData *mlogData){
1302         RestartRequest *updateRequest = (RestartRequest *)data;
1303         int PE = updateRequest->PE; //restarted PE
1304         //if this object is an array Element and its home is the restarted processor
1305         // the home processor needs to know its current location
1306         if(mlogData->objID.type == TypeArray){
1307                 //it is an array element
1308                 CkGroupID myGID = mlogData->objID.data.array.id;
1309                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
1310                 CkArrayID aid(mlogData->objID.data.array.id);           
1311                 //check if the restarted processor is the home processor for this object
1312                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
1313                 if(locMgr->homePe(myIdx) == PE){
1314                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
1315                         DEBUG_RESTART(myIdx.print());
1316                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
1317                 }
1318         }
1319 };
1320
1321 /**
1322  * Prints a processed log.
1323  */
1324 void printLog(CkObjID &recver){
1325         char recverString[100];
1326         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" \n",CkMyPe(),recver.toString(recverString));
1327 }
1328
1329 /**
1330  * Prints information about a message.
1331  */
1332 void printMsg(envelope *env, const char* par){
1333         char senderString[100];
1334         char recverString[100];
1335         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
1336 }
1337
1338 /**
1339  * @brief Resends all the logged messages to a particular chare list.
1340  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
1341  * @param mlogData a particular chare living in this processor.
1342  */
1343 void resendMessageForChare(void *data, ChareMlogData *mlogData){
1344         DEBUG_RESTART(char nameString[100]);
1345         DEBUG_RESTART(char recverString[100]);
1346         DEBUG_RESTART(char senderString[100]);
1347
1348         ResendData *resendData = (ResendData *)data;
1349         int PE = resendData->PE; //restarted PE
1350         int count=0;
1351         int ticketRequests=0;
1352         CkQ<MlogEntry *> *log = mlogData->getMlog();
1353
1354         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
1355
1356         // traversing the message log to see if we must resend a message        
1357         for(int i=0;i<log->length();i++){
1358                 MlogEntry *logEntry = (*log)[i];
1359                 
1360                 // if we sent out the logs of a local message to buddy and it crashed
1361                 //before acknowledging 
1362                 envelope *env = logEntry->env;
1363                 if(env == NULL){
1364                         continue;
1365                 }
1366         
1367                 // resend if type is not invalid        
1368                 if(env->recver.type != TypeInvalid){
1369                         for(int j=0;j<resendData->numberObjects;j++){
1370                                 if(env->recver == (resendData->listObjects)[j]){
1371                                         if(PE != CkMyPe()){
1372                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
1373                                                 if(env->recver.type == TypeNodeGroup){
1374                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
1375                                                 }else{
1376                                                         CmiSetHandler(env,CmiGetXHandler(env));
1377                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
1378                                                 }
1379                                         }else{
1380                                                 envelope *copyEnv = copyEnvelope(env);
1381                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
1382                                         }
1383                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
1384                                         count++;
1385                                 }
1386                         }//end of for loop of objects
1387                         
1388                 }       
1389         }
1390         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
1391 }
1392
1393 /**
1394  * Resends messages since last checkpoint to the list of objects included in the 
1395  * request. It also sends stored remote determinants to the particular failed PE.
1396  */
1397 void _resendMessagesHandler(char *msg){
1398         ResendData d;
1399         ResendRequest *resendReq = (ResendRequest *)msg;
1400
1401         // building the reply message
1402         char *listObjects = &msg[sizeof(ResendRequest)];
1403         d.numberObjects = resendReq->numberObjects;
1404         d.PE = resendReq->PE;
1405         d.listObjects = (CkObjID *)listObjects;
1406         
1407         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
1408
1409         // resends messages for the list of objects
1410         forAllCharesDo(resendMessageForChare,&d);
1411
1412         DEBUG_MEM(CmiMemoryCheck());
1413
1414         if(resendReq->PE != CkMyPe()){
1415                 CmiFree(msg);
1416         }       
1417 }
1418
1419 /*
1420         Method to do parallel restart. Distribute some of the array elements to other processors.
1421         The problem is that we cant use to charm entry methods to do migration as it will get
1422         stuck in the protocol that is going to restart
1423         Note: in order to avoid interference between the objects being recovered, the current PE
1424     will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
1425     objects end up stepping into each other's shoes (interference).
1426 */
1427
1428 class ElementDistributor: public CkLocIterator{
1429         CkLocMgr *locMgr;
1430         int *targetPE;
1431
1432         void pupLocation(CkLocation &loc,PUP::er &p){
1433                 CkArrayIndexMax idx=loc.getIndex();
1434                 CkGroupID gID = locMgr->ckGetGroupID();
1435                 p|gID;      // store loc mgr's GID as well for easier restore
1436                 p|idx;
1437                 p|loc;
1438         };
1439 public:
1440         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
1441
1442         void addLocation(CkLocation &loc){
1443
1444                 // leaving object on this PE
1445                 if(*targetPE == CkMyPe()){
1446                         *targetPE = (*targetPE +1)%CkNumPes();
1447                         return;
1448                 }
1449                         
1450                 CkArrayIndexMax idx = loc.getIndex();
1451                 CkLocRec_local *rec = loc.getLocalRecord();
1452                 CkLocMgr *locMgr = loc.getManager();
1453                 CkVec<CkMigratable *> eltList;
1454                         
1455                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
1456                 idx.print();
1457
1458                 // incrementing number of emigrant objects
1459                 CpvAccess(_numEmigrantRecObjs)++;
1460         locMgr->migratableList((CkLocRec_local *)rec,eltList);
1461                 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
1462                 
1463                 // let everybody else know the object is leaving
1464                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
1465                 reductionMgr->incNumEmigrantRecObjs();
1466         
1467                 //pack up this location and send it across
1468                 PUP::sizer psizer;
1469                 pupLocation(loc,psizer);
1470                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
1471                 char *msg = (char *)CmiAlloc(totalSize);
1472                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
1473                 distributeMsg->PE = CkMyPe();
1474                 char *buf = &msg[sizeof(DistributeObjectMsg)];
1475                 PUP::toMem pmem(buf);
1476                 pmem.becomeDeleting();
1477                 pupLocation(loc,pmem);
1478                         
1479                 locMgr->setDuringMigration(CmiTrue);
1480                 delete rec;
1481                 locMgr->setDuringMigration(CmiFalse);
1482                 locMgr->inform(idx,*targetPE);
1483
1484                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
1485                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
1486
1487                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
1488
1489                 //decide on the target processor for the next object
1490                 *targetPE = *targetPE + 1;
1491                 if(*targetPE > (CkMyPe() + parallelRecovery)){
1492                         *targetPE = CkMyPe() + 1;
1493                 }
1494         }
1495
1496 };
1497
1498 /**
1499  * Distributes objects to accelerate recovery after a failure.
1500  */
1501 void distributeRestartedObjects(){
1502         int numGroups = CkpvAccess(_groupIDTable)->size();      
1503         int i;
1504         int targetPE=CkMyPe()+1;
1505         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
1506 };
1507
1508 /**
1509  * Handler to receive back a location.
1510  */
1511 void _sendBackLocationHandler(char *receivedMsg){
1512         printf("Array element received at processor %d after recovery\n",CkMyPe());
1513         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
1514         int sourcePE = distributeMsg->PE;
1515         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
1516         PUP::fromMem pmem(buf);
1517         CkGroupID gID;
1518         CkArrayIndexMax idx;
1519         pmem |gID;
1520         pmem |idx;
1521         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1522         donotCountMigration=1;
1523         mgr->resume(idx,pmem,CmiTrue);
1524         donotCountMigration=0;
1525         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
1526         printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
1527         idx.print();
1528
1529         // decrementing number of emigrant objects at reduction manager
1530         CkVec<CkMigratable *> eltList;
1531         CkLocRec *rec = mgr->elementRec(idx);
1532         mgr->migratableList((CkLocRec_local *)rec,eltList);
1533         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
1534         reductionMgr->decNumEmigrantRecObjs();
1535         reductionMgr->decGCount();
1536
1537         // checking if it has received all emigrant recovering objects
1538         CpvAccess(_numEmigrantRecObjs)--;
1539         if(CpvAccess(_numEmigrantRecObjs) == 0){
1540                 (*resumeLbFnPtr)(centralLb);
1541         }
1542
1543 }
1544
1545 /**
1546  * Handler to update information about an object just received.
1547  */
1548 void _distributedLocationHandler(char *receivedMsg){
1549         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
1550         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
1551         int sourcePE = distributeMsg->PE;
1552         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
1553         PUP::fromMem pmem(buf);
1554         CkGroupID gID;
1555         CkArrayIndexMax idx;
1556         pmem |gID;
1557         pmem |idx;
1558         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1559         donotCountMigration=1;
1560         mgr->resume(idx,pmem,CmiTrue);
1561         donotCountMigration=0;
1562         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
1563         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
1564         idx.print();
1565
1566         CkLocRec *rec = mgr->elementRec(idx);
1567         CmiAssert(rec->type() == CkLocRec::local);
1568
1569         // adding object to the list of immigrant recovery objects
1570         CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
1571         CpvAccess(_numImmigrantRecObjs)++;
1572         
1573         CkVec<CkMigratable *> eltList;
1574         mgr->migratableList((CkLocRec_local *)rec,eltList);
1575         for(int i=0;i<eltList.size();i++){
1576                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
1577                         CpvAccess(_currentObj) = eltList[i];
1578                         eltList[i]->mlogData->immigrantRecFlag = 1;
1579                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
1580
1581                         // incrementing immigrant counter at reduction manager
1582                         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
1583                         reductionMgr->incNumImmigrantRecObjs();
1584                         reductionMgr->decGCount();
1585
1586                         eltList[i]->ResumeFromSync();
1587                 }
1588         }
1589 }
1590
1591
1592 /** this method is used to send messages to a restarted processor to tell
1593  * it that a particular expected object is not going to get to it */
1594 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
1595         DummyMigrationMsg buf;
1596         buf.flag = MLOG_OBJECT;
1597         buf.lbID = lbID;
1598         buf.mgrID = locMgrID;
1599         buf.idx = idx;
1600         buf.locationPE = locationPE;
1601         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
1602         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
1603 };
1604
1605
1606 /**this method is used by a restarted processor to tell other processors
1607  * that they are not going to receive these many objects.. just the count
1608  * not the objects themselves ***/
1609
1610 void sendDummyMigrationCounts(int *dummyCounts){
1611         DummyMigrationMsg buf;
1612         buf.flag = MLOG_COUNT;
1613         buf.lbID = globalLBID;
1614         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
1615         for(int i=0;i<CmiNumPes();i++){
1616                 if(i != CmiMyPe() && dummyCounts[i] != 0){
1617                         buf.count = dummyCounts[i];
1618                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
1619                 }
1620         }
1621 }
1622
1623
1624 /** this handler is used to process a dummy migration msg.
1625  * it looks up the load balancer and calls migrated for it */
1626
1627 void _dummyMigrationHandler(DummyMigrationMsg *msg){
1628         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
1629         if(msg->flag == MLOG_OBJECT){
1630                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
1631                 LDObjHandle h;
1632                 lb->Migrated(h,1);
1633         }
1634         if(msg->flag == MLOG_COUNT){
1635                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
1636                 msg->count -= verifyAckedRequests;
1637                 for(int i=0;i<msg->count;i++){
1638                         LDObjHandle h;
1639                         lb->Migrated(h,1);
1640                 }
1641         }
1642         verifyAckedRequests=0;
1643         CmiFree(msg);
1644 };
1645
1646 /*****************************************************
1647         Implementation of a method that can be used to call
1648         any method on the ChareMlogData of all the chares on
1649         a processor currently
1650 ******************************************************/
1651
1652
1653 class ElementCaller :  public CkLocIterator {
1654 private:
1655         CkLocMgr *locMgr;
1656         MlogFn fnPointer;
1657         void *data;
1658 public:
1659         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
1660                 locMgr = _locMgr;
1661                 fnPointer = _fnPointer;
1662                 data = _data;
1663         };
1664         void addLocation(CkLocation &loc){
1665                 CkVec<CkMigratable *> list;
1666                 CkLocRec_local *local = loc.getLocalRecord();
1667                 locMgr->migratableList (local,list);
1668                 for(int i=0;i<list.size();i++){
1669                         CkMigratable *migratableElement = list[i];
1670                         fnPointer(data,migratableElement->mlogData);
1671                 }
1672         }
1673 };
1674
1675 /**
1676  * Map function pointed by fnPointer over all the chares living in this processor.
1677  */
1678 void forAllCharesDo(MlogFn fnPointer, void *data){
1679         int numGroups = CkpvAccess(_groupIDTable)->size();
1680         for(int i=0;i<numGroups;i++){
1681                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
1682                 fnPointer(data,obj->mlogData);
1683         }
1684         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
1685         for(int i=0;i<numNodeGroups;i++){
1686                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
1687                 fnPointer(data,obj->mlogData);
1688         }
1689         int i;
1690         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
1691 };
1692
1693
1694 /******************************************************************
1695  Load Balancing
1696 ******************************************************************/
1697
1698 /**
1699  * This is the first time Converse is called after AtSync method has been called by every local object.
1700  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
1701  * message logging, we can take advantage of this situation and garbage collect at this point.
1702  */
1703 void initMlogLBStep(CkGroupID gid){
1704         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
1705         countLBMigratedAway = 0;
1706         countLBToMigrate=0;
1707         onGoingLoadBalancing=1;
1708         migrationDoneCalled=0;
1709         checkpointBarrierCount=0;
1710         if(globalLBID.idx != 0){
1711                 CmiAssert(globalLBID.idx == gid.idx);
1712         }
1713         globalLBID = gid;
1714 #if SYNCHRONIZED_CHECKPOINT
1715         garbageCollectMlog();
1716 #endif
1717 }
1718
1719 /**
1720  * Pups a location
1721  */
1722 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
1723         CkArrayIndexMax idx = loc->getIndex();
1724         CkGroupID gID = locMgr->ckGetGroupID();
1725         p|gID;      // store loc mgr's GID as well for easier restore
1726         p|idx;
1727         p|*loc;
1728 };
1729
1730 /**
1731  * Sends back the immigrant recovering object to their origin PE.
1732  */
1733 void sendBackImmigrantRecObjs(){
1734         CkLocation *loc;
1735         CkLocMgr *locMgr;
1736         CkArrayIndexMax idx;
1737         CkLocRec_local *rec;
1738         PUP::sizer psizer;
1739         int targetPE;
1740         CkVec<CkMigratable *> eltList;
1741         CkReductionMgr *reductionMgr;
1742  
1743         // looping through all elements in immigrant recovery objects vector
1744         for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
1745
1746                 // getting the components of each location
1747                 loc = (*CpvAccess(_immigrantRecObjs))[i];
1748                 idx = loc->getIndex();
1749                 rec = loc->getLocalRecord();
1750                 locMgr = loc->getManager();
1751         locMgr->migratableList((CkLocRec_local *)rec,eltList);
1752                 targetPE = eltList[i]->mlogData->immigrantSourcePE;
1753
1754                 // decrement counter at array manager
1755                 reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
1756                 reductionMgr->decNumImmigrantRecObjs();
1757
1758                 CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
1759                 idx.print();
1760
1761                 // let everybody else know the object is leaving
1762                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
1763                         
1764                 //pack up this location and send it across
1765                 pupLocation(loc,locMgr,psizer);
1766                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
1767                 char *msg = (char *)CmiAlloc(totalSize);
1768                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
1769                 distributeMsg->PE = CkMyPe();
1770                 char *buf = &msg[sizeof(DistributeObjectMsg)];
1771                 PUP::toMem pmem(buf);
1772                 pmem.becomeDeleting();
1773                 pupLocation(loc,locMgr,pmem);
1774                 
1775                 locMgr->setDuringMigration(CmiTrue);
1776                 delete rec;
1777                 locMgr->setDuringMigration(CmiFalse);
1778                 locMgr->inform(idx,targetPE);
1779
1780                 // sending the object
1781                 CmiSetHandler(msg,_sendBackLocationHandlerIdx);
1782                 CmiSyncSendAndFree(targetPE,totalSize,msg);
1783
1784                 // freeing memory
1785                 delete loc;
1786
1787                 CmiAssert(locMgr->lastKnown(idx) == targetPE);
1788                 
1789         }
1790
1791         // cleaning up all data structures
1792         CpvAccess(_immigrantRecObjs)->removeAll();
1793         CpvAccess(_numImmigrantRecObjs) = 0;
1794
1795 }
1796
1797 /**
1798  * Restores objects after parallel recovery, either by sending back the immigrant objects or 
1799  * by waiting for all emigrant objects to be back.
1800  */
1801 void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
1802         resumeLbFnPtr = _fnPtr;
1803         centralLb = _centralLb;
1804
1805         // sending back the immigrant recovering objects
1806         if(CpvAccess(_numImmigrantRecObjs) > 0){
1807                 sendBackImmigrantRecObjs();     
1808         }
1809
1810         // checking whether it needs to wait for emigrant recovery objects
1811         if(CpvAccess(_numEmigrantRecObjs) > 0)
1812                 return;
1813
1814         // otherwise, load balancing process is finished
1815         (*resumeLbFnPtr)(centralLb);
1816 }
1817
1818 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
1819         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
1820         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
1821
1822         resumeLbFnPtr = _fnPtr;
1823         centralLb = _centralLb;
1824         migrationDoneCalled = 1;
1825         if(countLBToMigrate == countLBMigratedAway){
1826                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
1827                 startMlogCheckpoint(NULL,CmiWallTimer());
1828         }
1829 };
1830
1831 void finishedCheckpointLoadBalancing(){
1832         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
1833         CheckpointBarrierMsg msg;
1834         msg.fromPE = CmiMyPe();
1835         msg.checkpointCount = checkpointCount;
1836
1837         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
1838         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
1839         
1840 };
1841
1842 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
1843         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
1844         migratedNoticeList.push_back(msg->migRecord);
1845
1846         MigrationNoticeAck buf;
1847         buf.record = msg->record;
1848         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
1849         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
1850 }
1851
1852 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
1853         
1854         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
1855         retainedObject->acked = 1;
1856
1857         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
1858         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
1859
1860         //inform home about the new location of this object
1861         CkGroupID gID = retainedObject->migRecord.gID ;
1862         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1863         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
1864         
1865         countLBMigratedAway++;
1866         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
1867                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
1868                 startMlogCheckpoint(NULL,CmiWallTimer());
1869         }
1870 };
1871
1872 void _receiveMlogLocationHandler(void *buf){
1873         envelope *env = (envelope *)buf;
1874         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
1875         CkUnpackMessage(&env);
1876         void *_msg = EnvToUsr(env);
1877         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
1878         CkGroupID gID= msg->gid;
1879         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
1880         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1881         CpvAccess(_currentObj)=mgr;
1882         mgr->immigrate(msg);
1883 };
1884
1885 /**
1886  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
1887  */
1888 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
1889         if(checkpointBarrierCount == CmiNumPes()){
1890                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
1891                 for(int i=0;i<CmiNumPes();i++){
1892                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
1893                 }
1894         }
1895 }
1896
1897 /**
1898  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
1899  */ 
1900 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
1901         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
1902         if(msg->checkpointCount == checkpointCount){
1903                 checkpointBarrierCount++;
1904                 checkAndSendCheckpointBarrierAcks(msg);
1905         }else{
1906                 if(msg->checkpointCount-1 == checkpointCount){
1907                         checkpointBarrierCount++;
1908                         checkAndSendCheckpointBarrierAcks(msg);
1909                 }else{
1910                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
1911                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
1912                 }
1913         }
1914
1915         // deleting the received message
1916         CmiFree(msg);
1917 }
1918
1919 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
1920         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
1921         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
1922
1923         // resuming LB function pointer
1924         (*resumeLbFnPtr)(centralLb);
1925
1926         // deleting message
1927         CmiFree(msg);
1928 }
1929
1930 /**
1931  * @brief Function to remove all messages in the message log of a particular chare.
1932  */
1933 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
1934         int total;
1935         MlogEntry *logEntry;
1936         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1937
1938         // traversing the whole message log and removing all elements
1939         total = mlog->length();
1940         for(int i=0; i<total; i++){
1941                 logEntry = mlog->deq();
1942                 delete logEntry;
1943         }
1944
1945 }
1946
1947 /**
1948  * @brief Garbage collects the message log and other data structures.
1949  */
1950 void garbageCollectMlog(){
1951         DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
1952
1953         // removing all messages in message log for every chare
1954         forAllCharesDo(garbageCollectMlogForChare, NULL);
1955 }
1956
1957 /**
1958         method that informs an array elements home processor of its current location
1959         It is a converse method to bypass the charm++ message logging framework
1960 */
1961
1962 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
1963         double _startTime = CmiWallTimer();
1964         CurrentLocationMsg msg;
1965         msg.mgrID = locMgrID;
1966         msg.idx = idx;
1967         msg.locationPE = currentPE;
1968         msg.fromPE = CkMyPe();
1969
1970         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
1971         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
1972         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
1973         traceUserBracketEvent(37,_startTime,CmiWallTimer());
1974 }
1975
1976
1977 void _receiveLocationHandler(CurrentLocationMsg *data){
1978         double _startTime = CmiWallTimer();
1979         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
1980         if(mgr == NULL){
1981                 CmiFree(data);
1982                 return;
1983         }
1984         CkLocRec *rec = mgr->elementNrec(data->idx);
1985         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
1986         if(rec != NULL){
1987                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
1988                         if(data->fromPE == data->locationPE){
1989                                 CmiAbort("Another processor has the same object");
1990                         }
1991                 }
1992         }
1993         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
1994                 int targetPE = data->fromPE;
1995                 data->fromPE = CmiMyPe();
1996                 data->locationPE = CmiMyPe();
1997                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
1998                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
1999         }else{
2000                 mgr->inform(data->idx,data->locationPE);
2001         }
2002         CmiFree(data);
2003         traceUserBracketEvent(38,_startTime,CmiWallTimer());
2004 }
2005
2006
2007
2008 void getGlobalStep(CkGroupID gID){
2009         LBStepMsg msg;
2010         int destPE = 0;
2011         msg.lbID = gID;
2012         msg.fromPE = CmiMyPe();
2013         msg.step = -1;
2014         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
2015         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
2016 };
2017
2018 void _getGlobalStepHandler(LBStepMsg *msg){
2019         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
2020         msg->step = lb->step();
2021         CmiAssert(msg->fromPE != CmiMyPe());
2022         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
2023         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
2024         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
2025 };
2026
2027 /**
2028  * @brief Receives the global step handler from PE 0
2029  */
2030 void _recvGlobalStepHandler(LBStepMsg *msg){
2031         
2032         // updating restart decision number
2033         restartDecisionNumber = msg->step;
2034         CmiFree(msg);
2035
2036         CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
2037
2038         // continuing with restart process; send out the request to resend logged messages to all other processors
2039         CkVec<CkObjID> objectVec;
2040         forAllCharesDo(createObjIDList, (void *)&objectVec);
2041         int numberObjects = objectVec.size();
2042         
2043         //      resendMsg layout: |ResendRequest|Array of CkObjID|
2044         int totalSize = sizeof(ResendRequest) + numberObjects * sizeof(CkObjID);
2045         char *resendMsg = (char *)CmiAlloc(totalSize);  
2046
2047         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2048         resendReq->PE = CkMyPe(); 
2049         resendReq->numberObjects = numberObjects;
2050         char *objList = &resendMsg[sizeof(ResendRequest)];
2051         memcpy(objList,objectVec.getVec(),numberObjects * sizeof(CkObjID));     
2052
2053         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2054         CpvAccess(_currentObj) = lb;
2055         lb->ReceiveDummyMigration(restartDecisionNumber);
2056
2057         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2058         for(int i=0;i<CkNumPes();i++){
2059                 if(i != CkMyPe()){
2060                         CmiSyncSend(i,totalSize,resendMsg);
2061                 }
2062         }
2063         _resendMessagesHandler(resendMsg);
2064         CmiFree(resendMsg);
2065
2066         /* test for parallel restart migrate away object**/
2067         if(fastRecovery){
2068                 distributeRestartedObjects();
2069                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2070         }
2071
2072 };
2073
2074 /**
2075  * @brief Function to wrap up performance information.
2076  */
2077 void _messageLoggingExit(){
2078         
2079         // printing the signature for causal message logging
2080         if(CkMyPe() == 0)
2081                 printf("[%d] FastMessageLoggingExit \n",CmiMyPe());
2082
2083 #if COLLECT_STATS_MSGS
2084 #if COLLECT_STATS_MSGS_TOTAL
2085         printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
2086         printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
2087 #else
2088         printf("[%d] TARGETS: ",CmiMyPe());
2089         for(int i=0; i<CmiNumPes(); i++){
2090 #if COLLECT_STATS_MSG_COUNT
2091                 printf("%d ",numMsgsTarget[i]);
2092 #else
2093                 printf("%d ",sizeMsgsTarget[i]);
2094 #endif
2095         }
2096         printf("\n");
2097 #endif
2098 #endif
2099 }
2100
2101 void MlogEntry::pup(PUP::er &p){
2102         p | destPE;
2103         p | _infoIdx;
2104         int size;
2105         if(!p.isUnpacking()){
2106 /*              CkAssert(env);
2107                 if(!env->isPacked()){
2108                         CkPackMessage(&env);
2109                 }*/
2110                 if(env == NULL){
2111                         //message was probably local and has been removed from logs
2112                         size = 0;
2113                 }else{
2114                         size = env->getTotalsize();
2115                 }       
2116         }
2117         p | size;
2118         if(p.isUnpacking()){
2119                 if(size > 0){
2120                         env = (envelope *)_allocEnv(ForChareMsg,size);
2121                 }else{
2122                         env = NULL;
2123                 }
2124         }
2125         if(size > 0){
2126                 p((char *)env,size);
2127         }
2128 };
2129
2130
2131 /**********************************
2132         * The methods of the message logging
2133         * data structure stored in each chare
2134         ********************************/
2135
2136 MCount ChareMlogData::nextSN(const CkObjID &recver){
2137         MCount *SN = ssnTable.getPointer(recver);
2138         if(SN==NULL){
2139                 ssnTable.put(recver) = 1;
2140                 return 1;
2141         }else{
2142                 (*SN)++;
2143                 return *SN;
2144         }
2145 };
2146  
2147 /**
2148  * Adds an entry into the message log.
2149  */
2150 void ChareMlogData::addLogEntry(MlogEntry *entry){
2151         DEBUG(char nameString[100]);
2152         DEBUG(printf("[%d] Adding logEntry %p to the log of %s with SN %d\n",CkMyPe(),entry,objID.toString(nameString),entry->env->SN));
2153         DEBUG_MEM(CmiMemoryCheck());
2154
2155         // enqueuing the entry in the message log
2156         mlog.enq(entry);
2157 };
2158
2159 /**
2160  * Checks whether a ssn has been already received. The collateral effect is the ssn get added to the list.
2161  */
2162 int ChareMlogData::checkAndStoreSsn(const CkObjID &sender, MCount ssn){
2163         RSSN *rssn;
2164         rssn = receivedSsnTable.get(sender);
2165         if(rssn == NULL){
2166                 rssn = new RSSN();
2167                 receivedSsnTable.put(sender) = rssn;
2168         }
2169         return rssn->checkAndStore(ssn);
2170 }
2171
2172 /**
2173  * Pup method for the metadata.
2174  * We are preventing the whole message log to be stored (as proposed by Sayantan for dealing with multiple failures).
2175  * Then, we only support one failure at a time. Read Sayantan's thesis, sections 4.2 and 4.3 for more details.
2176  */
2177 void ChareMlogData::pup(PUP::er &p){
2178         int startSize=0;
2179         char nameStr[100];
2180         if(p.isSizing()){
2181                 PUP::sizer *sizep = (PUP::sizer *)&p;
2182                 startSize = sizep->size();
2183         }
2184         p | objID;
2185         if(p.isUnpacking()){
2186                 DEBUG(CmiPrintf("[%d] Obj %s being unpacked with tCount %d tProcessed %d \n",CmiMyPe(),objID.toString(nameStr),tCount,tProcessed));
2187         }
2188         p | toResumeOrNot;
2189         p | resumeCount;
2190         DEBUG(CmiPrintf("[%d] Obj %s toResumeOrNot %d resumeCount %d \n",CmiMyPe(),objID.toString(nameStr),toResumeOrNot,resumeCount));
2191         
2192         ssnTable.pup(p);
2193         
2194         // pupping receivedSsnTable
2195         int rssnTableSize;
2196         if(!p.isUnpacking()){
2197                 rssnTableSize = receivedSsnTable.numObjects();
2198         }
2199         p | rssnTableSize;
2200         if(!p.isUnpacking()){
2201                 CkHashtableIterator *iter = receivedSsnTable.iterator();
2202                 while(iter->hasNext()){
2203                         CkObjID *objID;
2204                         RSSN **row = (RSSN **)iter->next((void **)&objID);
2205                         p | (*objID);
2206                         (*row)->pup(p);
2207                 }
2208                 delete iter;
2209         }else{
2210                 for(int i=0; i<rssnTableSize; i++){
2211                         CkObjID objID;
2212                         p | objID;
2213                         RSSN *row = new RSSN;
2214                         row->pup(p);
2215                         receivedSsnTable.put(objID) = row;
2216                 }
2217         }
2218         
2219         p | resendReplyRecvd;
2220         p | restartFlag;
2221
2222         if(p.isSizing()){
2223                 PUP::sizer *sizep = (PUP::sizer *)&p;
2224                 int pupSize = sizep->size()-startSize;
2225                 DEBUG(char name[40]);
2226                 DEBUG(CkPrintf("[%d]PUP::sizer of %s shows size %d\n",CkMyPe(),objID.toString(name),pupSize));
2227         //      CkAssert(pupSize <100000000);
2228         }
2229         
2230         double _finTime = CkWallTimer();
2231         DEBUG(CkPrintf("[%d] Pup took %.6lf\n",CkMyPe(),_finTime - _startTime));
2232 };
2233
2234 /****************
2235 *****************/
2236
2237 /**
2238  * Getting the pe number of the current processor's buddy.
2239  * In the team-based approach each processor might checkpoint in the next team, but currently
2240  * teams are only meant to reduce memory overhead.
2241  * Note: function getReverseCheckPointPE performs the reverse map. It must be changed accordingly.
2242  */
2243 int getCheckPointPE(){
2244         return (CmiMyPe() + 1) % CmiNumPes();
2245 }
2246
2247 /**
2248  * Getting the pe that checkpoints on this pe.
2249  */
2250 int getReverseCheckPointPE(){
2251         return (CmiMyPe() - 1 + CmiNumPes()) % CmiNumPes();
2252 }
2253
2254 //assume it is a packed envelope
2255 envelope *copyEnvelope(envelope *env){
2256         envelope *newEnv = (envelope *)CmiAlloc(env->getTotalsize());
2257         memcpy(newEnv,env,env->getTotalsize());
2258         return newEnv;
2259 }
2260
2261 #endif