Changes in message logging protocol to make it safer. Determinants are all received...
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
16 #ifdef _FAULT_CAUSAL_
17
18 // Collects some statistics about message logging. Beware of the high cost of accounting for
19 // duplicated determinants (for every determinant received, it consists in a linear search 
20 // through a potentially big list).
21 #define COLLECT_STATS_MSGS 0
22 #define COLLECT_STATS_MSGS_TOTAL 0
23 #define COLLECT_STATS_MSG_COUNT 0
24 #define COLLECT_STATS_DETS 0
25 #define COLLECT_STATS_DETS_DUP 0
26 #define COLLECT_STATS_MEMORY 0
27 #define COLLECT_STATS_TEAM 0
28
29 #define RECOVERY_SEND "SEND"
30 #define RECOVERY_PROCESS "PROCESS"
31
32 #define DEBUG_MEM(x)  //x
33 #define DEBUG(x) // x
34 #define DEBUG_RESTART(x)  //x
35 #define DEBUGLB(x)   // x
36 #define DEBUG_TEAM(x)  // x
37 #define DEBUG_PERF(x) // x
38 #define DEBUG_CHECKPOINT 1
39 #define DEBUG_NOW(x) x
40 #define DEBUG_PE(x,y) // if(CkMyPe() == x) y
41 #define DEBUG_RECOVERY(x) //x
42
43 extern const char *idx2str(const CkArrayIndex &ind);
44 extern const char *idx2str(const ArrayElement *el);
45
46 void getGlobalStep(CkGroupID gID);
47
48 bool fault_aware(CkObjID &recver);
49 void sendCheckpointData(int mode);
50 void createObjIDList(void *data,ChareMlogData *mlogData);
51 inline bool isLocal(int destPE);
52 inline bool isTeamLocal(int destPE);
53 void printLog(TProcessedLog *log);
54
55 int _restartFlag=0;
56 int _numRestartResponses=0;
57
58 //TODO: remove for perf runs
59 int countHashRefs=0; //count the number of gets
60 int countHashCollisions=0;
61
62 char *checkpointDirectory=".";
63 int unAckedCheckpoint=0;
64
65 int countLocal=0,countBuffered=0;
66 int countPiggy=0;
67 int countClearBufferedLocalCalls=0;
68
69 int countUpdateHomeAcks=0;
70
71 extern int teamSize;
72 extern int chkptPeriod;
73 extern bool fastRecovery;
74 extern int parallelRecovery;
75
76 char *killFile;
77 char *faultFile;
78 int killFlag=0;
79 int faultFlag=0;
80 int restartingMlogFlag=0;
81 void readKillFile();
82 double killTime=0.0;
83 double faultMean;
84 int checkpointCount=0;
85
86 CpvDeclare(Chare *,_currentObj);
87 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
88 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
89 CpvDeclare(Queue, _outOfOrderMessageQueue);
90 CpvDeclare(Queue, _delayedRemoteMessageQueue);
91 CpvDeclare(char **,_bufferedTicketRequests);
92 CpvDeclare(int *,_numBufferedTicketRequests);
93
94 /***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
95 /** @note All the determinants generated by a PE are stored in variable _localDets.
96  * As soon as a message is sent, then all the determinants are appended to the message,
97  * but those determinants are not deleted. We must wait until an ACK comes from the receiver
98  * to delete the determinants. In the meantime the same determinants may be appended
99  * to other messages and more determinants can be added to _localDets.
100  * A simple solution to this problem was to have a primitive array and keep adding determinants
101  * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
102  * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
103  * determinant and a number of how many valid determinants there are behind it. We do not remove
104  * determinants until a checkpoint is made, since these determinants may have to be added to
105  * messages in case of a recovery.
106  */
107 // temporal storage for the outgoing determinants
108 CpvDeclare(char *, _localDets);
109 // number of buffered determinants
110 int _numBufferedDets;
111 // current index of first determinant in _localDets
112 int _indexBufferedDets;
113 // current phase of determinants
114 int _phaseBufferedDets;
115
116 // stores the determinants from other nodes, to send them in case of a crash
117 CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
118 // max number of buffered determinants
119 int _maxBufferedDets;
120
121 // stores the incarnation number from every other processor
122 CpvDeclare(char *, _incarnation);
123
124 /***** *****/
125
126 #if COLLECT_STATS_MSGS
127 int *numMsgsTarget;
128 int *sizeMsgsTarget;
129 int totalMsgsTarget;
130 float totalMsgsSize;
131 #endif
132 #if COLLECT_STATS_DETS
133 int numPiggyDets;
134 int numDets;
135 int numDupDets;
136 #endif
137 #if COLLECT_STATS_MEMORY
138 int msgLogSize;
139 int bufferedDetsSize;
140 int storedDetsSize;
141 #endif
142 //TML: variables for measuring savings with teams in message logging
143 #if COLLECT_STATS_TEAM
144 float MLOGFT_totalLogSize = 0.0;
145 float MLOGFT_totalMessages = 0.0;
146 #endif
147
148 static double adjustChkptPeriod=0.0; //in ms
149 static double nextCheckpointTime=0.0;//in seconds
150 static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
151
152 int _pingHandlerIdx;
153
154 char objString[100];
155 int _checkpointRequestHandlerIdx;
156 int _storeCheckpointHandlerIdx;
157 int _checkpointAckHandlerIdx;
158 int _getCheckpointHandlerIdx;
159 int _recvCheckpointHandlerIdx;
160 int _removeProcessedLogHandlerIdx;
161
162 int _verifyAckRequestHandlerIdx;
163 int _verifyAckHandlerIdx;
164 int _dummyMigrationHandlerIdx;
165
166
167 int     _getGlobalStepHandlerIdx;
168 int     _recvGlobalStepHandlerIdx;
169
170 int _updateHomeRequestHandlerIdx;
171 int _updateHomeAckHandlerIdx;
172 int _resendMessagesHandlerIdx;
173 int _sendDetsHandlerIdx;
174 int _sendDetsReplyHandlerIdx;
175 int _receivedTNDataHandlerIdx;
176 int _receivedDetDataHandlerIdx;
177 int _distributedLocationHandlerIdx;
178 int _storeDeterminantsHandlerIdx;
179 int _removeDeterminantsHandlerIdx;
180
181 //TML: integer constants for team-based message logging
182 int _restartHandlerIdx;
183 int _getRestartCheckpointHandlerIdx;
184 int _recvRestartCheckpointHandlerIdx;
185 void setTeamRecovery(void *data, ChareMlogData *mlogData);
186 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
187
188 int verifyAckTotal;
189 int verifyAckCount;
190
191 int verifyAckedRequests=0;
192
193 RestartRequest *storedRequest;
194
195 int _falseRestart =0; /**
196                                                                                                         For testing on clusters we might carry out restarts on 
197                                                                                                         a processor without actually starting it
198                                                                                                         1 -> false restart
199                                                                                                         0 -> restart after an actual crash
200                                                                                                 */              
201
202 //Load balancing globals
203 int onGoingLoadBalancing=0;
204 void *centralLb;
205 void (*resumeLbFnPtr)(void *);
206 int _receiveMlogLocationHandlerIdx;
207 int _receiveMigrationNoticeHandlerIdx;
208 int _receiveMigrationNoticeAckHandlerIdx;
209 int _checkpointBarrierHandlerIdx;
210 int _checkpointBarrierAckHandlerIdx;
211
212 CkVec<MigrationRecord> migratedNoticeList;
213 CkVec<RetainedMigratedObject *> retainedObjectList;
214 int donotCountMigration=0;
215 int countLBMigratedAway=0;
216 int countLBToMigrate=0;
217 int migrationDoneCalled=0;
218 int checkpointBarrierCount=0;
219 int globalResumeCount=0;
220 CkGroupID globalLBID;
221 int restartDecisionNumber=-1;
222
223 double lastCompletedAlarm=0;
224 double lastRestart=0;
225
226 //update location globals
227 int _receiveLocationHandlerIdx;
228
229 /** 
230  * @brief Initialize message logging data structures and register handlers
231  */
232 void _messageLoggingInit(){
233         //current object
234         CpvInitialize(Chare *,_currentObj);
235         
236         //registering handlers for message logging
237         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
238                 
239         //handlers for checkpointing
240         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
241         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
242         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
243         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
244
245         //handlers for restart
246         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
247         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
248         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
249         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
250         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
251         _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
252         _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
253         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
254         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
255         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
256         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
257         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
258         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
259
260         //TML: handlers for team-based message logging
261         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
262         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
263         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
264
265         // handlers for causal message logging
266         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
267         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
268         
269         //handlers for load balancing
270         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
271         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
272         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
273         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
274         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
275         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
276         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
277         
278         //handlers for updating locations
279         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
280         
281         //Cpv variables for message logging
282         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
283         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
284         CpvInitialize(Queue, _outOfOrderMessageQueue);
285         CpvInitialize(Queue, _delayedRemoteMessageQueue);
286         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
287         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
288         
289         CpvInitialize(char **,_bufferedTicketRequests);
290         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
291         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
292         for(int i=0;i<CkNumPes();i++){
293                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
294                 CpvAccess(_numBufferedTicketRequests)[i]=0;
295         }
296  
297         // Cpv variables for causal protocol
298         _numBufferedDets = 0;
299         _indexBufferedDets = 0;
300         _phaseBufferedDets = 0;
301         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
302         CpvInitialize(char *, _localDets);
303         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
304         CpvInitialize(char *, _incarnation);
305         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
306         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
307         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
308         for(int i=0; i<CmiNumPes(); i++){
309                 CpvAccess(_incarnation)[i] = 0;
310         }
311         
312         //Cpv variables for checkpoint
313         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
314         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
315
316         // registering user events for projections      
317         traceRegisterUserEvent("Remove Logs", 20);
318         traceRegisterUserEvent("Ticket Request Handler", 21);
319         traceRegisterUserEvent("Ticket Handler", 22);
320         traceRegisterUserEvent("Local Message Copy Handler", 23);
321         traceRegisterUserEvent("Local Message Ack Handler", 24);        
322         traceRegisterUserEvent("Preprocess current message",25);
323         traceRegisterUserEvent("Preprocess past message",26);
324         traceRegisterUserEvent("Preprocess future message",27);
325         traceRegisterUserEvent("Checkpoint",28);
326         traceRegisterUserEvent("Checkpoint Store",29);
327         traceRegisterUserEvent("Checkpoint Ack",30);
328         traceRegisterUserEvent("Send Ticket Request",31);
329         traceRegisterUserEvent("Generalticketrequest1",32);
330         traceRegisterUserEvent("TicketLogLocal",33);
331         traceRegisterUserEvent("next_ticket and SN",34);
332         traceRegisterUserEvent("Timeout for buffered remote messages",35);
333         traceRegisterUserEvent("Timeout for buffered local messages",36);
334         traceRegisterUserEvent("Inform Location Home",37);
335         traceRegisterUserEvent("Receive Location Handler",38);
336         
337         lastCompletedAlarm=CmiWallTimer();
338         lastRestart = CmiWallTimer();
339
340 #if COLLECT_STATS_MSGS
341 #if COLLECT_STATS_MSGS_TOTAL
342         totalMsgsTarget = 0;
343         totalMsgsSize = 0.0;
344 #else
345         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
346         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
347         for(int i=0; i<CmiNumPes(); i++){
348                 numMsgsTarget[i] = 0;
349                 sizeMsgsTarget[i] = 0;
350         }
351 #endif
352 #endif
353 #if COLLECT_STATS_DETS
354         numPiggyDets = 0;
355         numDets = 0;
356         numDupDets = 0;
357 #endif
358 #if COLLECT_STATS_MEMORY
359         msgLogSize = 0;
360         bufferedDetsSize = 0;
361         storedDetsSize = 0;
362 #endif
363
364
365 }
366
367 void killLocal(void *_dummy,double curWallTime);        
368
369 void readKillFile(){
370         FILE *fp=fopen(killFile,"r");
371         if(!fp){
372                 return;
373         }
374         int proc;
375         double sec;
376         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
377                 if(proc == CkMyPe()){
378                         killTime = CmiWallTimer()+sec;
379                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
380                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
381                 }
382         }
383         fclose(fp);
384 }
385
386 /**
387  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
388  * We assume an exponential distribution for the mean-time-between-failures.
389  */
390 void readFaultFile(){
391         FILE *fp=fopen(faultFile,"r");
392         if(!fp){
393                 return;
394         }
395         int proc;
396         double sec;
397         fscanf(fp,"%d %lf",&proc,&sec);
398         faultMean = sec;
399         if(proc == CkMyPe()){
400                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
401                 CcdCallFnAfter(killLocal,NULL,sec*1000);
402         }
403         fclose(fp);
404 }
405
406 void killLocal(void *_dummy,double curWallTime){
407         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
408         if(CmiWallTimer()<killTime-1){
409                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
410         }else{  
411                 kill(getpid(),SIGKILL);
412         }
413 }
414
415 /*** Auxiliary Functions ***/
416
417 /**
418  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
419  * determinants needs to be extended.
420  */
421 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
422         Determinant *det, *auxDet;
423         char *aux;
424
425         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
426
427         // checking if we are overflowing the buffered determinants array
428         if(_indexBufferedDets >= _maxBufferedDets){
429                 aux = CpvAccess(_localDets);
430                 _maxBufferedDets *= 2;
431                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
432                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
433                 CmiFree(aux);
434         }
435
436         // adding the new determinant at the end
437         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
438         det->sender = sender;
439         det->receiver = receiver;
440         det->SN = SN;
441         det->TN = TN;
442         _numBufferedDets++;
443         _indexBufferedDets++;
444
445 #if COLLECT_STATS_MEMORY
446         bufferedDetsSize++;
447 #endif
448 #if COLLECT_STATS_DETS
449         numDets++;
450 #endif
451 }
452
453 /************************ Message logging methods ****************/
454
455 /**
456  * Sends a group message that might be a broadcast.
457  */
458 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
459         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
460                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
461                 void *origMsg = EnvToUsr(env);
462                 for(int i=0;i<CmiNumPes();i++){
463                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
464                                 void *copyMsg = CkCopyMsg(&origMsg);
465                                 envelope *copyEnv = UsrToEnv(copyMsg);
466                                 copyEnv->SN=0;
467                                 copyEnv->TN=0;
468                                 copyEnv->sender.type = TypeInvalid;
469                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
470                                 sendGroupMsg(copyEnv,i,_infoIdx);
471                         }
472                 }
473                 return;
474         }
475
476         // initializing values of envelope
477         env->SN=0;
478         env->TN=0;
479         env->sender.type = TypeInvalid;
480
481         CkObjID recver;
482         recver.type = TypeGroup;
483         recver.data.group.id = env->getGroupNum();
484         recver.data.group.onPE = destPE;
485         sendCommonMsg(recver,env,destPE,_infoIdx);
486 }
487
488 /**
489  * Sends a nodegroup message that might be a broadcast.
490  */
491 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
492         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
493                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
494                 void *origMsg = EnvToUsr(env);
495                 for(int i=0;i<CmiNumNodes();i++){
496                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
497                                 void *copyMsg = CkCopyMsg(&origMsg);
498                                 envelope *copyEnv = UsrToEnv(copyMsg);
499                                 copyEnv->SN=0;
500                                 copyEnv->TN=0;
501                                 copyEnv->sender.type = TypeInvalid;
502                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
503                         }
504                 }
505                 return;
506         }
507
508         // initializing values of envelope
509         env->SN=0;
510         env->TN=0;
511         env->sender.type = TypeInvalid;
512
513         CkObjID recver;
514         recver.type = TypeNodeGroup;
515         recver.data.group.id = env->getGroupNum();
516         recver.data.group.onPE = destNode;
517         sendCommonMsg(recver,env,destNode,_infoIdx);
518 }
519
520 /**
521  * Sends a message to an array element.
522  */
523 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
524         CkObjID recver;
525         recver.type = TypeArray;
526         recver.data.array.id = env->getsetArrayMgr();
527         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
528
529         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
530                 char recverString[100],senderString[100];
531                 
532                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
533         }
534
535         // initializing values of envelope
536         env->SN = 0;
537         env->TN = 0;
538
539         sendCommonMsg(recver,env,destPE,_infoIdx);
540 };
541
542 /**
543  * Sends a message to a singleton chare.
544  */
545 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
546         CkObjID recver;
547         recver.type = TypeChare;
548         recver.data.chare.id = *pCid;
549
550         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
551                 char recverString[100],senderString[100];
552                 
553                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
554         }
555
556         // initializing values of envelope
557         env->SN = 0;
558         env->TN = 0;
559
560         sendCommonMsg(recver,env,destPE,_infoIdx);
561 };
562
563 /**
564  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
565  */
566 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
567         envelope *env = _env;
568         MCount ticketNumber = 0;
569         int resend=0; //is it a resend
570         char recverName[100];
571         double _startTime=CkWallTimer();
572         
573         DEBUG_MEM(CmiMemoryCheck());
574
575         if(CpvAccess(_currentObj) == NULL){
576 //              CkAssert(0);
577                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
578                 generalCldEnqueue(destPE,env,_infoIdx);
579                 return;
580         }
581         
582         // setting message logging data in the envelope
583         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
584         if(env->sender.type == TypeInvalid){
585                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
586         }else{
587                 envelope *copyEnv = copyEnvelope(env);
588                 env = copyEnv;
589                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
590                 env->SN = 0;
591         }
592         
593         DEBUG_MEM(CmiMemoryCheck());
594
595         CkObjID &sender = env->sender;
596         env->recver = recver;
597
598         Chare *obj = (Chare *)env->sender.getObject();
599           
600         if(env->SN == 0){
601                 DEBUG_MEM(CmiMemoryCheck());
602                 env->SN = obj->mlogData->nextSN(recver);
603         }else{
604                 resend = 1;
605         }
606         
607         char senderString[100];
608 //      if(env->SN != 1){
609                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
610         //      CmiPrintStackTrace(0);
611 /*      }else{
612                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
613         }*/
614                 
615         MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
616 //      CkPackMessage(&(mEntry->env));
617 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
618         
619         _startTime = CkWallTimer();
620
621         // uses the proper ticketing mechanism for local, team and general messages
622         if(isLocal(destPE)){
623                 sendLocalMsg(mEntry);
624         }else{
625                 if((teamSize > 1) && isTeamLocal(destPE)){
626
627                         // look to see if this message already has a ticket in the team-table
628                         Chare *senderObj = (Chare *)sender.getObject();
629                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
630                         if(ticketRow != NULL){
631                                 Ticket ticket = ticketRow->get(env->SN);
632                                 if(ticket.TN != 0){
633                                         ticketNumber = ticket.TN;
634                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
635                                 }
636                         }
637                 }
638                 
639                 // sending the message
640                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
641                 
642         }
643 }
644
645 /**
646  * Determines if the message is local or not. A message is local if:
647  * 1) Both the destination and origin are the same PE.
648  */
649 inline bool isLocal(int destPE){
650         // both the destination and the origin are the same PE
651         if(destPE == CkMyPe())
652                 return true;
653
654         return false;
655 }
656
657 /**
658  * Determines if the message is group local or not. A message is group local if:
659  * 1) They belong to the same group in the group-based message logging.
660  */
661 inline bool isTeamLocal(int destPE){
662
663         // they belong to the same group
664         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
665                 return true;
666
667         return false;
668 }
669
670 /**
671  * Method that does the actual send by creating a ticket request filling it up and sending it.
672  */
673 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
674         DEBUG_NOW(char recverString[100]);
675         DEBUG_NOW(char senderString[100]);
676
677         int totalSize;
678         StoreDeterminantsHeader header;
679         int sizes[2];
680         char *ptrs[2];
681
682         envelope *env = entry->env;
683         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
684
685         // setting all the information
686         Chare *obj = (Chare *)entry->env->sender.getObject();
687         entry->env->recver = recver;
688         entry->env->SN = SN;
689         entry->env->TN = TN;
690         if(!resend){
691                 //TML: only stores message if either it goes to this processor or to a processor in a different group
692                 if(!isTeamLocal(entry->destPE)){
693                         obj->mlogData->addLogEntry(entry);
694 #if COLLECT_STATS_TEAM
695                         MLOGFT_totalMessages += 1.0;
696                         MLOGFT_totalLogSize += entry->env->getTotalsize();
697 #endif
698                 }else{
699                         // the message has to be deleted after it has been sent
700                         entry->env->freeMsg = true;
701                 }
702         }
703
704         // sending the determinants that causally affect this message
705         if(_numBufferedDets > 0){
706
707                 // modifying the actual number of determinants sent in message
708                 header.number = _numBufferedDets;
709                 header.index = _indexBufferedDets;
710                 header.phase = _phaseBufferedDets;
711                 header.PE = CmiMyPe();
712         
713                 // sending the message
714                 sizes[0] = sizeof(StoreDeterminantsHeader);
715                 sizes[1] = _numBufferedDets * sizeof(Determinant);
716                 ptrs[0] = (char *) &header;
717                 ptrs[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
718                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
719                 CmiSetHandler(&header, _storeDeterminantsHandlerIdx);
720                 CmiSyncVectorSend(destPE, 2, &sizes[0], &ptrs[0]);
721         }
722
723         // updating its message log entry
724         entry->indexBufDets = _indexBufferedDets;
725         entry->numBufDets = _numBufferedDets;
726
727         // sending the message
728         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
729
730         DEBUG_MEM(CmiMemoryCheck());
731 #if COLLECT_STATS_MSGS
732 #if COLLECT_STATS_MSGS_TOTAL
733         totalMsgsTarget++;
734         totalMsgsSize += (float)env->getTotalsize();
735 #else
736         numMsgsTarget[destPE]++;
737         sizeMsgsTarget[destPE] += env->getTotalsize();
738 #endif
739 #endif
740 #if COLLECT_STATS_DETS
741         numPiggyDets += _numBufferedDets;
742 #endif
743 #if COLLECT_STATS_MEMORY
744         msgLogSize += env->getTotalsize();
745 #endif
746 };
747
748
749 /**
750  * @brief Function to send a local message. It first gets a ticket and
751  * then enqueues the message. If we are recovering, then the message 
752  * is enqueued in a delay queue.
753  */
754 void sendLocalMsg(MlogEntry *entry){
755         DEBUG_PERF(double _startTime=CkWallTimer());
756         DEBUG_MEM(CmiMemoryCheck());
757         DEBUG(Chare *senderObj = (Chare *)entry->env->sender.getObject();)
758         DEBUG(char senderString[100]);
759         DEBUG(char recverString[100]);
760         Ticket ticket;
761
762         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
763
764         // getting the receiver local object
765         Chare *recverObj = (Chare *)entry->env->recver.getObject();
766
767         // if receiver object is not NULL, we will ask it for a ticket
768         if(recverObj){
769
770 /*HERE          // if we are recovery, then we must put this off for later
771                 if(recverObj->mlogData->restartFlag){
772                         CpvAccess(_delayedLocalMsgs)->enq(entry);
773                         return;
774                 }
775
776                 // check if this combination of sender and SN has been already a ticket
777                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
778                         
779                 // assigned a new ticket in case this message is completely new
780                 if(ticket.TN == 0){
781                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
782
783                         // if TN == 0 we enqueue this message since we are recovering from a crash
784                         if(ticket.TN == 0){
785                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
786                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
787                                 
788 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
789                                 return;
790                         }
791                 }
792
793                 //TODO: check for the case when an invalid ticket is returned
794                 //TODO: check for OLD or RECEIVED TICKETS
795                 entry->env->TN = ticket.TN;
796                 CkAssert(entry->env->TN > 0);
797                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
798         
799                 DEBUG_MEM(CmiMemoryCheck());
800
801                 // adding the determinant to _localDets
802                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
803 */
804                 // sends the local message
805                 _skipCldEnqueue(CmiMyPe(),entry->env,entry->_infoIdx);  
806
807                 DEBUG_MEM(CmiMemoryCheck());
808         }else{
809                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
810         }
811 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
812 };
813
814 /****
815         The handler functions
816 *****/
817
818 /*** CAUSAL PROTOCOL HANDLERS ***/
819
820 /**
821  * @brief Removes the determinants after a particular index in the _localDets array.
822  */
823 void _removeDeterminantsHandler(char *buffer){
824         RemoveDeterminantsHeader *header;
825         int index, phase;
826
827         // getting the header from the message
828         header = (RemoveDeterminantsHeader *)buffer;
829         index = header->index;
830         phase = header->phase;
831
832         // fprintf(stderr,"ACK index=%d\n",index);
833
834         // updating the number of buffered determinants
835         if(phase == _phaseBufferedDets){
836                 if(index > _indexBufferedDets)
837                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
838                 CmiAssert(index <= _indexBufferedDets);
839                 _numBufferedDets = _indexBufferedDets - index;
840         }
841
842         // releasing memory
843         CmiFree(buffer);
844
845 }
846
847 /**
848  * @brief Stores the determinants coming from other processor.
849  */
850 void _storeDeterminantsHandler(char *buffer){
851         StoreDeterminantsHeader *header;
852         RemoveDeterminantsHeader removeHeader;
853         Determinant *detPtr, *det;
854         int i, n, index, phase, destPE;
855         CkVec<Determinant> *vec;
856
857
858         // getting the header from the message and pointing to the first determinant
859         header = (StoreDeterminantsHeader *)buffer;
860         n = header->number;
861         index = header->index;
862         phase = header->phase;
863         destPE = header->PE;
864         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
865
866         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
867         DEBUG_MEM(CmiMemoryCheck());
868
869         // going through all determinants and storing them into _remoteDets
870         for(i = 0; i < n; i++){
871                 det = new Determinant();
872                 det->sender = detPtr->sender;
873                 det->receiver = detPtr->receiver;
874                 det->SN = detPtr->SN;
875                 det->TN = detPtr->TN;
876
877                 // getting the determinant array
878                 vec = CpvAccess(_remoteDets)->get(det->receiver);
879                 if(vec == NULL){
880                         vec = new CkVec<Determinant>();
881                         CpvAccess(_remoteDets)->put(det->receiver) = vec;
882                 }
883 #if COLLECT_STATS_DETS
884 #if COLLECT_STATS_DETS_DUP
885                 for(int j=0; j<vec->size(); j++){
886                         if(isSameDet(&(*vec)[j],det)){
887                                 numDupDets++;
888                                 break;
889                         }
890                 }
891 #endif
892 #endif
893 #if COLLECT_STATS_MEMORY
894                 storedDetsSize++;
895 #endif
896                 vec->push_back(*det);
897                 detPtr = detPtr++;
898         }
899
900         DEBUG_MEM(CmiMemoryCheck());
901
902         // freeing buffer
903         CmiFree(buffer);
904
905         // sending the ACK back to the sender
906         removeHeader.index = index;
907         removeHeader.phase = phase;
908         CmiSetHandler(&removeHeader,_removeDeterminantsHandlerIdx);
909         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)&removeHeader);
910         
911 }
912
913 /**
914  *  If there are any delayed requests, process them first before 
915  *  processing this request
916  * */
917 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
918         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
919         double  _startTime = CkWallTimer();
920         CmiFree(ticketRequest);
921 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
922 }
923
924
925 /**
926  * @brief Gets a ticket for a recently received message.
927  * @pre env->recver has to be on this processor.
928  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
929  */
930 inline bool _getTicket(envelope *env, int *flag){
931         DEBUG_MEM(CmiMemoryCheck());
932         DEBUG(char recverName[100]);
933         DEBUG(char senderString[100]);
934         Ticket ticket;
935         
936         // getting information from request
937         CkObjID sender = env->sender;
938         CkObjID recver = env->recver;
939         MCount SN = env->SN;
940         MCount TN = env->TN;
941         Chare *recverObj = (Chare *)recver.getObject();
942         
943         DEBUG(recver.toString(recverName);)
944
945         // verifying whether we are recovering from a failure or not
946         if(recverObj->mlogData->restartFlag)
947                 return false;
948
949         // checking ticket table to see if this message already received a ticket
950         ticket = recverObj->mlogData->getTicket(sender, SN);
951         TN = ticket.TN;
952         
953         // checking if the message is team local and if it has a ticket already assigned
954         if(teamSize > 1 && TN != 0){
955                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
956                 recverObj->mlogData->verifyTicket(sender,SN,TN);
957         }
958
959         //check if a ticket for this has been already handed out to an object that used to be local but 
960         // is no longer so.. need for parallel restart
961         if(TN == 0){
962                 ticket = recverObj->mlogData->next_ticket(sender,SN);
963                 *flag = NEW_TICKET;
964         } else {
965                 *flag = OLD_TICKET;
966         }
967         if(ticket.TN > recverObj->mlogData->tProcessed){
968                 ticket.state = NEW_TICKET;
969         } else {
970                 ticket.state = OLD_TICKET;
971         }
972         // @todo: check for the case when an invalid ticket is returned
973         if(ticket.TN == 0){
974                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
975                 return false;
976         }
977                 
978         // setting the ticket number in the envelope
979         env->TN = ticket.TN;
980         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
981         return true;
982
983 };
984
985 bool fault_aware(CkObjID &recver){
986         switch(recver.type){
987                 case TypeChare:
988                         return true;
989                 case TypeMainChare:
990                         return false;
991                 case TypeGroup:
992                 case TypeNodeGroup:
993                 case TypeArray:
994                         return true;
995                 default:
996                         return false;
997         }
998 };
999
1000 /* Preprocesses a received message */
1001 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1002         DEBUG_NOW(char recverString[100]);
1003         DEBUG_NOW(char senderString[100]);
1004         DEBUG_MEM(CmiMemoryCheck());
1005         int flag;
1006         bool ticketSuccess;
1007
1008         // getting the receiver object
1009         CkObjID recver = env->recver;
1010         if(!fault_aware(recver)){
1011                 return 1;
1012                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1013         }
1014
1015         Chare *obj = (Chare *)recver.getObject();
1016         *objPointer = obj;
1017         if(obj == NULL){
1018                 int possiblePE = recver.guessPE();
1019                 if(possiblePE != CkMyPe()){
1020                         int totalSize = env->getTotalsize();
1021                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1022                         
1023                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1024                 }else{
1025                         // this is the case where a message is received and the object has not been initialized
1026                         // we delayed the delivery of the message
1027                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1028                         
1029                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1030                 }
1031                 return 0;
1032         }
1033
1034         // checking if message comes from an old incarnation
1035         // message must be discarded
1036         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1037                 CmiFree(env);
1038                 return 0;
1039         }
1040
1041         DEBUG_MEM(CmiMemoryCheck());
1042         DEBUG_PE(0,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1043
1044         // getting a ticket for this message
1045         ticketSuccess = _getTicket(env,&flag);
1046
1047         // we might be during recovery when this message arrives
1048         if(!ticketSuccess){
1049                 
1050                 // adding the message to a delay queue
1051                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1052                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1053
1054                 return 0;
1055         }
1056         
1057         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1058         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1059         
1060         if(flag == NEW_TICKET){
1061                 // storing determinant of message in data structure
1062                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1063         }
1064
1065         DEBUG_MEM(CmiMemoryCheck());
1066
1067         double _startTime = CkWallTimer();
1068 //env->sender.updatePosition(env->getSrcPe());
1069         if(env->TN == obj->mlogData->tProcessed+1){
1070                 //the message that needs to be processed now
1071                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1072                 // once we find a message that we can process we put back all the messages in the out of order queue
1073                 // back into the main scheduler queue. 
1074         DEBUG_MEM(CmiMemoryCheck());
1075                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1076                         void *qMsgPtr;
1077                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1078                         envelope *qEnv = (envelope *)qMsgPtr;
1079                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1080         DEBUG_MEM(CmiMemoryCheck());
1081                 }
1082 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1083                 //TODO: this might be a problem.. change made for leanMD
1084 //              CpvAccess(_currentObj) = obj;
1085         DEBUG_MEM(CmiMemoryCheck());
1086                 return 1;
1087         }
1088
1089         // checking if message has already been processed
1090         // message must be discarded
1091         if(env->TN <= obj->mlogData->tProcessed){
1092                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1093                 
1094                 CmiFree(env);
1095                 return 0;
1096         }
1097         //message that needs to be processed in the future
1098
1099         DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1100         //the message cant be processed now put it back in the out of order message Q, 
1101         //It will be transferred to the main queue later
1102         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1103 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1104         DEBUG_MEM(CmiMemoryCheck());
1105         
1106         return 0;
1107 }
1108
1109 /**
1110  * @brief Updates a few variables once a message has been processed.
1111  */
1112 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1113         DEBUG(char senderString[100]);
1114         if(obj){
1115                 if(sender.guessPE() == CkMyPe()){
1116                         if(entry != NULL){
1117                                 entry->env = NULL;
1118                         }
1119                 }
1120                 obj->mlogData->tProcessed++;
1121 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1122                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1123 //              CpvAccess(_currentObj)= NULL;
1124         }
1125         DEBUG_MEM(CmiMemoryCheck());
1126 }
1127
1128 /***
1129         Helpers for the handlers and message logging methods
1130 ***/
1131
1132 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1133 //      double _startTime = CkWallTimer();
1134         if(env->recver.type != TypeNodeGroup){
1135         //This repeats a step performed in skipCldEnq for messages sent to
1136         //other processors. I do this here so that messages to local processors
1137         //undergo the same transformation.. It lets the resend be uniform for 
1138         //all messages
1139 //              CmiSetXHandler(env,CmiGetHandler(env));
1140                 _skipCldEnqueue(destPE,env,_infoIdx);
1141         }else{
1142                 _noCldNodeEnqueue(destPE,env);
1143         }
1144 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1145 }
1146 //extern "C" int CmiGetNonLocalLength();
/**
 * Global initialised to 0. The comment that used to sit here described a
 * routine retrying ticket requests that had been queued up earlier; that
 * routine is not in this part of the file.
 * NOTE(review): presumably this is a flag/counter of that retry logic --
 * confirm whether anything still reads or writes it.
 */
int calledRetryTicketRequest = 0;
1152
1153 void _pingHandler(CkPingMsg *msg){
1154         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1155         CmiFree(msg);
1156 }
1157
1158
1159 /*****************************************************************************
1160         Checkpointing methods..
1161                 Pack all the data on a processor and send it to the buddy periodically
1162                 Also used to throw away message logs
1163 *****************************************************************************/
1164 CkVec<TProcessedLog> processedTicketLog;
1165 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1166 void clearUpMigratedRetainedLists(int PE);
1167
1168 void checkpointAlarm(void *_dummy,double curWallTime){
1169         double diff = curWallTime-lastCompletedAlarm;
1170         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1171 /*      if(CkWallTimer()-lastRestart < 50){
1172                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1173                 return;
1174         }*/
1175         if(diff < ((chkptPeriod) - 2)){
1176                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1177                 return;
1178         }
1179         CheckpointRequest request;
1180         request.PE = CkMyPe();
1181         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1182         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1183 };
1184
1185 void _checkpointRequestHandler(CheckpointRequest *request){
1186         startMlogCheckpoint(NULL,CmiWallTimer());
1187 }
1188
1189 /**
1190  * @brief Starts the checkpoint phase after migration.
1191  */
1192 void startMlogCheckpoint(void *_dummy, double curWallTime){
1193         double _startTime = CkWallTimer();
1194
1195         // increasing the checkpoint counter
1196         checkpointCount++;
1197         
1198 #if DEBUG_CHECKPOINT
1199         if(CmiMyPe() == 0){
1200                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1201         }
1202 #endif
1203
1204         DEBUG_MEM(CmiMemoryCheck());
1205
1206         PUP::sizer psizer;
1207         psizer | checkpointCount;
1208         for(int i=0; i<CmiNumPes(); i++){
1209                 psizer | CpvAccess(_incarnation)[i];
1210         }
1211         CkPupROData(psizer);
1212         DEBUG_MEM(CmiMemoryCheck());
1213         CkPupGroupData(psizer,CmiTrue);
1214         DEBUG_MEM(CmiMemoryCheck());
1215         CkPupNodeGroupData(psizer,CmiTrue);
1216         DEBUG_MEM(CmiMemoryCheck());
1217         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1218         DEBUG_MEM(CmiMemoryCheck());
1219
1220         int dataSize = psizer.size();
1221         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1222         char *msg = (char *)CmiAlloc(totalSize);
1223         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1224         chkMsg->PE = CkMyPe();
1225         chkMsg->dataSize = dataSize;
1226         
1227         char *buf = &msg[sizeof(CheckPointDataMsg)];
1228         PUP::toMem pBuf(buf);
1229
1230         pBuf | checkpointCount;
1231         for(int i=0; i<CmiNumPes(); i++){
1232                 pBuf | CpvAccess(_incarnation)[i];
1233         }
1234         CkPupROData(pBuf);
1235         CkPupGroupData(pBuf,CmiTrue);
1236         CkPupNodeGroupData(pBuf,CmiTrue);
1237         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1238
1239         unAckedCheckpoint=1;
1240         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1241         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1242         
1243         /*
1244                 Store the highest Ticket number processed for each chare on this processor
1245         */
1246         processedTicketLog.removeAll();
1247         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1248
1249 #if DEBUG_CHECKPOINT
1250         if(CmiMyPe() == 0){
1251                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1252         }
1253 #endif
1254
1255 #if COLLECT_STATS_MEMORY
1256         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1257         msgLogSize = 0;
1258         bufferedDetsSize = 0;
1259         storedDetsSize = 0;
1260 #endif
1261
1262         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1263                 lastCompletedAlarm = curWallTime;
1264                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1265         }
1266         traceUserBracketEvent(28,_startTime,CkWallTimer());
1267 };
1268
1269 /**
1270  * @brief A chare adds the latest ticket number processed.
1271  */
1272 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1273         DEBUG(char objString[100]);
1274
1275         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1276         TProcessedLog logEntry;
1277         logEntry.recver = mlogData->objID;
1278         logEntry.tProcessed = mlogData->tProcessed;
1279         log->push_back(logEntry);
1280
1281         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1282 }
1283
1284 class ElementPacker : public CkLocIterator {
1285 private:
1286         CkLocMgr *locMgr;
1287         PUP::er &p;
1288 public:
1289         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1290         void addLocation(CkLocation &loc) {
1291                 CkArrayIndexMax idx=loc.getIndex();
1292                 CkGroupID gID = locMgr->ckGetGroupID();
1293                 p|gID;      // store loc mgr's GID as well for easier restore
1294                 p|idx;
1295                 p|loc;
1296         }
1297 };
1298
1299 /**
1300  * Pups all the array elements in this processor.
1301  */
1302 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1303         int numElements,i;
1304         int numGroups = CkpvAccess(_groupIDTable)->size();      
1305         if(!p.isUnpacking()){
1306                 numElements = CkCountArrayElements();
1307         }       
1308         p | numElements;
1309         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1310         if(!p.isUnpacking()){
1311                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1312         }else{
1313                 //Flush all recs of all LocMgrs before putting in new elements
1314 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1315                 for(int j=0;j<listsize;j++){
1316                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1317                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1318                                 listToSkip[j].idx.print();
1319                         }
1320                 }
1321                 
1322                 printf("numElements = %d\n",numElements);
1323         
1324                 for (int i=0; i<numElements; i++) {
1325                         CkGroupID gID;
1326                         CkArrayIndexMax idx;
1327                         p|gID;
1328                 p|idx;
1329                         int flag=0;
1330                         int matchedIdx=0;
1331                         for(int j=0;j<listsize;j++){
1332                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1333                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1334                                                 matchedIdx = j;
1335                                                 flag = 1;
1336                                                 break;
1337                                         }
1338                                 }
1339                         }
1340                         if(flag == 1){
1341                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1342                         }else{
1343                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1344                         }
1345                                 
1346                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1347                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1348                         mgr->resume(idx,p,create,flag);
1349                         if(flag == 1){
1350                                 int homePE = mgr->homePe(idx);
1351                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1352                         }
1353                 }
1354         }
1355 };
1356
1357
1358 void writeCheckpointToDisk(int size,char *chkpt){
1359         char fNameTemp[100];
1360         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1361         int fd = creat(fNameTemp,S_IRWXU);
1362         int ret = write(fd,chkpt,size);
1363         CkAssert(ret == size);
1364         close(fd);
1365         
1366         char fName[100];
1367         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1368         unlink(fName);
1369
1370         rename(fNameTemp,fName);
1371 }
1372
1373 //handler that receives the checkpoint from a processor
1374 //it stores it and acks it
1375 void _storeCheckpointHandler(char *msg){
1376         double _startTime=CkWallTimer();
1377                 
1378         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1379         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1380         
1381         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1382         
1383         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1384         if(oldChkpt != NULL){
1385                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1386                 CmiFree(oldmsg);
1387         }
1388         //turning off storing checkpoints
1389         
1390         int sendingPE = chkMsg->PE;
1391         
1392         CpvAccess(_storedCheckpointData)->buf = chkpt;
1393         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1394         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1395
1396         int count=0;
1397         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1398                 if(migratedNoticeList[j].fromPE == sendingPE){
1399                         migratedNoticeList[j].ackFrom = 1;
1400                 }else{
1401                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1402                 }
1403                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1404                         migratedNoticeList.remove(j);
1405                         count++;
1406                 }
1407                 
1408         }
1409         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1410         
1411         CheckPointAck ackMsg;
1412         ackMsg.PE = CkMyPe();
1413         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1414         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1415         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1416         
1417         traceUserBracketEvent(29,_startTime,CkWallTimer());
1418 };
1419
1420
1421 /**
1422  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1423  * @note The remove log request message looks like
1424                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1425  */
1426 void sendRemoveLogRequests(){
1427 #if SYNCHRONIZED_CHECKPOINT
1428         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1429 #endif
1430         double _startTime = CkWallTimer();      
1431
1432         // computing total message size
1433         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1434         char *requestMsg = (char *)CmiAlloc(totalSize);
1435
1436         // filling up the message
1437         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1438         request->PE = CkMyPe();
1439         request->numberObjects = processedTicketLog.size();
1440         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1441         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1442         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1443         int *numDeterminants = (int *)listDeterminants;
1444         numDeterminants[0] = 0; 
1445         listDeterminants = (char *)&numDeterminants[1];
1446
1447         // setting the handler in the message
1448         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1449         
1450         DEBUG_MEM(CmiMemoryCheck());
1451         for(int i=0;i<CkNumPes();i++){
1452                 CmiSyncSend(i,totalSize,requestMsg);
1453         }
1454         CmiFree(requestMsg);
1455
1456         clearUpMigratedRetainedLists(CmiMyPe());
1457         //TODO: clear ticketTable
1458         
1459         traceUserBracketEvent(30,_startTime,CkWallTimer());
1460
1461         DEBUG_MEM(CmiMemoryCheck());
1462 }
1463
1464
1465 void _checkpointAckHandler(CheckPointAck *ackMsg){
1466         DEBUG_MEM(CmiMemoryCheck());
1467         unAckedCheckpoint=0;
1468         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1469         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1470         if(onGoingLoadBalancing){
1471                 onGoingLoadBalancing = 0;
1472                 finishedCheckpointLoadBalancing();
1473         }else{
1474                 sendRemoveLogRequests();
1475         }
1476         CmiFree(ackMsg);
1477         
1478 };
1479
1480
1481 /**
1482  * @brief Inserts all the determinants into a hash table.
1483  */
1484 inline void populateDeterminantTable(char *data){
1485         DEBUG(char recverString[100]);
1486         DEBUG(char senderString[100]);
1487         int numDets, *numDetsPtr;
1488         Determinant *detList;
1489         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1490         SNToTicket *tickets;
1491         Ticket ticket;
1492
1493         RemoveLogRequest *request = (RemoveLogRequest *)data;
1494         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1495         
1496         numDetsPtr = (int *)&list[request->numberObjects];
1497         numDets = numDetsPtr[0];
1498         detList = (Determinant *)&numDetsPtr[1];
1499
1500         // inserting determinants into hashtable
1501         for(int i=0; i<numDets; i++){
1502                 table = detTable.get(detList[i].sender);
1503                 if(table == NULL){
1504                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1505                         detTable.put(detList[i].sender) = table;
1506                 }
1507                 tickets = table->get(detList[i].receiver);
1508                 if(tickets == NULL){
1509                         tickets = new SNToTicket();
1510                         table->put(detList[i].receiver) = tickets; 
1511                 }
1512                 ticket.TN = detList[i].TN;
1513                 tickets->put(detList[i].SN) = ticket;
1514         }
1515
1516         DEBUG_MEM(CmiMemoryCheck());    
1517
1518 }
1519
1520 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1521         int total;
1522         DEBUG(char nameString[100]);
1523         DEBUG_MEM(CmiMemoryCheck());
1524         CmiMemoryCheck();
1525         char *data = (char *)_data;
1526         RemoveLogRequest *request = (RemoveLogRequest *)data;
1527         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1528         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1529         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1530         SNToTicket *tickets;
1531         MCount TN;
1532
1533         int count=0;
1534         total = mlog->length();
1535         for(int i=0; i<total; i++){
1536                 MlogEntry *logEntry = mlog->deq();
1537
1538                 // looking message in determinant table
1539                 table = detTable.get(logEntry->env->sender);
1540                 if(table != NULL){
1541                         tickets = table->get(logEntry->env->recver);
1542                         if(tickets != NULL){
1543                                 TN = tickets->get(logEntry->env->SN).TN;
1544                                 if(TN != 0){
1545                                         logEntry->env->TN = TN;
1546                                 }
1547                         }
1548                 }
1549         
1550                 int match=0;
1551                 for(int j=0;j<request->numberObjects;j++){
1552                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1553                                 //this log Entry should be removed
1554                                 match = 1;
1555                                 break;
1556                         }
1557                 }
1558                 char senderString[100],recverString[100];
1559 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1560                 if(match){
1561                         count++;
1562                         delete logEntry;
1563                 }else{
1564                         mlog->enq(logEntry);
1565                 }
1566         }
1567         if(count > 0){
1568                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1569         }
1570         DEBUG_MEM(CmiMemoryCheck());
1571         CmiMemoryCheck();
1572 }
1573
1574 /**
1575  * @brief Removes messages in the log according to the received ticket numbers
1576  */
1577 void _removeProcessedLogHandler(char *requestMsg){
1578         double start = CkWallTimer();
1579
1580         // building map for determinants
1581         populateDeterminantTable(requestMsg);
1582
1583         // removing processed messages from the message log
1584         forAllCharesDo(removeProcessedLogs,requestMsg);
1585
1586         // @todo: clean determinant table 
1587         
1588         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1589         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1590         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1591         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1592         /*
1593                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1594         */
1595         DEBUG_MEM(CmiMemoryCheck());
1596         clearUpMigratedRetainedLists(request->PE);
1597         
1598         traceUserBracketEvent(20,start,CkWallTimer());
1599         CmiFree(requestMsg);    
1600 };
1601
1602
1603 void clearUpMigratedRetainedLists(int PE){
1604         int count=0;
1605         CmiMemoryCheck();
1606         
1607         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1608                 if(migratedNoticeList[j].toPE == PE){
1609                         migratedNoticeList[j].ackTo = 1;
1610                 }
1611                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1612                         migratedNoticeList.remove(j);
1613                         count++;
1614                 }
1615         }
1616         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1617         
1618         for(int j=retainedObjectList.size()-1;j>=0;j--){
1619                 if(retainedObjectList[j]->migRecord.toPE == PE){
1620                         RetainedMigratedObject *obj = retainedObjectList[j];
1621                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1622                         retainedObjectList.remove(j);
1623                         if(obj->msg != NULL){
1624                                 CmiMemoryCheck();
1625                                 CmiFree(obj->msg);
1626                         }
1627                         delete obj;
1628                 }
1629         }
1630 }
1631
1632 /***************************************************************
1633         Restart Methods and handlers
1634 ***************************************************************/        
1635
1636 /**
1637  * Function for restarting the crashed processor.
1638  * It sets the restart flag and contacts the buddy
1639  * processor to get the latest checkpoint.
1640  */
1641 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1642         RestartRequest msg;
1643
1644         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1645
1646         // setting the restart flag
1647         _restartFlag = 1;
1648         _numRestartResponses = 0;
1649
1650         // if we are using team-based message logging, all members of the group have to be restarted
1651         if(teamSize > 1){
1652                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1653                         if(i != CkMyPe() && i < CkNumPes()){
1654                                 // sending a message to the team member
1655                                 msg.PE = CkMyPe();
1656                             CmiSetHandler(&msg,_restartHandlerIdx);
1657                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1658                         }
1659                 }
1660         }
1661
1662         // requesting the latest checkpoint from its buddy
1663         msg.PE = CkMyPe();
1664         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1665         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1666 };
1667
1668 /**
1669  * Function to restart this processor.
1670  * The handler is invoked by a member of its same team in message logging.
1671  */
1672 void _restartHandler(RestartRequest *restartMsg){
1673         int i;
1674         int numGroups = CkpvAccess(_groupIDTable)->size();
1675         RestartRequest msg;
1676         
1677         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1678
1679     // setting the restart flag
1680         _restartFlag = 1;
1681         _numRestartResponses = 0;
1682
1683         // flushing all buffers
1684         //TEST END
1685 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1686         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1687         for(int i=0;i<numGroups;i++){
1688         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1689                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1690                 obj->flushStates();
1691                 obj->ckJustMigrated();
1692         }*/
1693
1694     // requesting the latest checkpoint from its buddy
1695         msg.PE = CkMyPe();
1696         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1697         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1698 }
1699
1700
1701 /**
1702  * Gets the stored checkpoint but calls another function in the sender.
1703  */
1704 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1705
1706         // retrieving the stored checkpoint
1707         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1708
1709         // making sure it is its buddy who is requesting the checkpoint
1710         CkAssert(restartMsg->PE == storedChkpt->PE);
1711
1712         storedRequest = restartMsg;
1713         verifyAckTotal = 0;
1714
1715         for(int i=0;i<migratedNoticeList.size();i++){
1716                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1717 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1718                         if(migratedNoticeList[i].ackFrom == 0){
1719                                 //need to verify if the object actually exists .. it might not
1720                                 //have been acked but it might exist on it
1721                                 VerifyAckMsg msg;
1722                                 msg.migRecord = migratedNoticeList[i];
1723                                 msg.index = i;
1724                                 msg.fromPE = CmiMyPe();
1725                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1726                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1727                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1728                                 verifyAckTotal++;
1729                         }
1730                 }
1731         }
1732
1733         // sending the checkpoint back to its buddy     
1734         if(verifyAckTotal == 0){
1735                 sendCheckpointData(MLOG_RESTARTED);
1736         }
1737         verifyAckCount = 0;
1738 }
1739
1740 /**
1741  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1742  */
1743 void _recvRestartCheckpointHandler(char *_restartData){
1744         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1745         MigrationRecord *migratedAwayElements;
1746
1747         globalLBID = restartData->lbGroupID;
1748         
1749         restartData->restartWallTime *= 1000;
1750         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1751         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1752         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1753
1754         
1755         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1756         char *buf = &_restartData[sizeof(RestartProcessorData)];
1757         
1758         if(restartData->numMigratedAwayElements != 0){
1759                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1760                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1761                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1762                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1763         }
1764
1765         // turning on the team recovery flag
1766         forAllCharesDo(setTeamRecovery,NULL);
1767         
1768         PUP::fromMem pBuf(buf);
1769         pBuf | checkpointCount;
1770         for(int i=0; i<CmiNumPes(); i++){
1771                 pBuf | CpvAccess(_incarnation)[i];
1772         }
1773         CkPupROData(pBuf);
1774         CkPupGroupData(pBuf,CmiFalse);
1775         CkPupNodeGroupData(pBuf,CmiFalse);
1776         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1777         CkAssert(pBuf.size() == restartData->checkPointSize);
1778         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1779
1780         // increasing the incarnation number of all the team members
1781         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1782                 CpvAccess(_incarnation)[i]++;
1783         }
1784         
1785         // turning off the team recovery flag
1786         forAllCharesDo(unsetTeamRecovery,NULL);
1787
1788         // initializing a few variables for handling local messages
1789         forAllCharesDo(initializeRestart,NULL);
1790         
1791         CmiFree(_restartData);  
1792
1793         /*HERE _initDone();
1794
1795         getGlobalStep(globalLBID);
1796         
1797         countUpdateHomeAcks = 0;
1798         RestartRequest updateHomeRequest;
1799         updateHomeRequest.PE = CmiMyPe();
1800         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1801         for(int i=0;i<CmiNumPes();i++){
1802                 if(i != CmiMyPe()){
1803                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1804                 }
1805         }
1806 */
1807
1808
1809         // Send out the request to resend logged messages to all other processors
1810         CkVec<TProcessedLog> objectVec;
1811         forAllCharesDo(createObjIDList, (void *)&objectVec);
1812         int numberObjects = objectVec.size();
1813         
1814         /*
1815                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1816         */
1817         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1818         char *resendMsg = (char *)CmiAlloc(totalSize);  
1819
1820         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1821         resendReq->PE =CkMyPe(); 
1822         resendReq->numberObjects = numberObjects;
1823         char *objList = &resendMsg[sizeof(ResendRequest)];
1824         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1825         
1826
1827         /* test for parallel restart migrate away object**/
1828 //      if(fastRecovery){
1829 //              distributeRestartedObjects();
1830 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1831 //      }
1832         
1833         /*      To make restart work for load balancing.. should only
1834         be used when checkpoint happens along with load balancing
1835         **/
1836 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1837
1838         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1839         CpvAccess(_currentObj) = lb;
1840         lb->ReceiveDummyMigration(restartDecisionNumber);
1841
1842         sleep(10);
1843         
1844         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1845         for(int i=0;i<CkNumPes();i++){
1846                 if(i != CkMyPe()){
1847                         CmiSyncSend(i,totalSize,resendMsg);
1848                 }       
1849         }
1850         _resendMessagesHandler(resendMsg);
1851
1852 }
1853
1854
1855 void CkMlogRestartDouble(void *,double){
1856         CkMlogRestart(NULL,NULL);
1857 };
1858
1859 //TML: restarting from local (group) failure
1860 void CkMlogRestartLocal(){
1861     CkMlogRestart(NULL,NULL);
1862 };
1863
1864 /**
1865  * Gets the stored checkpoint for its buddy processor.
1866  */
1867 void _getCheckpointHandler(RestartRequest *restartMsg){
1868
1869         // retrieving the stored checkpoint
1870         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1871
1872         // making sure it is its buddy who is requesting the checkpoint
1873         CkAssert(restartMsg->PE == storedChkpt->PE);
1874
1875         storedRequest = restartMsg;
1876         verifyAckTotal = 0;
1877
1878         for(int i=0;i<migratedNoticeList.size();i++){
1879                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1880 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1881                         if(migratedNoticeList[i].ackFrom == 0){
1882                                 //need to verify if the object actually exists .. it might not
1883                                 //have been acked but it might exist on it
1884                                 VerifyAckMsg msg;
1885                                 msg.migRecord = migratedNoticeList[i];
1886                                 msg.index = i;
1887                                 msg.fromPE = CmiMyPe();
1888                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1889                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1890                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1891                                 verifyAckTotal++;
1892                         }
1893                 }
1894         }
1895
1896         // sending the checkpoint back to its buddy     
1897         if(verifyAckTotal == 0){
1898                 sendCheckpointData(MLOG_CRASHED);
1899         }
1900         verifyAckCount = 0;
1901 }
1902
1903
1904 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1905         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1906         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1907         if(rec != NULL && rec->type() == CkLocRec::local){
1908                         //this location exists on this processor
1909                         //and needs to be removed       
1910                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1911                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1912                         
1913                         int localIdx = localRec->getLocalIndex();
1914                         LBDatabase *lbdb = localRec->getLBDB();
1915                         LDObjHandle ldHandle = localRec->getLdHandle();
1916                                 
1917                         locMgr->setDuringMigration(true);
1918                         
1919                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1920                         lbdb->UnregisterObj(ldHandle);
1921                         
1922                         locMgr->setDuringMigration(false);
1923                         
1924                         verifyAckedRequests++;
1925
1926         }
1927         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1928         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1929 };
1930
1931
1932 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1933         int index =     verifyReply->index;
1934         migratedNoticeList[index] = verifyReply->migRecord;
1935         verifyAckCount++;
1936         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1937         if(verifyAckCount == verifyAckTotal){
1938                 sendCheckpointData(MLOG_CRASHED);
1939         }
1940 }
1941
1942
1943 /**
1944  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1945  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1946  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1947  */
1948 void sendCheckpointData(int mode){      
1949         RestartRequest *restartMsg = storedRequest;
1950         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1951         int numMigratedAwayElements = migratedNoticeList.size();
1952         if(migratedNoticeList.size() != 0){
1953                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1954 //                      CkAssert(migratedNoticeList.size() == 0);
1955         }
1956         
1957         
1958         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
1959         
1960         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
1961         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
1962         
1963         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
1964         
1965         char *msg = (char *)CmiAlloc(totalSize);
1966         
1967         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
1968         dataMsg->PE = CkMyPe();
1969         dataMsg->restartWallTime = CmiTimer();
1970         dataMsg->checkPointSize = storedChkpt->bufSize;
1971         
1972         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
1973 //      dataMsg->numMigratedAwayElements = 0;
1974         
1975         dataMsg->numMigratedInElements = 0;
1976         dataMsg->migratedElementSize = 0;
1977         dataMsg->lbGroupID = globalLBID;
1978         /*msg layout 
1979                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
1980                 Local MessageLog|
1981         */
1982         //store checkpoint data
1983         char *buf = &msg[sizeof(RestartProcessorData)];
1984
1985         if(dataMsg->numMigratedAwayElements != 0){
1986                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
1987                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
1988         }
1989         
1990
1991         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
1992         buf = &buf[storedChkpt->bufSize];
1993
1994         if(mode == MLOG_RESTARTED){
1995                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
1996                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
1997                 CmiFree(restartMsg);
1998         }else{
1999                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2000                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2001                 CmiFree(restartMsg);
2002         }
2003 };
2004
2005
2006 // this list is used to create a vector of the object ids of all
2007 //the chares on this processor currently and the highest TN processed by them 
2008 //the first argument is actually a CkVec<TProcessedLog> *
2009 void createObjIDList(void *data,ChareMlogData *mlogData){
2010         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2011         TProcessedLog entry;
2012         entry.recver = mlogData->objID;
2013         entry.tProcessed = mlogData->tProcessed;
2014         list->push_back(entry);
2015         DEBUG_TEAM(char objString[100]);
2016         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2017         DEBUG_RECOVERY(printLog(&entry));
2018 }
2019
2020
2021 /**
2022  * Receives the checkpoint data from its buddy, restores the state of all the objects
2023  * and asks everyone else to update its home.
2024  */
2025 void _recvCheckpointHandler(char *_restartData){
2026         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2027         MigrationRecord *migratedAwayElements;
2028
2029         globalLBID = restartData->lbGroupID;
2030         
2031         restartData->restartWallTime *= 1000;
2032         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2033         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2034         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2035
2036         
2037         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2038         char *buf = &_restartData[sizeof(RestartProcessorData)];
2039         
2040         if(restartData->numMigratedAwayElements != 0){
2041                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2042                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2043                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2044                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2045         }
2046         
2047         PUP::fromMem pBuf(buf);
2048
2049         pBuf | checkpointCount;
2050         for(int i=0; i<CmiNumPes(); i++){
2051                 pBuf | CpvAccess(_incarnation)[i];
2052         }
2053         CkPupROData(pBuf);
2054         CkPupGroupData(pBuf,CmiTrue);
2055         CkPupNodeGroupData(pBuf,CmiTrue);
2056         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2057         CkAssert(pBuf.size() == restartData->checkPointSize);
2058         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2059
2060         // increases the incarnation number
2061         CpvAccess(_incarnation)[CmiMyPe()]++;
2062         
2063         forAllCharesDo(initializeRestart,NULL);
2064         
2065         CmiFree(_restartData);
2066         
2067         _initDone();
2068
2069         getGlobalStep(globalLBID);
2070
2071         // sending request to send determinants
2072         _numRestartResponses = 0;
2073         
2074         // Send out the request to resend logged determinants to all other processors
2075         CkVec<TProcessedLog> objectVec;
2076         forAllCharesDo(createObjIDList, (void *)&objectVec);
2077         int numberObjects = objectVec.size();
2078         
2079         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2080         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2081         char *resendMsg = (char *)CmiAlloc(totalSize);  
2082
2083         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2084         resendReq->PE =CkMyPe(); 
2085         resendReq->numberObjects = numberObjects;
2086         char *objList = &resendMsg[sizeof(ResendRequest)];
2087         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2088
2089         CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
2090         for(int i=0;i<CkNumPes();i++){
2091                 if(i != CkMyPe()){
2092                         CmiSyncSend(i,totalSize,resendMsg);
2093                 }
2094         }
2095         CmiFree(resendMsg);
2096         
2097 }
2098
2099 /**
2100  * Receives the updateHome ACKs from all other processors. Once everybody
2101  * has replied, it sends a request to resend the logged messages.
2102  */
2103 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2104         countUpdateHomeAcks++;
2105         CmiFree(updateHomeAck);
2106         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2107         if(countUpdateHomeAcks != CmiNumPes()){
2108                 return;
2109         }
2110
2111         // Send out the request to resend logged messages to all other processors
2112         CkVec<TProcessedLog> objectVec;
2113         forAllCharesDo(createObjIDList, (void *)&objectVec);
2114         int numberObjects = objectVec.size();
2115         
2116         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2117         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2118         char *resendMsg = (char *)CmiAlloc(totalSize);  
2119
2120         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2121         resendReq->PE =CkMyPe(); 
2122         resendReq->numberObjects = numberObjects;
2123         char *objList = &resendMsg[sizeof(ResendRequest)];
2124         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2125
2126         /* test for parallel restart migrate away object**/
2127         if(fastRecovery){
2128                 distributeRestartedObjects();
2129                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2130         }
2131         
2132         /*      To make restart work for load balancing.. should only
2133         be used when checkpoint happens along with load balancing
2134         **/
2135 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2136
2137         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2138         CpvAccess(_currentObj) = lb;
2139         lb->ReceiveDummyMigration(restartDecisionNumber);
2140
2141 //HERE  sleep(10);
2142         
2143         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2144         for(int i=0;i<CkNumPes();i++){
2145                 if(i != CkMyPe()){
2146                         CmiSyncSend(i,totalSize,resendMsg);
2147                 }
2148         }
2149         _resendMessagesHandler(resendMsg);
2150         CmiFree(resendMsg);
2151
2152 };
2153
2154 /**
2155  * @brief Initializes variables and flags for restarting procedure.
2156  */
2157 void initializeRestart(void *data, ChareMlogData *mlogData){
2158         mlogData->resendReplyRecvd = 0;
2159         mlogData->receivedTNs = new CkVec<MCount>;
2160         mlogData->restartFlag = 1;
2161 };
2162
2163 /**
2164  * Updates the homePe of chare array elements.
2165  */
2166 void updateHomePE(void *data,ChareMlogData *mlogData){
2167         RestartRequest *updateRequest = (RestartRequest *)data;
2168         int PE = updateRequest->PE; //restarted PE
2169         //if this object is an array Element and its home is the restarted processor
2170         // the home processor needs to know its current location
2171         if(mlogData->objID.type == TypeArray){
2172                 //it is an array element
2173                 CkGroupID myGID = mlogData->objID.data.array.id;
2174                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2175                 CkArrayID aid(mlogData->objID.data.array.id);           
2176                 //check if the restarted processor is the home processor for this object
2177                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2178                 if(locMgr->homePe(myIdx) == PE){
2179                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2180                         DEBUG_RESTART(myIdx.print());
2181                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2182                 }
2183         }
2184 };
2185
2186
2187 /**
2188  * Updates the homePe for all chares in this processor.
2189  */
2190 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2191         int sender = updateRequest->PE;
2192         
2193         forAllCharesDo(updateHomePE,updateRequest);
2194         
2195         updateRequest->PE = CmiMyPe();
2196         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2197         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2198         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2199                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2200                 checkpointCount--;
2201                 startMlogCheckpoint(NULL,0);
2202         }
2203         if(sender == getCheckPointPE()){
2204                 for(int i=0;i<retainedObjectList.size();i++){
2205                         if(retainedObjectList[i]->acked == 0){
2206                                 MigrationNotice migMsg;
2207                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2208                                 migMsg.record = retainedObjectList[i];
2209                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2210                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2211                         }
2212                 }
2213         }
2214 }
2215
2216 /**
2217  * @brief Fills up the ticket vector for each chare.
2218  */
2219 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2220         DEBUG(char name[100]);
2221         ResendData *resendData = (ResendData *)data;
2222         int PE = resendData->PE; //restarted PE
2223         int count=0;
2224         CkHashtableIterator *iterator;
2225         void *objp;
2226         void *objkey;
2227         CkObjID *objID;
2228         SNToTicket *snToTicket;
2229         Ticket ticket;
2230         
2231         // traversing the team table looking up for the maximum TN received     
2232         iterator = mlogData->teamTable.iterator();
2233         while( (objp = iterator->next(&objkey)) != NULL ){
2234                 objID = (CkObjID *)objkey;
2235         
2236                 // traversing the resendData structure to add ticket numbers
2237                 for(int j=0;j<resendData->numberObjects;j++){
2238                         if((*objID) == (resendData->listObjects)[j].recver){
2239                                 snToTicket = *(SNToTicket **)objp;
2240 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2241                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2242                                         ticket = snToTicket->get(snIndex);      
2243                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2244                                                 //store the TNs that have been since the recver last checkpointed
2245                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2246                                         }
2247                                 }
2248                         }
2249                 }
2250         }
2251
2252         //releasing the memory for the iterator
2253         delete iterator;
2254 }
2255
2256
2257 /**
2258  * @brief Turns on the flag for team recovery that selectively restores
2259  * particular metadata information.
2260  */
2261 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2262         char name[100];
2263         mlogData->teamRecoveryFlag = 1; 
2264 }
2265
2266 /**
2267  * @brief Turns off the flag for team recovery.
2268  */
2269 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2270         mlogData->teamRecoveryFlag = 0;
2271 }
2272
2273 /**
2274  * Prints a processed log.
2275  */
2276 void printLog(TProcessedLog *log){
2277         char recverString[100];
2278         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2279 }
2280
2281 /**
2282  * Prints information about a message.
2283  */
2284 void printMsg(envelope *env, const char* par){
2285         char senderString[100];
2286         char recverString[100];
2287         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2288 }
2289
2290 /**
2291  * Prints information about a determinant.
2292  */
2293 void printDet(Determinant *det, const char* par){
2294         char senderString[100];
2295         char recverString[100];
2296         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2297 }
2298
2299 /**
2300  * @brief Resends all the logged messages to a particular chare list.
2301  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2302  * @param mlogData a particular chare living in this processor.
2303  */
2304 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2305         DEBUG_RESTART(char nameString[100]);
2306         DEBUG_RESTART(char recverString[100]);
2307         DEBUG_RESTART(char senderString[100]);
2308
2309         ResendData *resendData = (ResendData *)data;
2310         int PE = resendData->PE; //restarted PE
2311         int count=0;
2312         int ticketRequests=0;
2313         CkQ<MlogEntry *> *log = mlogData->getMlog();
2314
2315         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2316
2317         // traversing the message log to see if we must resend a message        
2318         for(int i=0;i<log->length();i++){
2319                 MlogEntry *logEntry = (*log)[i];
2320                 
2321                 // if we sent out the logs of a local message to buddy and it crashed
2322                 //before acknowledging 
2323                 envelope *env = logEntry->env;
2324                 if(env == NULL){
2325                         continue;
2326                 }
2327         
2328                 // resend if type is not invalid        
2329                 if(env->recver.type != TypeInvalid){
2330                         for(int j=0;j<resendData->numberObjects;j++){
2331                                 if(env->recver == (resendData->listObjects)[j].recver){
2332                                         if(PE != CkMyPe()){
2333                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2334                                                 if(env->recver.type == TypeNodeGroup){
2335                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2336                                                 }else{
2337                                                         CmiSetHandler(env,CmiGetXHandler(env));
2338                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2339                                                 }
2340                                         }else{
2341                                                 envelope *copyEnv = copyEnvelope(env);
2342                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2343                                         }
2344                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2345                                         count++;
2346                                 }
2347                         }//end of for loop of objects
2348                         
2349                 }       
2350         }
2351         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2352 }
2353
2354 /**
2355  * Send all remote determinants to a particular failed PE.
2356  * It only sends determinants to those objects on the list.
2357  */
2358 void _sendDetsHandler(char *msg){
2359         ResendData d;
2360         CkVec<Determinant> *detVec;
2361         ResendRequest *resendReq = (ResendRequest *)msg;
2362
2363         // CkPrintf("[%d] Sending determinants\n",CkMyPe());
2364
2365         // building the reply message
2366         char *listObjects = &msg[sizeof(ResendRequest)];
2367         d.numberObjects = resendReq->numberObjects;
2368         d.PE = resendReq->PE;
2369         d.listObjects = (TProcessedLog *)listObjects;
2370         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2371         detVec = new CkVec<Determinant>[d.numberObjects];
2372
2373         // adding the remote determinants to the resendReplyMsg
2374         // traversing all the remote determinants
2375         CkVec<Determinant> *vec;
2376         for(int i=0; i<d.numberObjects; i++){
2377                 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
2378                 if(vec != NULL){
2379                         for(int j=0; j<vec->size(); j++){
2380
2381                                 // only relevant determinants are to be sent
2382                                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
2383
2384                                         // adding the ticket in the ticket vector
2385                                         d.ticketVecs[i].push_back((*vec)[j].TN);
2386
2387                                         // adding the determinant in the determinant vector
2388                                         detVec[i].push_back((*vec)[j]);
2389
2390                                         DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
2391                                 }
2392                         }
2393                 }
2394         }
2395
2396         int totalDetStored = 0;
2397         for(int i=0;i<d.numberObjects;i++){
2398                 totalDetStored += detVec[i].size();
2399         }
2400
2401         //send back the maximum ticket number for a message sent to each object on the 
2402         //restarted processor
2403         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2404         
2405         int totalTNStored=0;
2406         for(int i=0;i<d.numberObjects;i++){
2407                 totalTNStored += d.ticketVecs[i].size();
2408         }
2409         
2410         int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
2411         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2412         
2413         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2414         resendReply->PE = CkMyPe();
2415         resendReply->numberObjects = d.numberObjects;
2416         
2417         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2418         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2419         for(int i=0;i<d.numberObjects;i++){
2420                 replyObjects[i] = d.listObjects[i].recver;
2421         }
2422         
2423         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2424         for(int i=0;i<d.numberObjects;i++){
2425                 int vecsize = d.ticketVecs[i].size();
2426                 memcpy(ticketList,&vecsize,sizeof(int));
2427                 ticketList = &ticketList[sizeof(int)];
2428                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2429                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2430         }
2431
2432         // adding the stored remote determinants to the message
2433         for(int i=0;i<d.numberObjects;i++){
2434                 int vecsize = detVec[i].size();
2435                 memcpy(ticketList,&vecsize,sizeof(int));
2436                 ticketList = &ticketList[sizeof(int)];
2437                 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
2438                 ticketList = &ticketList[sizeof(Determinant)*vecsize];
2439         }
2440
2441         CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
2442         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2443         
2444         delete [] detVec;
2445         delete [] d.ticketVecs;
2446
2447         DEBUG_MEM(CmiMemoryCheck());
2448
2449         if(resendReq->PE != CkMyPe()){
2450                 CmiFree(msg);
2451         }       
2452 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2453         lastRestart = CmiWallTimer();
2454
2455 }
2456
2457 /**
2458  * Resends messages since last checkpoint to the list of objects included in the 
2459  * request. It also sends stored remote determinants to the particular failed PE.
2460  */
2461 void _resendMessagesHandler(char *msg){
2462         ResendData d;
2463         ResendRequest *resendReq = (ResendRequest *)msg;
2464
2465         //CkPrintf("[%d] Resending messages\n",CkMyPe());
2466
2467         // building the reply message
2468         char *listObjects = &msg[sizeof(ResendRequest)];
2469         d.numberObjects = resendReq->numberObjects;
2470         d.PE = resendReq->PE;
2471         d.listObjects = (TProcessedLog *)listObjects;
2472         
2473         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2474
2475         //TML: examines the origin processor to determine if it belongs to the same group.
2476         // In that case, it only returns the maximum ticket received for each object in the list.
2477         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2478                 forAllCharesDo(fillTicketForChare,&d);
2479         else
2480                 forAllCharesDo(resendMessageForChare,&d);
2481
2482         DEBUG_MEM(CmiMemoryCheck());
2483
2484         if(resendReq->PE != CkMyPe()){
2485                 CmiFree(msg);
2486         }       
2487 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2488         lastRestart = CmiWallTimer();
2489 }
2490
2491 void sortVec(CkVec<MCount> *TNvec);
2492 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2493
2494 /**
2495  * @brief Processes the messages in the delayed remote message queue
2496  */
2497 void processDelayedRemoteMsgQueue(){
2498         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2499         
2500         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2501                 void *qMsgPtr;
2502                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2503                 envelope *qEnv = (envelope *)qMsgPtr;
2504                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2505                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2506                 DEBUG_MEM(CmiMemoryCheck());
2507         }
2508
2509 }
2510
2511 /**
2512  * @brief Receives determinants stored on remote nodes.
2513  * Message format: |Header|ObjID list|TN list|Determinant list|
2514  * TN list = |number of TNs|list of TNs|...|
2515  */
2516 void _sendDetsReplyHandler(char *msg){
2517         ResendRequest *resendReply = (ResendRequest *)msg;
2518         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2519         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2520         
2521 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2522         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2523         for(int i =0; i< resendReply->numberObjects;i++){
2524                 Chare *obj = (Chare *)listObjects[i].getObject();
2525                 
2526                 int vecsize;
2527                 memcpy(&vecsize,listTickets,sizeof(int));
2528                 listTickets = &listTickets[sizeof(int)];
2529                 MCount *listTNs = (MCount *)listTickets;
2530                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2531         
2532                 if(obj != NULL){
2533                         //the object was restarted on the processor on which it existed
2534                         processReceivedTN(obj,vecsize,listTNs);
2535                 }else{
2536                         //pack up objID vecsize and listTNs and send it to the correct processor
2537                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2538                         char *TNMsg = (char *)CmiAlloc(totalSize);
2539                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2540                         receivedTNData->recver = listObjects[i];
2541                         receivedTNData->numTNs = vecsize;
2542                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2543                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2544                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2545                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2546                 }
2547
2548         }
2549
2550         // traversing all the retrieved determinants
2551         for(int i = 0; i < resendReply->numberObjects; i++){
2552                 Chare *obj = (Chare *)listObjects[i].getObject();
2553                 
2554                 int vecsize;
2555                 memcpy(&vecsize,listTickets,sizeof(int));
2556                 listTickets = &listTickets[sizeof(int)];
2557                 Determinant *listDets = (Determinant *)listTickets;     
2558                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2559         
2560                 if(obj != NULL){
2561                         //the object was restarted on the processor on which it existed
2562                         processReceivedDet(obj,vecsize,listDets);
2563                 } else {
2564                         // pack the determinants and ship them to the other processor
2565                         // pack up objID vecsize and listDets and send it to the correct processor
2566                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2567                         char *detMsg = (char *)CmiAlloc(totalSize);
2568                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2569                         receivedDetData->recver = listObjects[i];
2570                         receivedDetData->numDets = vecsize;
2571                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2572                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2573                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2574                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2575                 }
2576
2577         }
2578
2579         // checking if we have received all replies
2580         _numRestartResponses++;
2581         if(_numRestartResponses != CkNumPes())
2582                 return;
2583         else 
2584                 _numRestartResponses = 0;
2585
2586         
2587         // continuing with restart process; send out the request to resend logged messages to all other processors
2588         CkVec<TProcessedLog> objectVec;
2589         forAllCharesDo(createObjIDList, (void *)&objectVec);
2590         int numberObjects = objectVec.size();
2591         
2592         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2593         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2594         char *resendMsg = (char *)CmiAlloc(totalSize);  
2595
2596         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2597         resendReq->PE =CkMyPe(); 
2598         resendReq->numberObjects = numberObjects;
2599         char *objList = &resendMsg[sizeof(ResendRequest)];
2600         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2601
2602         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2603         CpvAccess(_currentObj) = lb;
2604         lb->ReceiveDummyMigration(restartDecisionNumber);
2605
2606 //HERE  sleep(10);
2607 //      CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
2608         
2609         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2610         for(int i=0;i<CkNumPes();i++){
2611                 if(i != CkMyPe()){
2612                         CmiSyncSend(i,totalSize,resendMsg);
2613                 }
2614         }
2615         _resendMessagesHandler(resendMsg);
2616         CmiFree(resendMsg);
2617
2618         /* test for parallel restart migrate away object**/
2619         if(fastRecovery){
2620                 distributeRestartedObjects();
2621                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2622         }
2623
2624 //      processDelayedRemoteMsgQueue();
2625
2626 };
2627
2628 /**
2629  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2630  */
2631 void _receivedDetDataHandler(ReceivedDetData *msg){
2632         DEBUG_NOW(char objName[100]);
2633         Chare *obj = (Chare *) msg->recver.getObject();
2634         if(obj){                
2635                 char *_msg = (char *)msg;
2636                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2637                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2638                 processReceivedDet(obj,msg->numDets,listDets);
2639                 CmiFree(msg);
2640         }else{
2641                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2642                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2643         }
2644 }
2645
2646 /**
2647  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2648  */
2649 void _receivedTNDataHandler(ReceivedTNData *msg){
2650         DEBUG_NOW(char objName[100]);
2651         Chare *obj = (Chare *) msg->recver.getObject();
2652         if(obj){                
2653                 char *_msg = (char *)msg;
2654                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2655                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2656                 processReceivedTN(obj,msg->numTNs,listTNs);
2657                 CmiFree(msg);
2658         }else{
2659                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2660                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2661         }
2662 };
2663
2664 /**
2665  * @brief Processes the received list of determinants from a particular PE.
2666  */
2667 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2668         Determinant *det;
2669
2670         // traversing the whole list of determinants
2671         for(int i=0; i<listSize; i++){
2672                 det = &listDets[i];
2673                 if(CkMyPe() == 4) printDet(det,"RECOVERY");
2674                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2675                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2676         }
2677         
2678         DEBUG_MEM(CmiMemoryCheck());
2679 }
2680         
2681 /**
2682  * @brief Processes the received list of tickets from a particular PE.
2683  */
2684 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2685         // increases the number of resendReply received
2686         obj->mlogData->resendReplyRecvd++;
2687
2688         DEBUG(char objName[100]);
2689         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2690         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2691         //if(obj->mlogData->receivedTNs == NULL)
2692         //      CkPrintf("NULL\n");     
2693         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2694
2695         // includes the tickets into the receivedTN structure
2696         for(int j=0;j<listSize;j++){
2697                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2698         }
2699         
2700         //if this object has received all the replies find the ticket numbers
2701         //that senders know about. Those less than the ticket number processed 
2702         //by the receiver can be thrown away. The rest need not be consecutive
2703         // ie there can be holes in the list of ticket numbers seen by senders
2704         if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
2705                 obj->mlogData->resendReplyRecvd = 0;
2706                 //sort the received TNS
2707                 sortVec(obj->mlogData->receivedTNs);
2708         
2709                 //after all the received tickets are in we need to sort them and then 
2710                 // calculate the holes  
2711                 if(obj->mlogData->receivedTNs->size() > 0){
2712                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2713                         int vecsize = obj->mlogData->receivedTNs->size();
2714                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2715                         
2716                         // updating tCount with the highest ticket handed out
2717                         if(teamSize > 1){
2718                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2719                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2720                         }else{
2721                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2722                         }
2723                         
2724                         if(numberHoles == 0){
2725                         }else{
2726                                 char objName[100];                                      
2727                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2728                                 obj->mlogData->numberHoles = numberHoles;
2729                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2730                                 int countHoles=0;
2731                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2732                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2733                                                 //the TNs are not consecutive at this point
2734                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2735                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2736                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2737                                                         countHoles++;
2738                                                 }       
2739                                         }
2740                                 }
2741                                 //Holes have been given new TN
2742                                 if(countHoles != numberHoles){
2743                                         char str[100];
2744                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2745                                 }
2746                                 CkAssert(countHoles == numberHoles);                                    
2747                                 obj->mlogData->currentHoles = numberHoles;
2748                         }
2749                 }
2750         
2751                 // cleaning up structures and getting ready to continue execution       
2752                 delete obj->mlogData->receivedTNs;
2753                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2754                 obj->mlogData->receivedTNs = NULL;
2755                 obj->mlogData->restartFlag = 0;
2756
2757                 // processDelayedRemoteMsgQueue();
2758
2759                 DEBUG_RESTART(char objString[100]);
2760                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2761         }
2762
2763 }
2764
2765
2766 void sortVec(CkVec<MCount> *TNvec){
2767         //sort it ->its bloddy bubble sort
2768         //TODO: use quicksort
2769         for(int i=0;i<TNvec->size();i++){
2770                 for(int j=i+1;j<TNvec->size();j++){
2771                         if((*TNvec)[j] < (*TNvec)[i]){
2772                                 MCount temp;
2773                                 temp = (*TNvec)[i];
2774                                 (*TNvec)[i] = (*TNvec)[j];
2775                                 (*TNvec)[j] = temp;
2776                         }
2777                 }
2778         }
2779         //make it unique .. since its sorted all equal units will be consecutive
2780         MCount *tempArray = new MCount[TNvec->size()];
2781         int     uniqueCount=-1;
2782         for(int i=0;i<TNvec->size();i++){
2783                 tempArray[i] = 0;
2784                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2785                         uniqueCount++;
2786                         tempArray[uniqueCount] = (*TNvec)[i];
2787                 }
2788         }
2789         uniqueCount++;
2790         TNvec->removeAll();
2791         for(int i=0;i<uniqueCount;i++){
2792                 TNvec->push_back(tempArray[i]);
2793         }
2794         delete [] tempArray;
2795 }       
2796
2797 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2798         if(TNVec->size() == 0){
2799                 return -1; //not found in an empty vec
2800         }
2801         //binary search to find 
2802         int left=0;
2803         int right = TNVec->size();
2804         int mid = (left +right)/2;
2805         while(searchTN != (*TNVec)[mid] && left < right){
2806                 if((*TNVec)[mid] > searchTN){
2807                         right = mid-1;
2808                 }else{
2809                         left = mid+1;
2810                 }
2811                 mid = (left + right)/2;
2812         }
2813         if(left < right){
2814                 //mid is the element to be returned
2815                 return mid;
2816         }else{
2817                 if(mid < TNVec->size() && mid >=0){
2818                         if((*TNVec)[mid] == searchTN){
2819                                 return mid;
2820                         }else{
2821                                 return -1;
2822                         }
2823                 }else{
2824                         return -1;
2825                 }
2826         }
2827 };
2828
2829
/*
        Method to do parallel restart. Distributes some of the array elements to other processors.
        The problem is that we can't use Charm++ entry methods to do the migration, as they would
        get stuck in the protocol that is going to restart.
        Note: in order to avoid interference between the objects being recovered, the current PE
        will NOT keep any object. It will be devoted to forwarding messages to the recovering
        objects. Otherwise, the current PE would have to do both things, recover objects and
        forward messages, and objects would end up stepping on each other's toes (interference).
*/
2838
/**
 * Location iterator used during parallel restart: each location it visits is
 * packed into a Converse message, removed from this PE and sent to *targetPE,
 * after which *targetPE is advanced for the next location.
 */
class ElementDistributor: public CkLocIterator{
        CkLocMgr *locMgr;       // location manager whose elements are being distributed
        int *targetPE;          // destination of the next element; owned by the caller and updated here

        // Pups, in this order: the location manager's group ID, the element index, the location.
        // Used both for sizing and for packing, so both passes see the same layout.
        void pupLocation(CkLocation &loc,PUP::er &p){
                CkArrayIndexMax idx=loc.getIndex();
                CkGroupID gID = locMgr->ckGetGroupID();
                p|gID;      // store loc mgr's GID as well for easier restore
                p|idx;
                p|loc;
        };
public:
        ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};

        // Called once per location: ships it to *targetPE, or leaves it in place
        // when *targetPE happens to be this PE.
        void addLocation(CkLocation &loc){

                // leaving object on this PE
                if(*targetPE == CkMyPe()){
                        *targetPE = (*targetPE +1)%CkNumPes();
                        return;
                }
                        
                CkArrayIndexMax idx = loc.getIndex();
                CkLocRec_local *rec = loc.getLocalRecord();
                        
                CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
                idx.print();
                        
                //TODO: an element that is being moved should leave some trace behind so that
                // the arraybroadcaster can forward messages to it
                        
                //pack up this location and send it across
                // first pass measures the payload, second pass writes it right after the Converse header
                PUP::sizer psizer;
                pupLocation(loc,psizer);
                int totalSize = psizer.size()+CmiMsgHeaderSizeBytes;
                char *msg = (char *)CmiAlloc(totalSize);
                char *buf = &msg[CmiMsgHeaderSizeBytes];
                PUP::toMem pmem(buf);
                pmem.becomeDeleting();
                pupLocation(loc,pmem);
                        
                // the local record is deleted between the two setDuringMigration calls,
                // then the location manager is told where the element went
                locMgr->setDuringMigration(CmiTrue);
                delete rec;
                locMgr->setDuringMigration(CmiFalse);
                locMgr->inform(idx,*targetPE);

                CmiSetHandler(msg,_distributedLocationHandlerIdx);
                CmiSyncSendAndFree(*targetPE,totalSize,msg);

                CmiAssert(locMgr->lastKnown(idx) == *targetPE);

                //decide on the target processor for the next object
                // cycles through CkMyPe()+1 .. CkMyPe()+parallelRecovery
                // NOTE(review): unlike the skip at the top of this method, this advance is not
                // wrapped with CkNumPes(); confirm CkMyPe()+parallelRecovery can never exceed
                // the last valid PE.
                *targetPE = *targetPE + 1;
                if(*targetPE > (CkMyPe() + parallelRecovery)){
                        *targetPE = CkMyPe() + 1;
                }
        }

};
2898
/**
 * Distributes objects to accelerate recovery after a failure.
 * Runs an ElementDistributor over each location manager visited by CKLOCMGR_LOOP;
 * all of them share the same targetPE cursor, which starts at the PE after this one.
 */
void distributeRestartedObjects(){
        // presumably numGroups and i are consumed by the CKLOCMGR_LOOP macro — TODO confirm
        int numGroups = CkpvAccess(_groupIDTable)->size();      
        int i;
        // NOTE(review): not wrapped with CkNumPes(); confirm this is never run on the last PE
        int targetPE=CkMyPe()+1;
        CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
};
2908
2909 /**
2910  * Handler to update information about an object just received.
2911  */
2912 void _distributedLocationHandler(char *receivedMsg){
2913         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2914         char *buf = &receivedMsg[CmiMsgHeaderSizeBytes];
2915         PUP::fromMem pmem(buf);
2916         CkGroupID gID;
2917         CkArrayIndexMax idx;
2918         pmem |gID;
2919         pmem |idx;
2920         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2921         donotCountMigration=1;
2922         mgr->resume(idx,pmem,CmiTrue);
2923         donotCountMigration=0;
2924         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2925         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2926         idx.print();
2927
2928         CkLocRec *rec = mgr->elementRec(idx);
2929         CmiAssert(rec->type() == CkLocRec::local);
2930         
2931         CkVec<CkMigratable *> eltList;
2932         mgr->migratableList((CkLocRec_local *)rec,eltList);
2933         for(int i=0;i<eltList.size();i++){
2934                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
2935                         CpvAccess(_currentObj) = eltList[i];
2936                         eltList[i]->ResumeFromSync();
2937                 }
2938         }
2939 }
2940
2941
2942 /** this method is used to send messages to a restarted processor to tell
2943  * it that a particular expected object is not going to get to it */
2944 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
2945         DummyMigrationMsg buf;
2946         buf.flag = MLOG_OBJECT;
2947         buf.lbID = lbID;
2948         buf.mgrID = locMgrID;
2949         buf.idx = idx;
2950         buf.locationPE = locationPE;
2951         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2952         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
2953 };
2954
2955
2956 /**this method is used by a restarted processor to tell other processors
2957  * that they are not going to receive these many objects.. just the count
2958  * not the objects themselves ***/
2959
2960 void sendDummyMigrationCounts(int *dummyCounts){
2961         DummyMigrationMsg buf;
2962         buf.flag = MLOG_COUNT;
2963         buf.lbID = globalLBID;
2964         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
2965         for(int i=0;i<CmiNumPes();i++){
2966                 if(i != CmiMyPe() && dummyCounts[i] != 0){
2967                         buf.count = dummyCounts[i];
2968                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
2969                 }
2970         }
2971 }
2972
2973
/**
 * Handler that processes a dummy migration message.
 * It looks up the load balancer named by msg->lbID and calls Migrated() on it:
 * once for an MLOG_OBJECT message, and (msg->count - verifyAckedRequests) times
 * for an MLOG_COUNT message. verifyAckedRequests is reset to 0 in either case
 * and the message is freed.
 */

void _dummyMigrationHandler(DummyMigrationMsg *msg){
        CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
        if(msg->flag == MLOG_OBJECT){
                DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
                // NOTE(review): h is passed without being initialized; presumably Migrated()
                // ignores the handle for these dummy calls — confirm
                LDObjHandle h;
                lb->Migrated(h,1);
        }
        if(msg->flag == MLOG_COUNT){
                DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
                // requests already accounted for in verifyAckedRequests are not reported again
                msg->count -= verifyAckedRequests;
                for(int i=0;i<msg->count;i++){
                        LDObjHandle h;
                        lb->Migrated(h,1);
                }
        }
        // reset regardless of which kind of message was received
        verifyAckedRequests=0;
        CmiFree(msg);
};
2995
2996 /*****************************************************
2997         Implementation of a method that can be used to call
2998         any method on the ChareMlogData of all the chares on
2999         a processor currently
3000 ******************************************************/
3001
3002
3003 class ElementCaller :  public CkLocIterator {
3004 private:
3005         CkLocMgr *locMgr;
3006         MlogFn fnPointer;
3007         void *data;
3008 public:
3009         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3010                 locMgr = _locMgr;
3011                 fnPointer = _fnPointer;
3012                 data = _data;
3013         };
3014         void addLocation(CkLocation &loc){
3015                 CkVec<CkMigratable *> list;
3016                 CkLocRec_local *local = loc.getLocalRecord();
3017                 locMgr->migratableList (local,list);
3018                 for(int i=0;i<list.size();i++){
3019                         CkMigratable *migratableElement = list[i];
3020                         fnPointer(data,migratableElement->mlogData);
3021                 }
3022         }
3023 };
3024
/**
 * Map function pointed by fnPointer over all the chares living in this processor.
 * fnPointer(data, mlogData) is called for every entry of the group table, every
 * entry of the node group table and, through ElementCaller, every migratable
 * element of each location manager visited by CKLOCMGR_LOOP.
 *
 * @param fnPointer function applied to each chare's mlogData
 * @param data      opaque argument passed unchanged as the first parameter of fnPointer
 */
void forAllCharesDo(MlogFn fnPointer, void *data){
        // groups
        int numGroups = CkpvAccess(_groupIDTable)->size();
        for(int i=0;i<numGroups;i++){
                Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
                fnPointer(data,obj->mlogData);
        }
        // node groups
        int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
        for(int i=0;i<numNodeGroups;i++){
                Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
                fnPointer(data,obj->mlogData);
        }
        // array elements
        // presumably numGroups and i are consumed by the CKLOCMGR_LOOP macro — TODO confirm
        int i;
        CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
};
3042
3043
3044 /******************************************************************
3045  Load Balancing
3046 ******************************************************************/
3047
3048 /**
3049  * This is the first time Converse is called after AtSync method has been called by every local object.
3050  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3051  * message logging, we can take advantage of this situation and garbage collect at this point.
3052  */
3053 void initMlogLBStep(CkGroupID gid){
3054         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3055         countLBMigratedAway = 0;
3056         countLBToMigrate=0;
3057         onGoingLoadBalancing=1;
3058         migrationDoneCalled=0;
3059         checkpointBarrierCount=0;
3060         if(globalLBID.idx != 0){
3061                 CmiAssert(globalLBID.idx == gid.idx);
3062         }
3063         globalLBID = gid;
3064 #if SYNCHRONIZED_CHECKPOINT
3065         garbageCollectMlog();
3066 #endif
3067 }
3068
3069 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3070         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3071         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3072         
3073         resumeLbFnPtr = _fnPtr;
3074         centralLb = _centralLb;
3075         migrationDoneCalled = 1;
3076         if(countLBToMigrate == countLBMigratedAway){
3077                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3078                 startMlogCheckpoint(NULL,CmiWallTimer());       
3079         }
3080 };
3081
3082 void finishedCheckpointLoadBalancing(){
3083         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3084         CheckpointBarrierMsg msg;
3085         msg.fromPE = CmiMyPe();
3086         msg.checkpointCount = checkpointCount;
3087
3088         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3089         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3090         
3091 };
3092
3093
3094 void sendMlogLocation(int targetPE, envelope *env){
3095 #if !SYNCHRONIZED_CHECKPOINT
3096         void *_msg = EnvToUsr(env);
3097         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3098
3099         int existing = 0;
3100         //if this object is already in the retainedobjectlust destined for this
3101         //processor it should not be sent
3102         
3103         for(int i=0;i<retainedObjectList.size();i++){
3104                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3105                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3106                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3107                         existing = 1;
3108                         break;
3109                 }
3110         }
3111
3112         if(existing){
3113                 return;
3114         }
3115         
3116         countLBToMigrate++;
3117         
3118         MigrationNotice migMsg;
3119         migMsg.migRecord.gID = msg->gid;
3120         migMsg.migRecord.idx = msg->idx;
3121         migMsg.migRecord.fromPE = CkMyPe();
3122         migMsg.migRecord.toPE =  targetPE;
3123         
3124         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3125         
3126         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3127         retainedObject->migRecord = migMsg.migRecord;
3128         retainedObject->acked  = 0;
3129         
3130         CkPackMessage(&env);
3131         
3132         migMsg.record = retainedObject;
3133         retainedObject->msg = env;
3134         int size = retainedObject->size = env->getTotalsize();
3135         
3136         retainedObjectList.push_back(retainedObject);
3137         
3138         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3139         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3140         
3141         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3142 #endif
3143 }
3144
3145 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3146         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3147         migratedNoticeList.push_back(msg->migRecord);
3148
3149         MigrationNoticeAck buf;
3150         buf.record = msg->record;
3151         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3152         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3153 }
3154
3155 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3156         
3157         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3158         retainedObject->acked = 1;
3159
3160         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3161         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3162
3163         //inform home about the new location of this object
3164         CkGroupID gID = retainedObject->migRecord.gID ;
3165         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3166         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3167         
3168         countLBMigratedAway++;
3169         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3170                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3171                 startMlogCheckpoint(NULL,CmiWallTimer());
3172         }
3173 };
3174
3175 void _receiveMlogLocationHandler(void *buf){
3176         envelope *env = (envelope *)buf;
3177         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3178         CkUnpackMessage(&env);
3179         void *_msg = EnvToUsr(env);
3180         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3181         CkGroupID gID= msg->gid;
3182         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3183         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3184         CpvAccess(_currentObj)=mgr;
3185         mgr->immigrate(msg);
3186 };
3187
3188
/**
 * Currently a no-op: the whole body is commented out.
 * The disabled code made array elements the current object and called
 * ResumeFromSync() on them. Both parameters are unused.
 */
void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
/*      if(mlogData->objID.type == TypeArray){
                CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
        //      TODO: make sure later that atSync has been called and it needs 
        //      to be resumed from sync
        //
                CpvAccess(_currentObj) = elt;
                elt->ResumeFromSync();
        }*/
}
3199
3200 /**
3201  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
3202  */
3203 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3204         if(checkpointBarrierCount == CmiNumPes()){
3205                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3206                 for(int i=0;i<CmiNumPes();i++){
3207                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3208                 }
3209         }
3210 }
3211
3212 /**
3213  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
3214  */ 
3215 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3216         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3217         if(msg->checkpointCount == checkpointCount){
3218                 checkpointBarrierCount++;
3219                 checkAndSendCheckpointBarrierAcks(msg);
3220         }else{
3221                 if(msg->checkpointCount-1 == checkpointCount){
3222                         checkpointBarrierCount++;
3223                         checkAndSendCheckpointBarrierAcks(msg);
3224                 }else{
3225                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3226                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3227                 }
3228         }
3229