Making use of both message-logging techniques homogeneous in Charm++ core.
[charm.git] / src / ck-core / ckmessagelogging.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
16 #ifdef _FAULT_MLOG_
17
18 // Collects some statistics about message logging. Beware of the high cost of accounting for
19 // duplicated determinants (every determinant received requires a linear search
20 // through a potentially big list).
21 #define COLLECT_STATS_MSGS 0
22 #define COLLECT_STATS_MSGS_TOTAL 0
23 #define COLLECT_STATS_MSG_COUNT 0
24 #define COLLECT_STATS_DETS 0
25 #define COLLECT_STATS_DETS_DUP 0
26 #define COLLECT_STATS_MEMORY 0
27 #define COLLECT_STATS_TEAM 0
28
29 #define RECOVERY_SEND "SEND"
30 #define RECOVERY_PROCESS "PROCESS"
31
32 #define DEBUG_MEM(x)  //x
33 #define DEBUG(x) // x
34 #define DEBUG_RESTART(x)  //x
35 #define DEBUGLB(x)   // x
36 #define DEBUG_TEAM(x)  // x
37 #define DEBUG_PERF(x) // x
38 #define DEBUG_CHECKPOINT 1
39 #define DEBUG_NOW(x) x
40 #define DEBUG_PE(x,y) // if(CkMyPe() == x) y
41 #define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
42 #define DEBUG_RECOVERY(x) //x
43
44 extern const char *idx2str(const CkArrayIndex &ind);
45 extern const char *idx2str(const ArrayElement *el);
46
47 void getGlobalStep(CkGroupID gID);
48
49 bool fault_aware(CkObjID &recver);
50 void sendCheckpointData(int mode);
51 void createObjIDList(void *data,ChareMlogData *mlogData);
52 inline bool isLocal(int destPE);
53 inline bool isTeamLocal(int destPE);
54 void printLog(TProcessedLog *log);
55
56 int _restartFlag=0;
57 int _numRestartResponses=0;
58
59 //TODO: remove for perf runs
60 int countHashRefs=0; //count the number of gets
61 int countHashCollisions=0;
62
63 char *checkpointDirectory=".";
64 int unAckedCheckpoint=0;
65
66 int countLocal=0,countBuffered=0;
67 int countPiggy=0;
68 int countClearBufferedLocalCalls=0;
69
70 int countUpdateHomeAcks=0;
71
72 extern int teamSize;
73 extern int chkptPeriod;
74 extern bool fastRecovery;
75 extern int parallelRecovery;
76
77 char *killFile;
78 char *faultFile;
79 int killFlag=0;
80 int faultFlag=0;
81 int restartingMlogFlag=0;
82 void readKillFile();
83 double killTime=0.0;
84 double faultMean;
85 int checkpointCount=0;
86
87 CpvDeclare(Chare *,_currentObj);
88 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
89 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
90 CpvDeclare(Queue, _outOfOrderMessageQueue);
91 CpvDeclare(Queue, _delayedRemoteMessageQueue);
92 CpvDeclare(char **,_bufferedTicketRequests);
93 CpvDeclare(int *,_numBufferedTicketRequests);
94
95 /***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
96 /** @note All the determinants generated by a PE are stored in variable _localDets.
97  * As soon as a message is sent, then all the determinants are appended to the message,
98  * but those determinants are not deleted. We must wait until an ACK comes from the receiver
99  * to delete the determinants. In the meantime the same determinants may be appended
100  * to other messages and more determinants can be added to _localDets.
101  * A simple solution to this problem was to have a primitive array and keep adding determinants
102  * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
103  * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
104  * determinant and a number of how many valid determinants there are behind it. We do not remove
105  * determinants until a checkpoint is made, since these determinants may have to be added to
106  * messages in case of a recovery.
107  */
108 // temporary storage for the outgoing determinants
109 CpvDeclare(char *, _localDets);
110 // number of buffered determinants
111 int _numBufferedDets;
112 // current index of first determinant in _localDets
113 int _indexBufferedDets;
114 // current phase of determinants
115 int _phaseBufferedDets;
116
117 // stores the determinants from other nodes, to send them in case of a crash
118 CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
119 // max number of buffered determinants
120 int _maxBufferedDets;
121
122 // stores the incarnation number from every other processor
123 CpvDeclare(char *, _incarnation);
124
125 // storage for remove determinants header
126 CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);
127 // storage for store determinants header
128 CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);
129 // storage for the sizes in vector-send to store determinants
130 CpvDeclare(int *, _storeDetsSizes);
131 // storage for the pointers in vector-send to store determinants
132 CpvDeclare(char **, _storeDetsPtrs);
133
134 /***** *****/
135
136 /***** VARIABLES FOR PARALLEL RECOVERY *****/
137 CpvDeclare(int, _numEmigrantRecObjs);
138 CpvDeclare(int, _numImmigrantRecObjs);
139 CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
140 /***** *****/
141
142 #if COLLECT_STATS_MSGS
143 int *numMsgsTarget;
144 int *sizeMsgsTarget;
145 int totalMsgsTarget;
146 float totalMsgsSize;
147 #endif
148 #if COLLECT_STATS_DETS
149 int numPiggyDets;
150 int numDets;
151 int numDupDets;
152 #endif
153 #if COLLECT_STATS_MEMORY
154 int msgLogSize;
155 int bufferedDetsSize;
156 int storedDetsSize;
157 #endif
158 //TML: variables for measuring savings with teams in message logging
159 #if COLLECT_STATS_TEAM
160 float MLOGFT_totalLogSize = 0.0;
161 float MLOGFT_totalMessages = 0.0;
162 #endif
163
164 static double adjustChkptPeriod=0.0; //in ms
165 static double nextCheckpointTime=0.0;//in seconds
166 static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
167
168 int _pingHandlerIdx;
169
170 char objString[100];
171 int _checkpointRequestHandlerIdx;
172 int _storeCheckpointHandlerIdx;
173 int _checkpointAckHandlerIdx;
174 int _getCheckpointHandlerIdx;
175 int _recvCheckpointHandlerIdx;
176 int _removeProcessedLogHandlerIdx;
177
178 int _verifyAckRequestHandlerIdx;
179 int _verifyAckHandlerIdx;
180 int _dummyMigrationHandlerIdx;
181
182
183 int     _getGlobalStepHandlerIdx;
184 int     _recvGlobalStepHandlerIdx;
185
186 int _updateHomeRequestHandlerIdx;
187 int _updateHomeAckHandlerIdx;
188 int _resendMessagesHandlerIdx;
189 int _sendDetsHandlerIdx;
190 int _sendDetsReplyHandlerIdx;
191 int _receivedTNDataHandlerIdx;
192 int _receivedDetDataHandlerIdx;
193 int _distributedLocationHandlerIdx;
194 int _sendBackLocationHandlerIdx;
195 int _storeDeterminantsHandlerIdx;
196 int _removeDeterminantsHandlerIdx;
197
198 //TML: integer constants for team-based message logging
199 int _restartHandlerIdx;
200 int _getRestartCheckpointHandlerIdx;
201 int _recvRestartCheckpointHandlerIdx;
202 void setTeamRecovery(void *data, ChareMlogData *mlogData);
203 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
204
205 int verifyAckTotal;
206 int verifyAckCount;
207
208 int verifyAckedRequests=0;
209
210 RestartRequest *storedRequest;
211
212 int _falseRestart =0; /**
213                                                                                                         For testing on clusters we might carry out restarts on 
214                                                                                                         a processor without actually starting it
215                                                                                                         1 -> false restart
216                                                                                                         0 -> restart after an actual crash
217                                                                                                 */
218
219 //Load balancing globals
220 int onGoingLoadBalancing=0;
221 void *centralLb;
222 void (*resumeLbFnPtr)(void *);
223 int _receiveMlogLocationHandlerIdx;
224 int _receiveMigrationNoticeHandlerIdx;
225 int _receiveMigrationNoticeAckHandlerIdx;
226 int _checkpointBarrierHandlerIdx;
227 int _checkpointBarrierAckHandlerIdx;
228
229 CkVec<MigrationRecord> migratedNoticeList;
230 CkVec<RetainedMigratedObject *> retainedObjectList;
231 int donotCountMigration=0;
232 int countLBMigratedAway=0;
233 int countLBToMigrate=0;
234 int migrationDoneCalled=0;
235 int checkpointBarrierCount=0;
236 int globalResumeCount=0;
237 CkGroupID globalLBID;
238 int restartDecisionNumber=-1;
239
240 double lastCompletedAlarm=0;
241 double lastRestart=0;
242
243 //update location globals
244 int _receiveLocationHandlerIdx;
245
246
247 /** 
248  * @brief Initialize message logging data structures and register handlers
249  */
250 void _messageLoggingInit(){
251         //current object
252         CpvInitialize(Chare *,_currentObj);
253         
254         //registering handlers for message logging
255         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
256                 
257         //handlers for checkpointing
258         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
259         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
260         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
261         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
262
263         //handlers for restart
264         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
265         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
266         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
267         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
268         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
269         _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
270         _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
271         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
272         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
273         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
274         _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
275         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
276         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
277         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
278
279         //TML: handlers for team-based message logging
280         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
281         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
282         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
283
284         // handlers for causal message logging
285         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
286         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
287         
288         //handlers for load balancing
289         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
290         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
291         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
292         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
293         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
294         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
295         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
296         
297         //handlers for updating locations
298         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
299         
300         //Cpv variables for message logging
301         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
302         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
303         CpvInitialize(Queue, _outOfOrderMessageQueue);
304         CpvInitialize(Queue, _delayedRemoteMessageQueue);
305         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
306         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
307         
308         CpvInitialize(char **,_bufferedTicketRequests);
309         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
310         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
311         for(int i=0;i<CkNumPes();i++){
312                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
313                 CpvAccess(_numBufferedTicketRequests)[i]=0;
314         }
315  
316         // Cpv variables for causal protocol
317         _numBufferedDets = 0;
318         _indexBufferedDets = 0;
319         _phaseBufferedDets = 0;
320         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
321         CpvInitialize(char *, _localDets);
322         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
323         CpvInitialize(char *, _incarnation);
324         CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
325         CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
326         CpvInitialize(int *, _storeDetsSizes);
327         CpvInitialize(char **, _storeDetsPtrs);
328         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
329         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
330         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
331         for(int i=0; i<CmiNumPes(); i++){
332                 CpvAccess(_incarnation)[i] = 0;
333         }
334         CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
335         CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
336         CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
337         CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
338
339         // Cpv variables for parallel recovery
340         CpvInitialize(int, _numEmigrantRecObjs);
341     CpvAccess(_numEmigrantRecObjs) = 0;
342     CpvInitialize(int, _numImmigrantRecObjs);
343     CpvAccess(_numImmigrantRecObjs) = 0;
344
345     CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
346     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
347
348         //Cpv variables for checkpoint
349         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
350         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
351
352         // registering user events for projections      
353         traceRegisterUserEvent("Remove Logs", 20);
354         traceRegisterUserEvent("Ticket Request Handler", 21);
355         traceRegisterUserEvent("Ticket Handler", 22);
356         traceRegisterUserEvent("Local Message Copy Handler", 23);
357         traceRegisterUserEvent("Local Message Ack Handler", 24);        
358         traceRegisterUserEvent("Preprocess current message",25);
359         traceRegisterUserEvent("Preprocess past message",26);
360         traceRegisterUserEvent("Preprocess future message",27);
361         traceRegisterUserEvent("Checkpoint",28);
362         traceRegisterUserEvent("Checkpoint Store",29);
363         traceRegisterUserEvent("Checkpoint Ack",30);
364         traceRegisterUserEvent("Send Ticket Request",31);
365         traceRegisterUserEvent("Generalticketrequest1",32);
366         traceRegisterUserEvent("TicketLogLocal",33);
367         traceRegisterUserEvent("next_ticket and SN",34);
368         traceRegisterUserEvent("Timeout for buffered remote messages",35);
369         traceRegisterUserEvent("Timeout for buffered local messages",36);
370         traceRegisterUserEvent("Inform Location Home",37);
371         traceRegisterUserEvent("Receive Location Handler",38);
372         
373         lastCompletedAlarm=CmiWallTimer();
374         lastRestart = CmiWallTimer();
375
376 #if COLLECT_STATS_MSGS
377 #if COLLECT_STATS_MSGS_TOTAL
378         totalMsgsTarget = 0;
379         totalMsgsSize = 0.0;
380 #else
381         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
382         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
383         for(int i=0; i<CmiNumPes(); i++){
384                 numMsgsTarget[i] = 0;
385                 sizeMsgsTarget[i] = 0;
386         }
387 #endif
388 #endif
389 #if COLLECT_STATS_DETS
390         numPiggyDets = 0;
391         numDets = 0;
392         numDupDets = 0;
393 #endif
394 #if COLLECT_STATS_MEMORY
395         msgLogSize = 0;
396         bufferedDetsSize = 0;
397         storedDetsSize = 0;
398 #endif
399
400
401 }
402
403 void killLocal(void *_dummy,double curWallTime);        
404
405 void readKillFile(){
406         FILE *fp=fopen(killFile,"r");
407         if(!fp){
408                 return;
409         }
410         int proc;
411         double sec;
412         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
413                 if(proc == CkMyPe()){
414                         killTime = CmiWallTimer()+sec;
415                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
416                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
417                 }
418         }
419         fclose(fp);
420 }
421
422 /**
423  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
424  * We assume an exponential distribution for the mean-time-between-failures.
425  */
426 void readFaultFile(){
427         FILE *fp=fopen(faultFile,"r");
428         if(!fp){
429                 return;
430         }
431         int proc;
432         double sec;
433         fscanf(fp,"%d %lf",&proc,&sec);
434         faultMean = sec;
435         if(proc == CkMyPe()){
436                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
437                 CcdCallFnAfter(killLocal,NULL,sec*1000);
438         }
439         fclose(fp);
440 }
441
442 void killLocal(void *_dummy,double curWallTime){
443         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
444         if(CmiWallTimer()<killTime-1){
445                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
446         }else{  
447                 kill(getpid(),SIGKILL);
448         }
449 }
450
451 /*** Auxiliary Functions ***/
452
453 /**
454  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
455  * determinants needs to be extended.
456  */
457 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
458         Determinant *det, *auxDet;
459         char *aux;
460
461         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
462
463         // checking if we are overflowing the buffered determinants array
464         if(_indexBufferedDets >= _maxBufferedDets){
465                 aux = CpvAccess(_localDets);
466                 _maxBufferedDets *= 2;
467                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
468                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
469                 CmiFree(aux);
470         }
471
472         // adding the new determinant at the end
473         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
474         det->sender = sender;
475         det->receiver = receiver;
476         det->SN = SN;
477         det->TN = TN;
478         _numBufferedDets++;
479         _indexBufferedDets++;
480
481 #if COLLECT_STATS_MEMORY
482         bufferedDetsSize++;
483 #endif
484 #if COLLECT_STATS_DETS
485         numDets++;
486 #endif
487 }
488
489 /************************ Message logging methods ****************/
490
491 /**
492  * Sends a group message that might be a broadcast.
493  */
494 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
495         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
496                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
497                 void *origMsg = EnvToUsr(env);
498                 for(int i=0;i<CmiNumPes();i++){
499                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
500                                 void *copyMsg = CkCopyMsg(&origMsg);
501                                 envelope *copyEnv = UsrToEnv(copyMsg);
502                                 copyEnv->SN=0;
503                                 copyEnv->TN=0;
504                                 copyEnv->sender.type = TypeInvalid;
505                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
506                                 sendGroupMsg(copyEnv,i,_infoIdx);
507                         }
508                 }
509                 return;
510         }
511
512         // initializing values of envelope
513         env->SN=0;
514         env->TN=0;
515         env->sender.type = TypeInvalid;
516
517         CkObjID recver;
518         recver.type = TypeGroup;
519         recver.data.group.id = env->getGroupNum();
520         recver.data.group.onPE = destPE;
521         sendCommonMsg(recver,env,destPE,_infoIdx);
522 }
523
524 /**
525  * Sends a nodegroup message that might be a broadcast.
526  */
527 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
528         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
529                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
530                 void *origMsg = EnvToUsr(env);
531                 for(int i=0;i<CmiNumNodes();i++){
532                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
533                                 void *copyMsg = CkCopyMsg(&origMsg);
534                                 envelope *copyEnv = UsrToEnv(copyMsg);
535                                 copyEnv->SN=0;
536                                 copyEnv->TN=0;
537                                 copyEnv->sender.type = TypeInvalid;
538                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
539                         }
540                 }
541                 return;
542         }
543
544         // initializing values of envelope
545         env->SN=0;
546         env->TN=0;
547         env->sender.type = TypeInvalid;
548
549         CkObjID recver;
550         recver.type = TypeNodeGroup;
551         recver.data.group.id = env->getGroupNum();
552         recver.data.group.onPE = destNode;
553         sendCommonMsg(recver,env,destNode,_infoIdx);
554 }
555
556 /**
557  * Sends a message to an array element.
558  */
559 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
560         CkObjID recver;
561         recver.type = TypeArray;
562         recver.data.array.id = env->getsetArrayMgr();
563         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
564
565         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
566                 char recverString[100],senderString[100];
567                 
568                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
569         }
570
571         // initializing values of envelope
572         env->SN = 0;
573         env->TN = 0;
574
575         sendCommonMsg(recver,env,destPE,_infoIdx);
576 };
577
578 /**
579  * Sends a message to a singleton chare.
580  */
581 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
582         CkObjID recver;
583         recver.type = TypeChare;
584         recver.data.chare.id = *pCid;
585
586         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
587                 char recverString[100],senderString[100];
588                 
589                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
590         }
591
592         // initializing values of envelope
593         env->SN = 0;
594         env->TN = 0;
595
596         sendCommonMsg(recver,env,destPE,_infoIdx);
597 };
598
599 /**
600  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
601  */
602 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
603         envelope *env = _env;
604         MCount ticketNumber = 0;
605         int resend=0; //is it a resend
606         char recverName[100];
607         char senderString[100];
608         double _startTime=CkWallTimer();
609         
610         DEBUG_MEM(CmiMemoryCheck());
611
612         if(CpvAccess(_currentObj) == NULL){
613 //              CkAssert(0);
614                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
615                 generalCldEnqueue(destPE,env,_infoIdx);
616                 return;
617         }
618
619         // checking if this message should bypass determinants in message-logging
620         if(env->flags & CK_BYPASS_DET_MLOG){
621                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
622                 env->recver = recver;
623                 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
624                 generalCldEnqueue(destPE,env,_infoIdx);
625                 return;
626         }
627         
628         // setting message logging data in the envelope
629         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
630         if(env->sender.type == TypeInvalid){
631                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
632         }else{
633 //              envelope *copyEnv = copyEnvelope(env);
634 //              env = copyEnv;
635                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
636                 env->SN = 0;
637         }
638         
639         DEBUG_MEM(CmiMemoryCheck());
640
641         CkObjID &sender = env->sender;
642         env->recver = recver;
643
644         Chare *obj = (Chare *)env->sender.getObject();
645           
646         if(env->SN == 0){
647                 DEBUG_MEM(CmiMemoryCheck());
648                 env->SN = obj->mlogData->nextSN(recver);
649         }else{
650                 resend = 1;
651         }
652         
653 //      if(env->SN != 1){
654                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
655         //      CmiPrintStackTrace(0);
656 /*      }else{
657                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
658         }*/
659                 
660 //      CkPackMessage(&(mEntry->env));
661 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
662         
663         _startTime = CkWallTimer();
664
665         // uses the proper ticketing mechanism for local, team and general messages
666         if(isLocal(destPE)){
667                 sendLocalMsg(env, _infoIdx);
668         }else{
669                 if((teamSize > 1) && isTeamLocal(destPE)){
670
671                         // look to see if this message already has a ticket in the team-table
672                         Chare *senderObj = (Chare *)sender.getObject();
673                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
674                         if(ticketRow != NULL){
675                                 Ticket ticket = ticketRow->get(env->SN);
676                                 if(ticket.TN != 0){
677                                         ticketNumber = ticket.TN;
678                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
679                                 }
680                         }
681                 }
682                 
683                 // sending the message
684                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
685                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
686                 
687         }
688 }
689
690 /**
691  * Determines if the message is local or not. A message is local if:
692  * 1) Both the destination and origin are the same PE.
693  */
694 inline bool isLocal(int destPE){
695         // both the destination and the origin are the same PE
696         if(destPE == CkMyPe())
697                 return true;
698
699         return false;
700 }
701
702 /**
703  * Determines if the message is group local or not. A message is group local if:
704  * 1) They belong to the same group in the group-based message logging.
705  */
706 inline bool isTeamLocal(int destPE){
707
708         // they belong to the same group
709         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
710                 return true;
711
712         return false;
713 }
714
715 /**
716  * Method that does the actual send by creating a ticket request filling it up and sending it.
717  */
718 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
719         DEBUG_NOW(char recverString[100]);
720         DEBUG_NOW(char senderString[100]);
721
722         int totalSize;
723
724         envelope *env = entry->env;
725         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
726
727         // setting all the information
728         Chare *obj = (Chare *)entry->env->sender.getObject();
729         entry->env->recver = recver;
730         entry->env->SN = SN;
731         entry->env->TN = TN;
732         if(!resend){
733                 //TML: only stores message if either it goes to this processor or to a processor in a different group
734                 if(!isTeamLocal(entry->destPE)){
735                         obj->mlogData->addLogEntry(entry);
736 #if COLLECT_STATS_TEAM
737                         MLOGFT_totalMessages += 1.0;
738                         MLOGFT_totalLogSize += entry->env->getTotalsize();
739 #endif
740                 }else{
741                         // the message has to be deleted after it has been sent
742                         entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
743                 }
744         }
745
746         // sending the determinants that causally affect this message
747         if(_numBufferedDets > 0){
748
749                 // modifying the actual number of determinants sent in message
750                 CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
751                 CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
752                 CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
753                 CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
754         
755                 // sending the message
756                 CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
757                 CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
758                 CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
759                 CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
760                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
761                 CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
762                 CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
763         }
764
765         // updating its message log entry
766         entry->indexBufDets = _indexBufferedDets;
767         entry->numBufDets = _numBufferedDets;
768
769         // sending the message
770         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
771
772         DEBUG_MEM(CmiMemoryCheck());
773 #if COLLECT_STATS_MSGS
774 #if COLLECT_STATS_MSGS_TOTAL
775         totalMsgsTarget++;
776         totalMsgsSize += (float)env->getTotalsize();
777 #else
778         numMsgsTarget[destPE]++;
779         sizeMsgsTarget[destPE] += env->getTotalsize();
780 #endif
781 #endif
782 #if COLLECT_STATS_DETS
783         numPiggyDets += _numBufferedDets;
784 #endif
785 #if COLLECT_STATS_MEMORY
786         msgLogSize += env->getTotalsize();
787 #endif
788 };
789
790
791 /**
792  * @brief Function to send a local message. It first gets a ticket and
793  * then enqueues the message. If we are recovering, then the message 
794  * is enqueued in a delay queue.
795  */
796 void sendLocalMsg(envelope *env, int _infoIdx){
797         DEBUG_PERF(double _startTime=CkWallTimer());
798         DEBUG_MEM(CmiMemoryCheck());
799         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
800         DEBUG(char senderString[100]);
801         DEBUG(char recverString[100]);
802         Ticket ticket;
803
804         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
805
806         // getting the receiver local object
807         Chare *recverObj = (Chare *)env->recver.getObject();
808
809         // if receiver object is not NULL, we will ask it for a ticket
810         if(recverObj){
811
812 /*HERE          // if we are recovery, then we must put this off for later
813                 if(recverObj->mlogData->restartFlag){
814                         CpvAccess(_delayedLocalMsgs)->enq(entry);
815                         return;
816                 }
817
818                 // check if this combination of sender and SN has been already a ticket
819                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
820                         
821                 // assigned a new ticket in case this message is completely new
822                 if(ticket.TN == 0){
823                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
824
825                         // if TN == 0 we enqueue this message since we are recovering from a crash
826                         if(ticket.TN == 0){
827                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
828                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
829                                 
830 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
831                                 return;
832                         }
833                 }
834
835                 //TODO: check for the case when an invalid ticket is returned
836                 //TODO: check for OLD or RECEIVED TICKETS
837                 entry->env->TN = ticket.TN;
838                 CkAssert(entry->env->TN > 0);
839                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
840         
841                 DEBUG_MEM(CmiMemoryCheck());
842
843                 // adding the determinant to _localDets
844                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
845 */
846                 // sends the local message
847                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
848
849                 DEBUG_MEM(CmiMemoryCheck());
850         }else{
851                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
852         }
853 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
854 };
855
856 /****
857         The handler functions
858 *****/
859
860 /*** CAUSAL PROTOCOL HANDLERS ***/
861
862 /**
863  * @brief Removes the determinants after a particular index in the _localDets array.
864  */
865 void _removeDeterminantsHandler(char *buffer){
866         RemoveDeterminantsHeader *header;
867         int index, phase;
868
869         // getting the header from the message
870         header = (RemoveDeterminantsHeader *)buffer;
871         index = header->index;
872         phase = header->phase;
873
874         // fprintf(stderr,"ACK index=%d\n",index);
875
876         // updating the number of buffered determinants
877         if(phase == _phaseBufferedDets){
878                 if(index > _indexBufferedDets)
879                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
880                 CmiAssert(index <= _indexBufferedDets);
881                 _numBufferedDets = _indexBufferedDets - index;
882         }
883
884         // releasing memory
885         CmiFree(buffer);
886
887 }
888
889 /**
890  * @brief Stores the determinants coming from other processor.
891  */
892 void _storeDeterminantsHandler(char *buffer){
893         StoreDeterminantsHeader *header;
894         Determinant *detPtr, det;
895         int i, n, index, phase, destPE;
896         CkVec<Determinant> *vec;
897
898         // getting the header from the message and pointing to the first determinant
899         header = (StoreDeterminantsHeader *)buffer;
900         n = header->number;
901         index = header->index;
902         phase = header->phase;
903         destPE = header->PE;
904         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
905
906         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
907         DEBUG_MEM(CmiMemoryCheck());
908
909         // going through all determinants and storing them into _remoteDets
910         for(i = 0; i < n; i++){
911                 det.sender = detPtr->sender;
912                 det.receiver = detPtr->receiver;
913                 det.SN = detPtr->SN;
914                 det.TN = detPtr->TN;
915
916                 // getting the determinant array
917                 vec = CpvAccess(_remoteDets)->get(det.receiver);
918                 if(vec == NULL){
919                         vec = new CkVec<Determinant>();
920                         CpvAccess(_remoteDets)->put(det.receiver) = vec;
921                 }
922 #if COLLECT_STATS_DETS
923 #if COLLECT_STATS_DETS_DUP
924                 for(int j=0; j<vec->size(); j++){
925                         if(isSameDet(&(*vec)[j],&det)){
926                                 numDupDets++;
927                                 break;
928                         }
929                 }
930 #endif
931 #endif
932 #if COLLECT_STATS_MEMORY
933                 storedDetsSize++;
934 #endif
935                 vec->push_back(det);
936                 detPtr = detPtr++;
937         }
938
939         DEBUG_MEM(CmiMemoryCheck());
940
941         // freeing buffer
942         CmiFree(buffer);
943
944         // sending the ACK back to the sender
945         CpvAccess(_removeDetsHeader)->index = index;
946         CpvAccess(_removeDetsHeader)->phase = phase;
947         CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
948         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
949         
950 }
951
952 /**
953  *  If there are any delayed requests, process them first before 
954  *  processing this request
955  * */
956 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
957         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
958         double  _startTime = CkWallTimer();
959         CmiFree(ticketRequest);
960 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
961 }
962
963
964 /**
965  * @brief Gets a ticket for a recently received message.
966  * @pre env->recver has to be on this processor.
967  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
968  */
969 inline bool _getTicket(envelope *env, int *flag){
970         DEBUG_MEM(CmiMemoryCheck());
971         DEBUG(char recverName[100]);
972         DEBUG(char senderString[100]);
973         Ticket ticket;
974         
975         // getting information from request
976         CkObjID sender = env->sender;
977         CkObjID recver = env->recver;
978         MCount SN = env->SN;
979         MCount TN = env->TN;
980         Chare *recverObj = (Chare *)recver.getObject();
981         
982         DEBUG(recver.toString(recverName);)
983
984         // verifying whether we are recovering from a failure or not
985         if(recverObj->mlogData->restartFlag)
986                 return false;
987
988         // checking ticket table to see if this message already received a ticket
989         ticket = recverObj->mlogData->getTicket(sender, SN);
990         TN = ticket.TN;
991         
992         // checking if the message is team local and if it has a ticket already assigned
993         if(teamSize > 1 && TN != 0){
994                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
995                 recverObj->mlogData->verifyTicket(sender,SN,TN);
996         }
997
998         //check if a ticket for this has been already handed out to an object that used to be local but 
999         // is no longer so.. need for parallel restart
1000         if(TN == 0){
1001                 ticket = recverObj->mlogData->next_ticket(sender,SN);
1002                 *flag = NEW_TICKET;
1003         } else {
1004                 *flag = OLD_TICKET;
1005         }
1006         if(ticket.TN > recverObj->mlogData->tProcessed){
1007                 ticket.state = NEW_TICKET;
1008         } else {
1009                 ticket.state = OLD_TICKET;
1010         }
1011         // @todo: check for the case when an invalid ticket is returned
1012         if(ticket.TN == 0){
1013                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
1014                 return false;
1015         }
1016                 
1017         // setting the ticket number in the envelope
1018         env->TN = ticket.TN;
1019         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
1020         return true;
1021
1022 };
1023
1024 bool fault_aware(CkObjID &recver){
1025         switch(recver.type){
1026                 case TypeChare:
1027                         return true;
1028                 case TypeMainChare:
1029                         return false;
1030                 case TypeGroup:
1031                 case TypeNodeGroup:
1032                 case TypeArray:
1033                         return true;
1034                 default:
1035                         return false;
1036         }
1037 };
1038
1039 /* Preprocesses a received message */
1040 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1041         DEBUG_NOW(char recverString[100]);
1042         DEBUG_NOW(char senderString[100]);
1043         DEBUG_MEM(CmiMemoryCheck());
1044         int flag;
1045         bool ticketSuccess;
1046
1047         // getting the receiver object
1048         CkObjID recver = env->recver;
1049
1050         // checking for determinants bypass in message logging
1051         if(env->flags & CK_BYPASS_DET_MLOG){
1052                 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
1053                 return 1;       
1054         }
1055
1056         // checking if receiver is fault aware
1057         if(!fault_aware(recver)){
1058                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1059                 return 1;
1060         }
1061
1062         Chare *obj = (Chare *)recver.getObject();
1063         *objPointer = obj;
1064         if(obj == NULL){
1065                 int possiblePE = recver.guessPE();
1066                 if(possiblePE != CkMyPe()){
1067                         int totalSize = env->getTotalsize();
1068                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1069                         
1070                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1071                 }else{
1072                         // this is the case where a message is received and the object has not been initialized
1073                         // we delayed the delivery of the message
1074                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1075                         
1076                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1077                 }
1078                 return 0;
1079         }
1080
1081         // checking if message comes from an old incarnation
1082         // message must be discarded
1083         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1084                 CmiFree(env);
1085                 return 0;
1086         }
1087
1088         DEBUG_MEM(CmiMemoryCheck());
1089         DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1090
1091         // getting a ticket for this message
1092         ticketSuccess = _getTicket(env,&flag);
1093
1094         // we might be during recovery when this message arrives
1095         if(!ticketSuccess){
1096                 
1097                 // adding the message to a delay queue
1098                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1099                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1100
1101                 return 0;
1102         }
1103         
1104         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1105         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1106         
1107         if(flag == NEW_TICKET){
1108                 // storing determinant of message in data structure
1109                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1110         }
1111
1112         DEBUG_MEM(CmiMemoryCheck());
1113
1114         double _startTime = CkWallTimer();
1115 //env->sender.updatePosition(env->getSrcPe());
1116         if(env->TN == obj->mlogData->tProcessed+1){
1117                 //the message that needs to be processed now
1118                 DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1119                 // once we find a message that we can process we put back all the messages in the out of order queue
1120                 // back into the main scheduler queue. 
1121         DEBUG_MEM(CmiMemoryCheck());
1122                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1123                         void *qMsgPtr;
1124                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1125                         envelope *qEnv = (envelope *)qMsgPtr;
1126                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1127         DEBUG_MEM(CmiMemoryCheck());
1128                 }
1129 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1130                 //TODO: this might be a problem.. change made for leanMD
1131 //              CpvAccess(_currentObj) = obj;
1132         DEBUG_MEM(CmiMemoryCheck());
1133                 return 1;
1134         }
1135
1136         // checking if message has already been processed
1137         // message must be discarded
1138         if(env->TN <= obj->mlogData->tProcessed){
1139                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1140                 
1141                 CmiFree(env);
1142                 return 0;
1143         }
1144         //message that needs to be processed in the future
1145
1146         DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1147         //the message cant be processed now put it back in the out of order message Q, 
1148         //It will be transferred to the main queue later
1149         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1150 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1151         DEBUG_MEM(CmiMemoryCheck());
1152         
1153         return 0;
1154 }
1155
1156 /**
1157  * @brief Updates a few variables once a message has been processed.
1158  */
1159 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1160         DEBUG(char senderString[100]);
1161         if(obj){
1162                 if(sender.guessPE() == CkMyPe()){
1163                         if(entry != NULL){
1164                                 entry->env = NULL;
1165                         }
1166                 }
1167                 obj->mlogData->tProcessed++;
1168 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1169                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1170 //              CpvAccess(_currentObj)= NULL;
1171         }
1172         DEBUG_MEM(CmiMemoryCheck());
1173 }
1174
1175 /***
1176         Helpers for the handlers and message logging methods
1177 ***/
1178
1179 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1180 //      double _startTime = CkWallTimer();
1181         if(env->recver.type != TypeNodeGroup){
1182         //This repeats a step performed in skipCldEnq for messages sent to
1183         //other processors. I do this here so that messages to local processors
1184         //undergo the same transformation.. It lets the resend be uniform for 
1185         //all messages
1186 //              CmiSetXHandler(env,CmiGetHandler(env));
1187                 _skipCldEnqueue(destPE,env,_infoIdx);
1188         }else{
1189                 _noCldNodeEnqueue(destPE,env);
1190         }
1191 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1192 }
1193 //extern "C" int CmiGetNonLocalLength();
/**
 * NOTE(review): the comment that used to sit here described a method that retries
 * ticket requests queued up earlier; no such method follows it in this part of the
 * file. Presumably this variable records whether that retry has been invoked —
 * confirm against its users. It starts out as 0.
 */
int calledRetryTicketRequest = 0;
1199
1200 void _pingHandler(CkPingMsg *msg){
1201         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1202         CmiFree(msg);
1203 }
1204
1205
1206 /*****************************************************************************
1207         Checkpointing methods..
1208                 Pack all the data on a processor and send it to the buddy periodically
1209                 Also used to throw away message logs
1210 *****************************************************************************/
1211 CkVec<TProcessedLog> processedTicketLog;
1212 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1213 void clearUpMigratedRetainedLists(int PE);
1214
1215 void checkpointAlarm(void *_dummy,double curWallTime){
1216         double diff = curWallTime-lastCompletedAlarm;
1217         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1218 /*      if(CkWallTimer()-lastRestart < 50){
1219                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1220                 return;
1221         }*/
1222         if(diff < ((chkptPeriod) - 2)){
1223                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1224                 return;
1225         }
1226         CheckpointRequest request;
1227         request.PE = CkMyPe();
1228         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1229         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1230 };
1231
1232 void _checkpointRequestHandler(CheckpointRequest *request){
1233         startMlogCheckpoint(NULL,CmiWallTimer());
1234 }
1235
1236 /**
1237  * @brief Starts the checkpoint phase after migration.
1238  */
1239 void startMlogCheckpoint(void *_dummy, double curWallTime){
1240         double _startTime = CkWallTimer();
1241
1242         // increasing the checkpoint counter
1243         checkpointCount++;
1244         
1245 #if DEBUG_CHECKPOINT
1246         if(CmiMyPe() == 0){
1247                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1248         }
1249 #endif
1250
1251         DEBUG_MEM(CmiMemoryCheck());
1252
1253         PUP::sizer psizer;
1254         psizer | checkpointCount;
1255         for(int i=0; i<CmiNumPes(); i++){
1256                 psizer | CpvAccess(_incarnation)[i];
1257         }
1258         CkPupROData(psizer);
1259         DEBUG_MEM(CmiMemoryCheck());
1260         CkPupGroupData(psizer,CmiTrue);
1261         DEBUG_MEM(CmiMemoryCheck());
1262         CkPupNodeGroupData(psizer,CmiTrue);
1263         DEBUG_MEM(CmiMemoryCheck());
1264         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1265         DEBUG_MEM(CmiMemoryCheck());
1266
1267         int dataSize = psizer.size();
1268         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1269         char *msg = (char *)CmiAlloc(totalSize);
1270         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1271         chkMsg->PE = CkMyPe();
1272         chkMsg->dataSize = dataSize;
1273         
1274         char *buf = &msg[sizeof(CheckPointDataMsg)];
1275         PUP::toMem pBuf(buf);
1276
1277         pBuf | checkpointCount;
1278         for(int i=0; i<CmiNumPes(); i++){
1279                 pBuf | CpvAccess(_incarnation)[i];
1280         }
1281         CkPupROData(pBuf);
1282         CkPupGroupData(pBuf,CmiTrue);
1283         CkPupNodeGroupData(pBuf,CmiTrue);
1284         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1285
1286         unAckedCheckpoint=1;
1287         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1288         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1289         
1290         /*
1291                 Store the highest Ticket number processed for each chare on this processor
1292         */
1293         processedTicketLog.removeAll();
1294         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1295
1296 #if DEBUG_CHECKPOINT
1297         if(CmiMyPe() == 0){
1298                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1299         }
1300 #endif
1301
1302 #if COLLECT_STATS_MEMORY
1303         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1304         msgLogSize = 0;
1305         bufferedDetsSize = 0;
1306         storedDetsSize = 0;
1307 #endif
1308
1309         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1310                 lastCompletedAlarm = curWallTime;
1311                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1312         }
1313         traceUserBracketEvent(28,_startTime,CkWallTimer());
1314 };
1315
1316 /**
1317  * @brief A chare adds the latest ticket number processed.
1318  */
1319 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1320         DEBUG(char objString[100]);
1321
1322         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1323         TProcessedLog logEntry;
1324         logEntry.recver = mlogData->objID;
1325         logEntry.tProcessed = mlogData->tProcessed;
1326         log->push_back(logEntry);
1327
1328         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1329 }
1330
1331 class ElementPacker : public CkLocIterator {
1332 private:
1333         CkLocMgr *locMgr;
1334         PUP::er &p;
1335 public:
1336         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1337         void addLocation(CkLocation &loc) {
1338                 CkArrayIndexMax idx=loc.getIndex();
1339                 CkGroupID gID = locMgr->ckGetGroupID();
1340                 p|gID;      // store loc mgr's GID as well for easier restore
1341                 p|idx;
1342                 p|loc;
1343         }
1344 };
1345
1346 /**
1347  * Pups all the array elements in this processor.
1348  */
1349 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1350         int numElements,i;
1351         int numGroups = CkpvAccess(_groupIDTable)->size();      
1352         if(!p.isUnpacking()){
1353                 numElements = CkCountArrayElements();
1354         }       
1355         p | numElements;
1356         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1357         if(!p.isUnpacking()){
1358                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1359         }else{
1360                 //Flush all recs of all LocMgrs before putting in new elements
1361 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1362                 for(int j=0;j<listsize;j++){
1363                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1364                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1365                                 listToSkip[j].idx.print();
1366                         }
1367                 }
1368                 
1369                 printf("numElements = %d\n",numElements);
1370         
1371                 for (int i=0; i<numElements; i++) {
1372                         CkGroupID gID;
1373                         CkArrayIndexMax idx;
1374                         p|gID;
1375                 p|idx;
1376                         int flag=0;
1377                         int matchedIdx=0;
1378                         for(int j=0;j<listsize;j++){
1379                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1380                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1381                                                 matchedIdx = j;
1382                                                 flag = 1;
1383                                                 break;
1384                                         }
1385                                 }
1386                         }
1387                         if(flag == 1){
1388                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1389                         }else{
1390                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1391                         }
1392                                 
1393                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1394                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1395                         mgr->resume(idx,p,create,flag);
1396                         if(flag == 1){
1397                                 int homePE = mgr->homePe(idx);
1398                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1399                         }
1400                 }
1401         }
1402 };
1403
1404
1405 void writeCheckpointToDisk(int size,char *chkpt){
1406         char fNameTemp[100];
1407         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1408         int fd = creat(fNameTemp,S_IRWXU);
1409         int ret = write(fd,chkpt,size);
1410         CkAssert(ret == size);
1411         close(fd);
1412         
1413         char fName[100];
1414         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1415         unlink(fName);
1416
1417         rename(fNameTemp,fName);
1418 }
1419
1420 //handler that receives the checkpoint from a processor
1421 //it stores it and acks it
1422 void _storeCheckpointHandler(char *msg){
1423         double _startTime=CkWallTimer();
1424                 
1425         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1426         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1427         
1428         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1429         
1430         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1431         if(oldChkpt != NULL){
1432                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1433                 CmiFree(oldmsg);
1434         }
1435         //turning off storing checkpoints
1436         
1437         int sendingPE = chkMsg->PE;
1438         
1439         CpvAccess(_storedCheckpointData)->buf = chkpt;
1440         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1441         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1442
1443         int count=0;
1444         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1445                 if(migratedNoticeList[j].fromPE == sendingPE){
1446                         migratedNoticeList[j].ackFrom = 1;
1447                 }else{
1448                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1449                 }
1450                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1451                         migratedNoticeList.remove(j);
1452                         count++;
1453                 }
1454                 
1455         }
1456         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1457         
1458         CheckPointAck ackMsg;
1459         ackMsg.PE = CkMyPe();
1460         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1461         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1462         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1463         
1464         traceUserBracketEvent(29,_startTime,CkWallTimer());
1465 };
1466
1467
1468 /**
1469  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1470  * @note The remove log request message looks like
1471                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1472  */
1473 void sendRemoveLogRequests(){
1474 #if SYNCHRONIZED_CHECKPOINT
1475         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1476 #endif
1477         double _startTime = CkWallTimer();      
1478
1479         // computing total message size
1480         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1481         char *requestMsg = (char *)CmiAlloc(totalSize);
1482
1483         // filling up the message
1484         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1485         request->PE = CkMyPe();
1486         request->numberObjects = processedTicketLog.size();
1487         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1488         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1489         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1490         int *numDeterminants = (int *)listDeterminants;
1491         numDeterminants[0] = 0; 
1492         listDeterminants = (char *)&numDeterminants[1];
1493
1494         // setting the handler in the message
1495         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1496         
1497         DEBUG_MEM(CmiMemoryCheck());
1498         for(int i=0;i<CkNumPes();i++){
1499                 CmiSyncSend(i,totalSize,requestMsg);
1500         }
1501         CmiFree(requestMsg);
1502
1503         clearUpMigratedRetainedLists(CmiMyPe());
1504         //TODO: clear ticketTable
1505         
1506         traceUserBracketEvent(30,_startTime,CkWallTimer());
1507
1508         DEBUG_MEM(CmiMemoryCheck());
1509 }
1510
1511
1512 void _checkpointAckHandler(CheckPointAck *ackMsg){
1513         DEBUG_MEM(CmiMemoryCheck());
1514         unAckedCheckpoint=0;
1515         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1516         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1517         if(onGoingLoadBalancing){
1518                 onGoingLoadBalancing = 0;
1519                 finishedCheckpointLoadBalancing();
1520         }else{
1521                 sendRemoveLogRequests();
1522         }
1523         CmiFree(ackMsg);
1524         
1525 };
1526
1527
1528 /**
1529  * @brief Inserts all the determinants into a hash table.
1530  */
1531 inline void populateDeterminantTable(char *data){
1532         DEBUG(char recverString[100]);
1533         DEBUG(char senderString[100]);
1534         int numDets, *numDetsPtr;
1535         Determinant *detList;
1536         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1537         SNToTicket *tickets;
1538         Ticket ticket;
1539
1540         RemoveLogRequest *request = (RemoveLogRequest *)data;
1541         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1542         
1543         numDetsPtr = (int *)&list[request->numberObjects];
1544         numDets = numDetsPtr[0];
1545         detList = (Determinant *)&numDetsPtr[1];
1546
1547         // inserting determinants into hashtable
1548         for(int i=0; i<numDets; i++){
1549                 table = detTable.get(detList[i].sender);
1550                 if(table == NULL){
1551                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1552                         detTable.put(detList[i].sender) = table;
1553                 }
1554                 tickets = table->get(detList[i].receiver);
1555                 if(tickets == NULL){
1556                         tickets = new SNToTicket();
1557                         table->put(detList[i].receiver) = tickets; 
1558                 }
1559                 ticket.TN = detList[i].TN;
1560                 tickets->put(detList[i].SN) = ticket;
1561         }
1562
1563         DEBUG_MEM(CmiMemoryCheck());    
1564
1565 }
1566
1567 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1568         int total;
1569         DEBUG(char nameString[100]);
1570         DEBUG_MEM(CmiMemoryCheck());
1571         CmiMemoryCheck();
1572         char *data = (char *)_data;
1573         RemoveLogRequest *request = (RemoveLogRequest *)data;
1574         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1575         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1576         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1577         SNToTicket *tickets;
1578         MCount TN;
1579
1580         int count=0;
1581         total = mlog->length();
1582         for(int i=0; i<total; i++){
1583                 MlogEntry *logEntry = mlog->deq();
1584
1585                 // looking message in determinant table
1586                 table = detTable.get(logEntry->env->sender);
1587                 if(table != NULL){
1588                         tickets = table->get(logEntry->env->recver);
1589                         if(tickets != NULL){
1590                                 TN = tickets->get(logEntry->env->SN).TN;
1591                                 if(TN != 0){
1592                                         logEntry->env->TN = TN;
1593                                 }
1594                         }
1595                 }
1596         
1597                 int match=0;
1598                 for(int j=0;j<request->numberObjects;j++){
1599                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1600                                 //this log Entry should be removed
1601                                 match = 1;
1602                                 break;
1603                         }
1604                 }
1605                 char senderString[100],recverString[100];
1606 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1607                 if(match){
1608                         count++;
1609                         delete logEntry;
1610                 }else{
1611                         mlog->enq(logEntry);
1612                 }
1613         }
1614         if(count > 0){
1615                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1616         }
1617         DEBUG_MEM(CmiMemoryCheck());
1618         CmiMemoryCheck();
1619 }
1620
1621 /**
1622  * @brief Removes messages in the log according to the received ticket numbers
1623  */
1624 void _removeProcessedLogHandler(char *requestMsg){
1625         double start = CkWallTimer();
1626
1627         // building map for determinants
1628         populateDeterminantTable(requestMsg);
1629
1630         // removing processed messages from the message log
1631         forAllCharesDo(removeProcessedLogs,requestMsg);
1632
1633         // @todo: clean determinant table 
1634         
1635         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1636         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1637         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1638         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1639         /*
1640                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1641         */
1642         DEBUG_MEM(CmiMemoryCheck());
1643         clearUpMigratedRetainedLists(request->PE);
1644         
1645         traceUserBracketEvent(20,start,CkWallTimer());
1646         CmiFree(requestMsg);    
1647 };
1648
1649
1650 void clearUpMigratedRetainedLists(int PE){
1651         int count=0;
1652         CmiMemoryCheck();
1653         
1654         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1655                 if(migratedNoticeList[j].toPE == PE){
1656                         migratedNoticeList[j].ackTo = 1;
1657                 }
1658                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1659                         migratedNoticeList.remove(j);
1660                         count++;
1661                 }
1662         }
1663         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1664         
1665         for(int j=retainedObjectList.size()-1;j>=0;j--){
1666                 if(retainedObjectList[j]->migRecord.toPE == PE){
1667                         RetainedMigratedObject *obj = retainedObjectList[j];
1668                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1669                         retainedObjectList.remove(j);
1670                         if(obj->msg != NULL){
1671                                 CmiMemoryCheck();
1672                                 CmiFree(obj->msg);
1673                         }
1674                         delete obj;
1675                 }
1676         }
1677 }
1678
1679 /***************************************************************
1680         Restart Methods and handlers
1681 ***************************************************************/        
1682
1683 /**
1684  * Function for restarting the crashed processor.
1685  * It sets the restart flag and contacts the buddy
1686  * processor to get the latest checkpoint.
1687  */
1688 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1689         RestartRequest msg;
1690
1691         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1692
1693         // setting the restart flag
1694         _restartFlag = 1;
1695         _numRestartResponses = 0;
1696
1697         // if we are using team-based message logging, all members of the group have to be restarted
1698         if(teamSize > 1){
1699                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1700                         if(i != CkMyPe() && i < CkNumPes()){
1701                                 // sending a message to the team member
1702                                 msg.PE = CkMyPe();
1703                             CmiSetHandler(&msg,_restartHandlerIdx);
1704                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1705                         }
1706                 }
1707         }
1708
1709         // requesting the latest checkpoint from its buddy
1710         msg.PE = CkMyPe();
1711         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1712         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1713 };
1714
1715 /**
1716  * Function to restart this processor.
1717  * The handler is invoked by a member of its same team in message logging.
1718  */
1719 void _restartHandler(RestartRequest *restartMsg){
1720         int i;
1721         int numGroups = CkpvAccess(_groupIDTable)->size();
1722         RestartRequest msg;
1723         
1724         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1725
1726     // setting the restart flag
1727         _restartFlag = 1;
1728         _numRestartResponses = 0;
1729
1730         // flushing all buffers
1731         //TEST END
1732 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1733         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1734         for(int i=0;i<numGroups;i++){
1735         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1736                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1737                 obj->flushStates();
1738                 obj->ckJustMigrated();
1739         }*/
1740
1741     // requesting the latest checkpoint from its buddy
1742         msg.PE = CkMyPe();
1743         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1744         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1745 }
1746
1747
1748 /**
1749  * Gets the stored checkpoint but calls another function in the sender.
1750  */
1751 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1752
1753         // retrieving the stored checkpoint
1754         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1755
1756         // making sure it is its buddy who is requesting the checkpoint
1757         CkAssert(restartMsg->PE == storedChkpt->PE);
1758
1759         storedRequest = restartMsg;
1760         verifyAckTotal = 0;
1761
1762         for(int i=0;i<migratedNoticeList.size();i++){
1763                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1764 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1765                         if(migratedNoticeList[i].ackFrom == 0){
1766                                 //need to verify if the object actually exists .. it might not
1767                                 //have been acked but it might exist on it
1768                                 VerifyAckMsg msg;
1769                                 msg.migRecord = migratedNoticeList[i];
1770                                 msg.index = i;
1771                                 msg.fromPE = CmiMyPe();
1772                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1773                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1774                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1775                                 verifyAckTotal++;
1776                         }
1777                 }
1778         }
1779
1780         // sending the checkpoint back to its buddy     
1781         if(verifyAckTotal == 0){
1782                 sendCheckpointData(MLOG_RESTARTED);
1783         }
1784         verifyAckCount = 0;
1785 }
1786
1787 /**
1788  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1789  */
1790 void _recvRestartCheckpointHandler(char *_restartData){
1791         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1792         MigrationRecord *migratedAwayElements;
1793
1794         globalLBID = restartData->lbGroupID;
1795         
1796         restartData->restartWallTime *= 1000;
1797         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1798         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1799         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1800
1801         
1802         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1803         char *buf = &_restartData[sizeof(RestartProcessorData)];
1804         
1805         if(restartData->numMigratedAwayElements != 0){
1806                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1807                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1808                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1809                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1810         }
1811
1812         // turning on the team recovery flag
1813         forAllCharesDo(setTeamRecovery,NULL);
1814         
1815         PUP::fromMem pBuf(buf);
1816         pBuf | checkpointCount;
1817         for(int i=0; i<CmiNumPes(); i++){
1818                 pBuf | CpvAccess(_incarnation)[i];
1819         }
1820         CkPupROData(pBuf);
1821         CkPupGroupData(pBuf,CmiFalse);
1822         CkPupNodeGroupData(pBuf,CmiFalse);
1823         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1824         CkAssert(pBuf.size() == restartData->checkPointSize);
1825         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1826
1827         // increasing the incarnation number of all the team members
1828         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1829                 CpvAccess(_incarnation)[i]++;
1830         }
1831         
1832         // turning off the team recovery flag
1833         forAllCharesDo(unsetTeamRecovery,NULL);
1834
1835         // initializing a few variables for handling local messages
1836         forAllCharesDo(initializeRestart,NULL);
1837         
1838         CmiFree(_restartData);  
1839
1840         /*HERE _initDone();
1841
1842         getGlobalStep(globalLBID);
1843         
1844         countUpdateHomeAcks = 0;
1845         RestartRequest updateHomeRequest;
1846         updateHomeRequest.PE = CmiMyPe();
1847         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1848         for(int i=0;i<CmiNumPes();i++){
1849                 if(i != CmiMyPe()){
1850                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1851                 }
1852         }
1853 */
1854
1855
1856         // Send out the request to resend logged messages to all other processors
1857         CkVec<TProcessedLog> objectVec;
1858         forAllCharesDo(createObjIDList, (void *)&objectVec);
1859         int numberObjects = objectVec.size();
1860         
1861         /*
1862                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1863         */
1864         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1865         char *resendMsg = (char *)CmiAlloc(totalSize);  
1866
1867         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1868         resendReq->PE =CkMyPe(); 
1869         resendReq->numberObjects = numberObjects;
1870         char *objList = &resendMsg[sizeof(ResendRequest)];
1871         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1872         
1873
1874         /* test for parallel restart migrate away object**/
1875 //      if(fastRecovery){
1876 //              distributeRestartedObjects();
1877 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1878 //      }
1879         
1880         /*      To make restart work for load balancing.. should only
1881         be used when checkpoint happens along with load balancing
1882         **/
1883 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1884
1885         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1886         CpvAccess(_currentObj) = lb;
1887         lb->ReceiveDummyMigration(restartDecisionNumber);
1888
1889         sleep(10);
1890         
1891         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1892         for(int i=0;i<CkNumPes();i++){
1893                 if(i != CkMyPe()){
1894                         CmiSyncSend(i,totalSize,resendMsg);
1895                 }       
1896         }
1897         _resendMessagesHandler(resendMsg);
1898
1899 }
1900
1901
1902 void CkMlogRestartDouble(void *,double){
1903         CkMlogRestart(NULL,NULL);
1904 };
1905
1906 //TML: restarting from local (group) failure
1907 void CkMlogRestartLocal(){
1908     CkMlogRestart(NULL,NULL);
1909 };
1910
1911 /**
1912  * Gets the stored checkpoint for its buddy processor.
1913  */
1914 void _getCheckpointHandler(RestartRequest *restartMsg){
1915
1916         // retrieving the stored checkpoint
1917         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1918
1919         // making sure it is its buddy who is requesting the checkpoint
1920         CkAssert(restartMsg->PE == storedChkpt->PE);
1921
1922         storedRequest = restartMsg;
1923         verifyAckTotal = 0;
1924
1925         for(int i=0;i<migratedNoticeList.size();i++){
1926                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1927 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1928                         if(migratedNoticeList[i].ackFrom == 0){
1929                                 //need to verify if the object actually exists .. it might not
1930                                 //have been acked but it might exist on it
1931                                 VerifyAckMsg msg;
1932                                 msg.migRecord = migratedNoticeList[i];
1933                                 msg.index = i;
1934                                 msg.fromPE = CmiMyPe();
1935                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1936                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1937                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1938                                 verifyAckTotal++;
1939                         }
1940                 }
1941         }
1942
1943         // sending the checkpoint back to its buddy     
1944         if(verifyAckTotal == 0){
1945                 sendCheckpointData(MLOG_CRASHED);
1946         }
1947         verifyAckCount = 0;
1948 }
1949
1950
1951 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1952         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1953         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1954         if(rec != NULL && rec->type() == CkLocRec::local){
1955                         //this location exists on this processor
1956                         //and needs to be removed       
1957                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1958                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1959                         
1960                         int localIdx = localRec->getLocalIndex();
1961                         LBDatabase *lbdb = localRec->getLBDB();
1962                         LDObjHandle ldHandle = localRec->getLdHandle();
1963                                 
1964                         locMgr->setDuringMigration(true);
1965                         
1966                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1967                         lbdb->UnregisterObj(ldHandle);
1968                         
1969                         locMgr->setDuringMigration(false);
1970                         
1971                         verifyAckedRequests++;
1972
1973         }
1974         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1975         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1976 };
1977
1978
1979 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1980         int index =     verifyReply->index;
1981         migratedNoticeList[index] = verifyReply->migRecord;
1982         verifyAckCount++;
1983         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1984         if(verifyAckCount == verifyAckTotal){
1985                 sendCheckpointData(MLOG_CRASHED);
1986         }
1987 }
1988
1989
1990 /**
1991  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1992  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1993  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1994  */
1995 void sendCheckpointData(int mode){      
1996         RestartRequest *restartMsg = storedRequest;
1997         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1998         int numMigratedAwayElements = migratedNoticeList.size();
1999         if(migratedNoticeList.size() != 0){
2000                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2001 //                      CkAssert(migratedNoticeList.size() == 0);
2002         }
2003         
2004         
2005         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2006         
2007         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2008         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2009         
2010         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2011         
2012         char *msg = (char *)CmiAlloc(totalSize);
2013         
2014         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2015         dataMsg->PE = CkMyPe();
2016         dataMsg->restartWallTime = CmiTimer();
2017         dataMsg->checkPointSize = storedChkpt->bufSize;
2018         
2019         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2020 //      dataMsg->numMigratedAwayElements = 0;
2021         
2022         dataMsg->numMigratedInElements = 0;
2023         dataMsg->migratedElementSize = 0;
2024         dataMsg->lbGroupID = globalLBID;
2025         /*msg layout 
2026                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2027                 Local MessageLog|
2028         */
2029         //store checkpoint data
2030         char *buf = &msg[sizeof(RestartProcessorData)];
2031
2032         if(dataMsg->numMigratedAwayElements != 0){
2033                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2034                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2035         }
2036         
2037
2038         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2039         buf = &buf[storedChkpt->bufSize];
2040
2041         if(mode == MLOG_RESTARTED){
2042                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2043                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2044                 CmiFree(restartMsg);
2045         }else{
2046                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2047                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2048                 CmiFree(restartMsg);
2049         }
2050 };
2051
2052
2053 // this list is used to create a vector of the object ids of all
2054 //the chares on this processor currently and the highest TN processed by them 
2055 //the first argument is actually a CkVec<TProcessedLog> *
2056 void createObjIDList(void *data,ChareMlogData *mlogData){
2057         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2058         TProcessedLog entry;
2059         entry.recver = mlogData->objID;
2060         entry.tProcessed = mlogData->tProcessed;
2061         list->push_back(entry);
2062         DEBUG_TEAM(char objString[100]);
2063         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2064         DEBUG_RECOVERY(printLog(&entry));
2065 }
2066
2067
2068 /**
2069  * Receives the checkpoint data from its buddy, restores the state of all the objects
2070  * and asks everyone else to update its home.
2071  */
2072 void _recvCheckpointHandler(char *_restartData){
2073         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2074         MigrationRecord *migratedAwayElements;
2075
2076         globalLBID = restartData->lbGroupID;
2077         
2078         restartData->restartWallTime *= 1000;
2079         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2080         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2081         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2082
2083         
2084         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2085         char *buf = &_restartData[sizeof(RestartProcessorData)];
2086         
2087         if(restartData->numMigratedAwayElements != 0){
2088                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2089                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2090                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2091                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2092         }
2093         
2094         PUP::fromMem pBuf(buf);
2095
2096         pBuf | checkpointCount;
2097         for(int i=0; i<CmiNumPes(); i++){
2098                 pBuf | CpvAccess(_incarnation)[i];
2099         }
2100         CkPupROData(pBuf);
2101         CkPupGroupData(pBuf,CmiTrue);
2102         CkPupNodeGroupData(pBuf,CmiTrue);
2103         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2104         CkAssert(pBuf.size() == restartData->checkPointSize);
2105         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2106
2107         // increases the incarnation number
2108         CpvAccess(_incarnation)[CmiMyPe()]++;
2109         
2110         forAllCharesDo(initializeRestart,NULL);
2111         
2112         CmiFree(_restartData);
2113         
2114         _initDone();
2115
2116         getGlobalStep(globalLBID);
2117
2118         // sending request to send determinants
2119         _numRestartResponses = 0;
2120         
2121         // Send out the request to resend logged determinants to all other processors
2122         CkVec<TProcessedLog> objectVec;
2123         forAllCharesDo(createObjIDList, (void *)&objectVec);
2124         int numberObjects = objectVec.size();
2125         
2126         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2127         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2128         char *resendMsg = (char *)CmiAlloc(totalSize);  
2129
2130         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2131         resendReq->PE =CkMyPe(); 
2132         resendReq->numberObjects = numberObjects;
2133         char *objList = &resendMsg[sizeof(ResendRequest)];
2134         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2135
2136         CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
2137         for(int i=0;i<CkNumPes();i++){
2138                 if(i != CkMyPe()){
2139                         CmiSyncSend(i,totalSize,resendMsg);
2140                 }
2141         }
2142         CmiFree(resendMsg);
2143         
2144 }
2145
2146 /**
2147  * Receives the updateHome ACKs from all other processors. Once everybody
2148  * has replied, it sends a request to resend the logged messages.
2149  */
2150 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2151         countUpdateHomeAcks++;
2152         CmiFree(updateHomeAck);
2153         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2154         if(countUpdateHomeAcks != CmiNumPes()){
2155                 return;
2156         }
2157
2158         // Send out the request to resend logged messages to all other processors
2159         CkVec<TProcessedLog> objectVec;
2160         forAllCharesDo(createObjIDList, (void *)&objectVec);
2161         int numberObjects = objectVec.size();
2162         
2163         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2164         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2165         char *resendMsg = (char *)CmiAlloc(totalSize);  
2166
2167         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2168         resendReq->PE =CkMyPe(); 
2169         resendReq->numberObjects = numberObjects;
2170         char *objList = &resendMsg[sizeof(ResendRequest)];
2171         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2172
2173         /* test for parallel restart migrate away object**/
2174         if(fastRecovery){
2175                 distributeRestartedObjects();
2176                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2177         }
2178         
2179         /*      To make restart work for load balancing.. should only
2180         be used when checkpoint happens along with load balancing
2181         **/
2182 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2183
2184         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2185         CpvAccess(_currentObj) = lb;
2186         lb->ReceiveDummyMigration(restartDecisionNumber);
2187
2188 //HERE  sleep(10);
2189         
2190         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2191         for(int i=0;i<CkNumPes();i++){
2192                 if(i != CkMyPe()){
2193                         CmiSyncSend(i,totalSize,resendMsg);
2194                 }
2195         }
2196         _resendMessagesHandler(resendMsg);
2197         CmiFree(resendMsg);
2198
2199 };
2200
2201 /**
2202  * @brief Initializes variables and flags for restarting procedure.
2203  */
2204 void initializeRestart(void *data, ChareMlogData *mlogData){
2205         mlogData->resendReplyRecvd = 0;
2206         mlogData->receivedTNs = new CkVec<MCount>;
2207         mlogData->restartFlag = 1;
2208 };
2209
2210 /**
2211  * Updates the homePe of chare array elements.
2212  */
2213 void updateHomePE(void *data,ChareMlogData *mlogData){
2214         RestartRequest *updateRequest = (RestartRequest *)data;
2215         int PE = updateRequest->PE; //restarted PE
2216         //if this object is an array Element and its home is the restarted processor
2217         // the home processor needs to know its current location
2218         if(mlogData->objID.type == TypeArray){
2219                 //it is an array element
2220                 CkGroupID myGID = mlogData->objID.data.array.id;
2221                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2222                 CkArrayID aid(mlogData->objID.data.array.id);           
2223                 //check if the restarted processor is the home processor for this object
2224                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2225                 if(locMgr->homePe(myIdx) == PE){
2226                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2227                         DEBUG_RESTART(myIdx.print());
2228                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2229                 }
2230         }
2231 };
2232
2233
2234 /**
2235  * Updates the homePe for all chares in this processor.
2236  */
2237 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2238         int sender = updateRequest->PE;
2239         
2240         forAllCharesDo(updateHomePE,updateRequest);
2241         
2242         updateRequest->PE = CmiMyPe();
2243         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2244         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2245         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2246                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2247                 checkpointCount--;
2248                 startMlogCheckpoint(NULL,0);
2249         }
2250         if(sender == getCheckPointPE()){
2251                 for(int i=0;i<retainedObjectList.size();i++){
2252                         if(retainedObjectList[i]->acked == 0){
2253                                 MigrationNotice migMsg;
2254                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2255                                 migMsg.record = retainedObjectList[i];
2256                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2257                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2258                         }
2259                 }
2260         }
2261 }
2262
2263 /**
2264  * @brief Fills up the ticket vector for each chare.
2265  */
2266 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2267         DEBUG(char name[100]);
2268         ResendData *resendData = (ResendData *)data;
2269         int PE = resendData->PE; //restarted PE
2270         int count=0;
2271         CkHashtableIterator *iterator;
2272         void *objp;
2273         void *objkey;
2274         CkObjID *objID;
2275         SNToTicket *snToTicket;
2276         Ticket ticket;
2277         
2278         // traversing the team table looking up for the maximum TN received     
2279         iterator = mlogData->teamTable.iterator();
2280         while( (objp = iterator->next(&objkey)) != NULL ){
2281                 objID = (CkObjID *)objkey;
2282         
2283                 // traversing the resendData structure to add ticket numbers
2284                 for(int j=0;j<resendData->numberObjects;j++){
2285                         if((*objID) == (resendData->listObjects)[j].recver){
2286                                 snToTicket = *(SNToTicket **)objp;
2287 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2288                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2289                                         ticket = snToTicket->get(snIndex);      
2290                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2291                                                 //store the TNs that have been since the recver last checkpointed
2292                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2293                                         }
2294                                 }
2295                         }
2296                 }
2297         }
2298
2299         //releasing the memory for the iterator
2300         delete iterator;
2301 }
2302
2303
2304 /**
2305  * @brief Turns on the flag for team recovery that selectively restores
2306  * particular metadata information.
2307  */
2308 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2309         char name[100];
2310         mlogData->teamRecoveryFlag = 1; 
2311 }
2312
2313 /**
2314  * @brief Turns off the flag for team recovery.
2315  */
2316 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2317         mlogData->teamRecoveryFlag = 0;
2318 }
2319
2320 /**
2321  * Prints a processed log.
2322  */
2323 void printLog(TProcessedLog *log){
2324         char recverString[100];
2325         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2326 }
2327
2328 /**
2329  * Prints information about a message.
2330  */
2331 void printMsg(envelope *env, const char* par){
2332         char senderString[100];
2333         char recverString[100];
2334         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2335 }
2336
2337 /**
2338  * Prints information about a determinant.
2339  */
2340 void printDet(Determinant *det, const char* par){
2341         char senderString[100];
2342         char recverString[100];
2343         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2344 }
2345
2346 /**
2347  * @brief Resends all the logged messages to a particular chare list.
2348  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2349  * @param mlogData a particular chare living in this processor.
2350  */
2351 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2352         DEBUG_RESTART(char nameString[100]);
2353         DEBUG_RESTART(char recverString[100]);
2354         DEBUG_RESTART(char senderString[100]);
2355
2356         ResendData *resendData = (ResendData *)data;
2357         int PE = resendData->PE; //restarted PE
2358         int count=0;
2359         int ticketRequests=0;
2360         CkQ<MlogEntry *> *log = mlogData->getMlog();
2361
2362         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2363
2364         // traversing the message log to see if we must resend a message        
2365         for(int i=0;i<log->length();i++){
2366                 MlogEntry *logEntry = (*log)[i];
2367                 
2368                 // if we sent out the logs of a local message to buddy and it crashed
2369                 //before acknowledging 
2370                 envelope *env = logEntry->env;
2371                 if(env == NULL){
2372                         continue;
2373                 }
2374         
2375                 // resend if type is not invalid        
2376                 if(env->recver.type != TypeInvalid){
2377                         for(int j=0;j<resendData->numberObjects;j++){
2378                                 if(env->recver == (resendData->listObjects)[j].recver){
2379                                         if(PE != CkMyPe()){
2380                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2381                                                 if(env->recver.type == TypeNodeGroup){
2382                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2383                                                 }else{
2384                                                         CmiSetHandler(env,CmiGetXHandler(env));
2385                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2386                                                 }
2387                                         }else{
2388                                                 envelope *copyEnv = copyEnvelope(env);
2389                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2390                                         }
2391                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2392                                         count++;
2393                                 }
2394                         }//end of for loop of objects
2395                         
2396                 }       
2397         }
2398         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2399 }
2400
2401 /**
2402  * Send all remote determinants to a particular failed PE.
2403  * It only sends determinants to those objects on the list.
2404  */
2405 void _sendDetsHandler(char *msg){
2406         ResendData d;
2407         CkVec<Determinant> *detVec;
2408         ResendRequest *resendReq = (ResendRequest *)msg;
2409
2410         // CkPrintf("[%d] Sending determinants\n",CkMyPe());
2411
2412         // building the reply message
2413         char *listObjects = &msg[sizeof(ResendRequest)];
2414         d.numberObjects = resendReq->numberObjects;
2415         d.PE = resendReq->PE;
2416         d.listObjects = (TProcessedLog *)listObjects;
2417         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2418         detVec = new CkVec<Determinant>[d.numberObjects];
2419
2420         // adding the remote determinants to the resendReplyMsg
2421         // traversing all the remote determinants
2422         CkVec<Determinant> *vec;
2423         for(int i=0; i<d.numberObjects; i++){
2424                 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
2425                 if(vec != NULL){
2426                         for(int j=0; j<vec->size(); j++){
2427
2428                                 // only relevant determinants are to be sent
2429                                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
2430
2431                                         // adding the ticket in the ticket vector
2432                                         d.ticketVecs[i].push_back((*vec)[j].TN);
2433
2434                                         // adding the determinant in the determinant vector
2435                                         detVec[i].push_back((*vec)[j]);
2436
2437                                         DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
2438                                 }
2439                         }
2440                 }
2441         }
2442
2443         int totalDetStored = 0;
2444         for(int i=0;i<d.numberObjects;i++){
2445                 totalDetStored += detVec[i].size();
2446         }
2447
2448         //send back the maximum ticket number for a message sent to each object on the 
2449         //restarted processor
2450         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2451         
2452         int totalTNStored=0;
2453         for(int i=0;i<d.numberObjects;i++){
2454                 totalTNStored += d.ticketVecs[i].size();
2455         }
2456         
2457         int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
2458         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2459         
2460         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2461         resendReply->PE = CkMyPe();
2462         resendReply->numberObjects = d.numberObjects;
2463         
2464         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2465         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2466         for(int i=0;i<d.numberObjects;i++){
2467                 replyObjects[i] = d.listObjects[i].recver;
2468         }
2469         
2470         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2471         for(int i=0;i<d.numberObjects;i++){
2472                 int vecsize = d.ticketVecs[i].size();
2473                 memcpy(ticketList,&vecsize,sizeof(int));
2474                 ticketList = &ticketList[sizeof(int)];
2475                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2476                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2477         }
2478
2479         // adding the stored remote determinants to the message
2480         for(int i=0;i<d.numberObjects;i++){
2481                 int vecsize = detVec[i].size();
2482                 memcpy(ticketList,&vecsize,sizeof(int));
2483                 ticketList = &ticketList[sizeof(int)];
2484                 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
2485                 ticketList = &ticketList[sizeof(Determinant)*vecsize];
2486         }
2487
2488         CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
2489         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2490
2491         delete [] detVec;
2492         delete [] d.ticketVecs;
2493
2494         DEBUG_MEM(CmiMemoryCheck());
2495
2496         if(resendReq->PE != CkMyPe()){
2497                 CmiFree(msg);
2498         }       
2499 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2500         lastRestart = CmiWallTimer();
2501
2502 }
2503
2504 /**
2505  * Resends messages since last checkpoint to the list of objects included in the 
2506  * request. It also sends stored remote determinants to the particular failed PE.
2507  */
2508 void _resendMessagesHandler(char *msg){
2509         ResendData d;
2510         ResendRequest *resendReq = (ResendRequest *)msg;
2511
2512         //CkPrintf("[%d] Resending messages\n",CkMyPe());
2513
2514         // building the reply message
2515         char *listObjects = &msg[sizeof(ResendRequest)];
2516         d.numberObjects = resendReq->numberObjects;
2517         d.PE = resendReq->PE;
2518         d.listObjects = (TProcessedLog *)listObjects;
2519         
2520         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2521
2522         //TML: examines the origin processor to determine if it belongs to the same group.
2523         // In that case, it only returns the maximum ticket received for each object in the list.
2524         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2525                 forAllCharesDo(fillTicketForChare,&d);
2526         else
2527                 forAllCharesDo(resendMessageForChare,&d);
2528
2529         DEBUG_MEM(CmiMemoryCheck());
2530
2531         if(resendReq->PE != CkMyPe()){
2532                 CmiFree(msg);
2533         }       
2534 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2535         lastRestart = CmiWallTimer();
2536 }
2537
2538 MCount maxVec(CkVec<MCount> *TNvec);
2539 void sortVec(CkVec<MCount> *TNvec);
2540 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2541
2542 /**
2543  * @brief Processes the messages in the delayed remote message queue
2544  */
2545 void processDelayedRemoteMsgQueue(){
2546         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2547         
2548         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2549                 void *qMsgPtr;
2550                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2551                 envelope *qEnv = (envelope *)qMsgPtr;
2552                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2553                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2554                 DEBUG_MEM(CmiMemoryCheck());
2555         }
2556
2557 }
2558
2559 /**
2560  * @brief Receives determinants stored on remote nodes.
2561  * Message format: |Header|ObjID list|TN list|Determinant list|
2562  * TN list = |number of TNs|list of TNs|...|
2563  */
2564 void _sendDetsReplyHandler(char *msg){
2565         ResendRequest *resendReply = (ResendRequest *)msg;
2566         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2567         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2568         
2569 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2570         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2571         
2572         for(int i =0; i< resendReply->numberObjects;i++){
2573                 Chare *obj = (Chare *)listObjects[i].getObject();
2574                 
2575                 int vecsize;
2576                 memcpy(&vecsize,listTickets,sizeof(int));
2577                 listTickets = &listTickets[sizeof(int)];
2578                 MCount *listTNs = (MCount *)listTickets;
2579                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2580         
2581                 if(obj != NULL){
2582                         //the object was restarted on the processor on which it existed
2583                         processReceivedTN(obj,vecsize,listTNs);
2584                 }else{
2585                         //pack up objID vecsize and listTNs and send it to the correct processor
2586                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2587                         char *TNMsg = (char *)CmiAlloc(totalSize);
2588                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2589                         receivedTNData->recver = listObjects[i];
2590                         receivedTNData->numTNs = vecsize;
2591                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2592                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2593                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2594                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2595                 }
2596
2597         }
2598
2599         // traversing all the retrieved determinants
2600         for(int i = 0; i < resendReply->numberObjects; i++){
2601                 Chare *obj = (Chare *)listObjects[i].getObject();
2602                 
2603                 int vecsize;
2604                 memcpy(&vecsize,listTickets,sizeof(int));
2605                 listTickets = &listTickets[sizeof(int)];
2606                 Determinant *listDets = (Determinant *)listTickets;     
2607                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2608         
2609                 if(obj != NULL){
2610                         //the object was restarted on the processor on which it existed
2611                         processReceivedDet(obj,vecsize,listDets);
2612                 } else {
2613                         // pack the determinants and ship them to the other processor
2614                         // pack up objID vecsize and listDets and send it to the correct processor
2615                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2616                         char *detMsg = (char *)CmiAlloc(totalSize);
2617                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2618                         receivedDetData->recver = listObjects[i];
2619                         receivedDetData->numDets = vecsize;
2620                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2621                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2622                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2623                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2624                 }
2625
2626         }
2627
2628         // checking if we have received all replies
2629         _numRestartResponses++;
2630         if(_numRestartResponses != CkNumPes())
2631                 return;
2632         else 
2633                 _numRestartResponses = 0;
2634
2635         // continuing with restart process; send out the request to resend logged messages to all other processors
2636         CkVec<TProcessedLog> objectVec;
2637         forAllCharesDo(createObjIDList, (void *)&objectVec);
2638         int numberObjects = objectVec.size();
2639         
2640         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2641         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2642         char *resendMsg = (char *)CmiAlloc(totalSize);  
2643
2644         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2645         resendReq->PE =CkMyPe(); 
2646         resendReq->numberObjects = numberObjects;
2647         char *objList = &resendMsg[sizeof(ResendRequest)];
2648         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2649
2650         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2651         CpvAccess(_currentObj) = lb;
2652         lb->ReceiveDummyMigration(restartDecisionNumber);
2653
2654 //HERE  sleep(10);
2655 //      CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
2656         
2657         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2658         for(int i=0;i<CkNumPes();i++){
2659                 if(i != CkMyPe()){
2660                         CmiSyncSend(i,totalSize,resendMsg);
2661                 }
2662         }
2663         _resendMessagesHandler(resendMsg);
2664         CmiFree(resendMsg);
2665
2666         /* test for parallel restart migrate away object**/
2667         if(fastRecovery){
2668                 distributeRestartedObjects();
2669                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2670         }
2671
2672 //      processDelayedRemoteMsgQueue();
2673
2674 };
2675
2676 /**
2677  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2678  */
2679 void _receivedDetDataHandler(ReceivedDetData *msg){
2680         DEBUG_NOW(char objName[100]);
2681         Chare *obj = (Chare *) msg->recver.getObject();
2682         if(obj){                
2683                 char *_msg = (char *)msg;
2684                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2685                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2686                 processReceivedDet(obj,msg->numDets,listDets);
2687                 CmiFree(msg);
2688         }else{
2689                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2690                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2691         }
2692 }
2693
2694 /**
2695  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2696  */
2697 void _receivedTNDataHandler(ReceivedTNData *msg){
2698         DEBUG_NOW(char objName[100]);
2699         Chare *obj = (Chare *) msg->recver.getObject();
2700         if(obj){                
2701                 char *_msg = (char *)msg;
2702                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2703                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2704                 processReceivedTN(obj,msg->numTNs,listTNs);
2705                 CmiFree(msg);
2706         }else{
2707                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2708                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2709         }
2710 };
2711
2712 /**
2713  * @brief Processes the received list of determinants from a particular PE.
2714  */
2715 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2716         Determinant *det;
2717
2718         // traversing the whole list of determinants
2719         for(int i=0; i<listSize; i++){
2720                 det = &listDets[i];
2721                 if(CkMyPe() == 4) printDet(det,"RECOVERY");
2722                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2723                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2724         }
2725         
2726         DEBUG_MEM(CmiMemoryCheck());
2727 }
2728         
2729 /**
2730  * @brief Processes the received list of tickets from a particular PE.
2731  */
2732 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2733         // increases the number of resendReply received
2734         obj->mlogData->resendReplyRecvd++;
2735
2736         DEBUG(char objName[100]);
2737         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2738         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2739         //if(obj->mlogData->receivedTNs == NULL)
2740         //      CkPrintf("NULL\n");     
2741         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2742
2743         // includes the tickets into the receivedTN structure
2744         for(int j=0;j<listSize;j++){
2745                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2746         }
2747         
2748         //if this object has received all the replies find the ticket numbers
2749         //that senders know about. Those less than the ticket number processed 
2750         //by the receiver can be thrown away. The rest need not be consecutive
2751         // ie there can be holes in the list of ticket numbers seen by senders
2752         if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
2753                 obj->mlogData->resendReplyRecvd = 0;
2754
2755 #if VERIFY_DETS
2756                 //sort the received TNS
2757                 sortVec(obj->mlogData->receivedTNs);
2758         
2759                 //after all the received tickets are in we need to sort them and then 
2760                 // calculate the holes  
2761                 if(obj->mlogData->receivedTNs->size() > 0){
2762                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2763                         int vecsize = obj->mlogData->receivedTNs->size();
2764                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2765                         
2766                         // updating tCount with the highest ticket handed out
2767                         if(teamSize > 1){
2768                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2769                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2770                         }else{
2771                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2772                         }
2773                         
2774                         if(numberHoles == 0){
2775                         }else{
2776                                 char objName[100];                                      
2777                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2778                                 obj->mlogData->numberHoles = numberHoles;
2779                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2780                                 int countHoles=0;
2781                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2782                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2783                                                 //the TNs are not consecutive at this point
2784                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2785                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2786                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2787                                                         countHoles++;
2788                                                 }       
2789                                         }
2790                                 }
2791                                 //Holes have been given new TN
2792                                 if(countHoles != numberHoles){
2793                                         char str[100];
2794                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2795                                 }
2796                                 CkAssert(countHoles == numberHoles);                                    
2797                                 obj->mlogData->currentHoles = numberHoles;
2798                         }
2799                 }
2800 #else
2801                 if(obj->mlogData->receivedTNs->size() > 0){
2802                         obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
2803                 }
2804 #endif
2805                 // cleaning up structures and getting ready to continue execution       
2806                 delete obj->mlogData->receivedTNs;
2807                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2808                 obj->mlogData->receivedTNs = NULL;
2809                 obj->mlogData->restartFlag = 0;
2810
2811                 // processDelayedRemoteMsgQueue();
2812
2813                 DEBUG_RESTART(char objString[100]);
2814                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2815         }
2816
2817 }
2818
2819 /** @brief Returns the maximum ticket from a vector */
2820 MCount maxVec(CkVec<MCount> *TNvec){
2821         MCount max = 0;
2822         for(int i=0; i<TNvec->size(); i++){
2823                 if((*TNvec)[i] > max)
2824                         max = (*TNvec)[i];
2825         }
2826         return max;
2827 }
2828
2829 void sortVec(CkVec<MCount> *TNvec){
2830         //sort it ->its bloddy bubble sort
2831         //TODO: use quicksort
2832         for(int i=0;i<TNvec->size();i++){
2833                 for(int j=i+1;j<TNvec->size();j++){
2834                         if((*TNvec)[j] < (*TNvec)[i]){
2835                                 MCount temp;
2836                                 temp = (*TNvec)[i];
2837                                 (*TNvec)[i] = (*TNvec)[j];
2838                                 (*TNvec)[j] = temp;
2839                         }
2840                 }
2841         }
2842         //make it unique .. since its sorted all equal units will be consecutive
2843         MCount *tempArray = new MCount[TNvec->size()];
2844         int     uniqueCount=-1;
2845         for(int i=0;i<TNvec->size();i++){
2846                 tempArray[i] = 0;
2847                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2848                         uniqueCount++;
2849                         tempArray[uniqueCount] = (*TNvec)[i];
2850                 }
2851         }
2852         uniqueCount++;
2853         TNvec->removeAll();
2854         for(int i=0;i<uniqueCount;i++){
2855                 TNvec->push_back(tempArray[i]);
2856         }
2857         delete [] tempArray;
2858 }       
2859
2860 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2861         if(TNVec->size() == 0){
2862                 return -1; //not found in an empty vec
2863         }
2864         //binary search to find 
2865         int left=0;
2866         int right = TNVec->size();
2867         int mid = (left +right)/2;
2868         while(searchTN != (*TNVec)[mid] && left < right){
2869                 if((*TNVec)[mid] > searchTN){
2870                         right = mid-1;
2871                 }else{
2872                         left = mid+1;
2873                 }
2874                 mid = (left + right)/2;
2875         }
2876         if(left < right){
2877                 //mid is the element to be returned
2878                 return mid;
2879         }else{
2880                 if(mid < TNVec->size() && mid >=0){
2881                         if((*TNVec)[mid] == searchTN){
2882                                 return mid;
2883                         }else{
2884                                 return -1;
2885                         }
2886                 }else{
2887                         return -1;
2888                 }
2889         }
2890 };
2891
2892
2893 /*
2894         Method to do parallel restart. Distribute some of the array elements to other processors.
2895         The problem is that we cant use to charm entry methods to do migration as it will get
2896         stuck in the protocol that is going to restart
2897         Note: in order to avoid interference between the objects being recovered, the current PE
2898     will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
2899     objects end up stepping into each other's shoes (interference).
2900 */
2901
2902 class ElementDistributor: public CkLocIterator{
2903         CkLocMgr *locMgr;
2904         int *targetPE;
2905
2906         void pupLocation(CkLocation &loc,PUP::er &p){
2907                 CkArrayIndexMax idx=loc.getIndex();
2908                 CkGroupID gID = locMgr->ckGetGroupID();
2909                 p|gID;      // store loc mgr's GID as well for easier restore
2910                 p|idx;
2911                 p|loc;
2912         };
2913 public:
2914         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2915
2916         void addLocation(CkLocation &loc){
2917
2918                 // leaving object on this PE
2919                 if(*targetPE == CkMyPe()){
2920                         *targetPE = (*targetPE +1)%CkNumPes();
2921                         return;
2922                 }
2923                         
2924                 CkArrayIndexMax idx = loc.getIndex();
2925                 CkLocRec_local *rec = loc.getLocalRecord();
2926                 CkLocMgr *locMgr = loc.getManager();
2927                 CkVec<CkMigratable *> eltList;
2928                         
2929                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2930                 idx.print();
2931
2932                 // incrementing number of emigrant objects
2933                 CpvAccess(_numEmigrantRecObjs)++;
2934         locMgr->migratableList((CkLocRec_local *)rec,eltList);
2935                 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
2936                 
2937                 // let everybody else know the object is leaving
2938                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
2939                 reductionMgr->incNumEmigrantRecObjs();
2940         
2941                 //pack up this location and send it across
2942                 PUP::sizer psizer;
2943                 pupLocation(loc,psizer);
2944                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
2945                 char *msg = (char *)CmiAlloc(totalSize);
2946                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
2947                 distributeMsg->PE = CkMyPe();
2948                 char *buf = &msg[sizeof(DistributeObjectMsg)];
2949                 PUP::toMem pmem(buf);
2950                 pmem.becomeDeleting();
2951                 pupLocation(loc,pmem);
2952                         
2953                 locMgr->setDuringMigration(CmiTrue);
2954                 delete rec;
2955                 locMgr->setDuringMigration(CmiFalse);
2956                 locMgr->inform(idx,*targetPE);
2957
2958                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2959                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2960
2961                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2962
2963                 //decide on the target processor for the next object
2964                 *targetPE = *targetPE + 1;
2965                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2966                         *targetPE = CkMyPe() + 1;
2967                 }
2968         }
2969
2970 };
2971
2972 /**
2973  * Distributes objects to accelerate recovery after a failure.
2974  */
2975 void distributeRestartedObjects(){
2976         int numGroups = CkpvAccess(_groupIDTable)->size();      
2977         int i;
2978         int targetPE=CkMyPe()+1;
2979         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2980 };
2981
2982 /**
2983  * Handler to receive back a location.
2984  */
2985 void _sendBackLocationHandler(char *receivedMsg){
2986         printf("Array element received at processor %d after recovery\n",CkMyPe());
2987         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
2988         int sourcePE = distributeMsg->PE;
2989         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
2990         PUP::fromMem pmem(buf);
2991         CkGroupID gID;
2992         CkArrayIndexMax idx;
2993         pmem |gID;
2994         pmem |idx;
2995         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2996         donotCountMigration=1;
2997         mgr->resume(idx,pmem,CmiTrue);
2998         donotCountMigration=0;
2999         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3000         printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
3001         idx.print();
3002
3003         // decrementing number of emigrant objects at reduction manager
3004         CkVec<CkMigratable *> eltList;
3005         CkLocRec *rec = mgr->elementRec(idx);
3006         mgr->migratableList((CkLocRec_local *)rec,eltList);
3007         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
3008         reductionMgr->decNumEmigrantRecObjs();
3009         reductionMgr->decGCount();
3010
3011         // checking if it has received all emigrant recovering objects
3012         CpvAccess(_numEmigrantRecObjs)--;
3013         if(CpvAccess(_numEmigrantRecObjs) == 0){
3014                 (*resumeLbFnPtr)(centralLb);
3015         }
3016
3017 }
3018
3019 /**
3020  * Handler to update information about an object just received.
3021  */
3022 void _distributedLocationHandler(char *receivedMsg){
3023         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
3024         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
3025         int sourcePE = distributeMsg->PE;
3026         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
3027         PUP::fromMem pmem(buf);
3028         CkGroupID gID;
3029         CkArrayIndexMax idx;
3030         pmem |gID;
3031         pmem |idx;
3032         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3033         donotCountMigration=1;
3034         mgr->resume(idx,pmem,CmiTrue);
3035         donotCountMigration=0;
3036         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3037         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
3038         idx.print();
3039
3040         CkLocRec *rec = mgr->elementRec(idx);
3041         CmiAssert(rec->type() == CkLocRec::local);
3042
3043         // adding object to the list of immigrant recovery objects
3044         CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
3045         CpvAccess(_numImmigrantRecObjs)++;
3046         
3047         CkVec<CkMigratable *> eltList;
3048         mgr->migratableList((CkLocRec_local *)rec,eltList);
3049         for(int i=0;i<eltList.size();i++){
3050                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
3051                         CpvAccess(_currentObj) = eltList[i];
3052                         eltList[i]->mlogData->immigrantRecFlag = 1;
3053                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
3054
3055                         // incrementing immigrant counter at reduction manager
3056                         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
3057                         reductionMgr->incNumImmigrantRecObjs();
3058                         reductionMgr->decGCount();
3059
3060                         eltList[i]->ResumeFromSync();
3061                 }
3062         }
3063 }
3064
3065
3066 /** this method is used to send messages to a restarted processor to tell
3067  * it that a particular expected object is not going to get to it */
3068 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
3069         DummyMigrationMsg buf;
3070         buf.flag = MLOG_OBJECT;
3071         buf.lbID = lbID;
3072         buf.mgrID = locMgrID;
3073         buf.idx = idx;
3074         buf.locationPE = locationPE;
3075         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3076         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3077 };
3078
3079
3080 /**this method is used by a restarted processor to tell other processors
3081  * that they are not going to receive these many objects.. just the count
3082  * not the objects themselves ***/
3083
3084 void sendDummyMigrationCounts(int *dummyCounts){
3085         DummyMigrationMsg buf;
3086         buf.flag = MLOG_COUNT;
3087         buf.lbID = globalLBID;
3088         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3089         for(int i=0;i<CmiNumPes();i++){
3090                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3091                         buf.count = dummyCounts[i];
3092                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3093                 }
3094         }
3095 }
3096
3097
3098 /** this handler is used to process a dummy migration msg.
3099  * it looks up the load balancer and calls migrated for it */
3100
3101 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3102         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3103         if(msg->flag == MLOG_OBJECT){
3104                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3105                 LDObjHandle h;
3106                 lb->Migrated(h,1);
3107         }
3108         if(msg->flag == MLOG_COUNT){
3109                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3110                 msg->count -= verifyAckedRequests;
3111                 for(int i=0;i<msg->count;i++){
3112                         LDObjHandle h;
3113                         lb->Migrated(h,1);
3114                 }
3115         }
3116         verifyAckedRequests=0;
3117         CmiFree(msg);
3118 };
3119
3120 /*****************************************************
3121         Implementation of a method that can be used to call
3122         any method on the ChareMlogData of all the chares on
3123         a processor currently
3124 ******************************************************/
3125
3126
3127 class ElementCaller :  public CkLocIterator {
3128 private:
3129         CkLocMgr *locMgr;
3130         MlogFn fnPointer;
3131         void *data;
3132 public:
3133         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3134                 locMgr = _locMgr;
3135                 fnPointer = _fnPointer;
3136                 data = _data;
3137         };
3138         void addLocation(CkLocation &loc){
3139                 CkVec<CkMigratable *> list;
3140                 CkLocRec_local *local = loc.getLocalRecord();
3141                 locMgr->migratableList (local,list);
3142                 for(int i=0;i<list.size();i++){
3143                         CkMigratable *migratableElement = list[i];
3144                         fnPointer(data,migratableElement->mlogData);
3145                 }
3146         }
3147 };
3148
3149 /**
3150  * Map function pointed by fnPointer over all the chares living in this processor.
3151  */
3152 void forAllCharesDo(MlogFn fnPointer, void *data){
3153         int numGroups = CkpvAccess(_groupIDTable)->size();
3154         for(int i=0;i<numGroups;i++){
3155                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3156                 fnPointer(data,obj->mlogData);
3157         }
3158         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3159         for(int i=0;i<numNodeGroups;i++){
3160                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3161                 fnPointer(data,obj->mlogData);
3162         }
3163         int i;
3164         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3165 };
3166
3167
3168 /******************************************************************
3169  Load Balancing
3170 ******************************************************************/
3171
3172 /**
3173  * This is the first time Converse is called after AtSync method has been called by every local object.
3174  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3175  * message logging, we can take advantage of this situation and garbage collect at this point.
3176  */
3177 void initMlogLBStep(CkGroupID gid){
3178         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3179         countLBMigratedAway = 0;
3180         countLBToMigrate=0;
3181         onGoingLoadBalancing=1;
3182         migrationDoneCalled=0;
3183         checkpointBarrierCount=0;
3184         if(globalLBID.idx != 0){
3185                 CmiAssert(globalLBID.idx == gid.idx);
3186         }
3187         globalLBID = gid;
3188 #if SYNCHRONIZED_CHECKPOINT
3189         garbageCollectMlog();
3190 #endif
3191 }
3192
3193 /**
3194  * Pups a location
3195  */
3196 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
3197         CkArrayIndexMax idx = loc->getIndex();
3198         CkGroupID gID = locMgr->ckGetGroupID();
3199         p|gID;      // store loc mgr's GID as well for easier restore
3200         p|idx;
3201         p|*loc;
3202 };
3203
3204 /**
3205  * Sends back the immigrant recovering object to their origin PE.
3206  */
3207 void sendBackImmigrantRecObjs(){
3208         CkLocation *loc;
3209         CkLocMgr *locMgr;
3210         CkArrayIndexMax idx;
3211         CkLocRec_local *rec;
3212         PUP::sizer psizer;
3213         int targetPE;
3214         CkVec<CkMigratable *> eltList;
3215         CkReductionMgr *reductionMgr;
3216  
3217         // looping through all elements in immigrant recovery objects vector
3218         for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
3219
3220                 // getting the components of each location
3221                 loc = (*CpvAccess(_immigrantRecObjs))[i];
3222                 idx = loc->getIndex();
3223                 rec = loc->getLocalRecord();
3224                 locMgr = loc->getManager();
3225         locMgr->migratableList((CkLocRec_local *)rec,eltList);
3226                 targetPE = eltList[i]->mlogData->immigrantSourcePE;
3227
3228                 // decrement counter at array manager
3229                 reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
3230                 reductionMgr->decNumImmigrantRecObjs();
3231
3232                 CkPrintf("[%d] Sending back object to %d: ",CkMyPe(),targetPE);
3233                 idx.print();
3234
3235                 // let everybody else know the object is leaving
3236                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
3237                         
3238                 //pack up this location and send it across
3239                 pupLocation(loc,locMgr,psizer);
3240                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
3241                 char *msg = (char *)CmiAlloc(totalSize);
3242                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
3243                 distributeMsg->PE = CkMyPe();
3244                 char *buf = &msg[sizeof(DistributeObjectMsg)];
3245                 PUP::toMem pmem(buf);
3246                 pmem.becomeDeleting();
3247                 pupLocation(loc,locMgr,pmem);
3248                 
3249                 locMgr->setDuringMigration(CmiTrue);
3250                 delete rec;
3251                 locMgr->setDuringMigration(CmiFalse);
3252                 locMgr->inform(idx,targetPE);
3253
3254                 // sending the object
3255                 CmiSetHandler(msg,_sendBackLocationHandlerIdx);
3256                 CmiSyncSendAndFree(targetPE,totalSize,msg);
3257
3258                 // freeing memory
3259                 delete loc;
3260
3261                 CmiAssert(locMgr->lastKnown(idx) == targetPE);
3262                 
3263         }
3264
3265         // cleaning up all data structures
3266         CpvAccess(_immigrantRecObjs)->removeAll();
3267         CpvAccess(_numImmigrantRecObjs) = 0;
3268
3269 }
3270
3271 /**
3272  * Restores objects after parallel recovery, either by sending back the immigrant objects or 
3273  * by waiting for all emigrant objects to be back.
3274  */
3275 void restoreParallelRecovery(void (*_fnPtr)(void *),void *_centralLb){
3276         resumeLbFnPtr = _fnPtr;
3277         centralLb = _centralLb;
3278
3279         // sending back the immigrant recovering objects
3280         if(CpvAccess(_numImmigrantRecObjs) > 0){
3281                 sendBackImmigrantRecObjs();     
3282         }
3283
3284         // checking whether it needs to wait for emigrant recovery objects
3285         if(CpvAccess(_numEmigrantRecObjs) > 0)
3286                 return;
3287
3288         // otherwise, load balancing process is finished
3289         (*resumeLbFnPtr)(centralLb);
3290 }
3291
3292 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3293         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3294         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3295
3296         resumeLbFnPtr = _fnPtr;
3297         centralLb = _centralLb;
3298         migrationDoneCalled = 1;
3299         if(countLBToMigrate == countLBMigratedAway){
3300                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3301                 startMlogCheckpoint(NULL,CmiWallTimer());
3302         }
3303 };
3304
3305 void finishedCheckpointLoadBalancing(){
3306         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3307         CheckpointBarrierMsg msg;
3308         msg.fromPE = CmiMyPe();
3309         msg.checkpointCount = checkpointCount;
3310
3311         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3312         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3313         
3314 };
3315
3316
3317 void sendMlogLocation(int targetPE, envelope *env){
3318 #if !SYNCHRONIZED_CHECKPOINT
3319         void *_msg = EnvToUsr(env);
3320         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3321
3322         int existing = 0;
3323         //if this object is already in the retainedobjectlust destined for this
3324         //processor it should not be sent
3325         
3326         for(int i=0;i<retainedObjectList.size();i++){
3327                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3328                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3329                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3330                         existing = 1;
3331                         break;
3332                 }
3333         }
3334
3335         if(existing){
3336                 return;
3337         }
3338         
3339         countLBToMigrate++;
3340         
3341         MigrationNotice migMsg;
3342         migMsg.migRecord.gID = msg->gid;
3343         migMsg.migRecord.idx = msg->idx;
3344         migMsg.migRecord.fromPE = CkMyPe();
3345         migMsg.migRecord.toPE =  targetPE;
3346         
3347         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3348         
3349         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3350         retainedObject->migRecord = migMsg.migRecord;
3351         retainedObject->acked  = 0;
3352         
3353         CkPackMessage(&env);
3354         
3355         migMsg.record = retainedObject;
3356         retainedObject->msg = env;
3357         int size = retainedObject->size = env->getTotalsize();
3358         
3359         retainedObjectList.push_back(retainedObject);
3360         
3361         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3362         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3363         
3364         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3365 #endif
3366 }
3367
3368 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3369         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3370         migratedNoticeList.push_back(msg->migRecord);
3371
3372         MigrationNoticeAck buf;
3373         buf.record = msg->record;
3374         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3375         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3376 }
3377
3378 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3379         
3380         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3381         retainedObject->acked = 1;
3382
3383         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3384         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3385
3386         //inform home about the new location of this object
3387         CkGroupID gID = retainedObject->migRecord.gID ;
3388         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3389         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3390         
3391         countLBMigratedAway++;
3392         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3393                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3394                 startMlogCheckpoint(NULL,CmiWallTimer());
3395         }
3396 };
3397
3398 void _receiveMlogLocationHandler(void *buf){
3399         envelope *env = (envelope *)buf;
3400         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3401         CkUnpackMessage(&env);
3402         void *_msg = EnvToUsr(env);
3403         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3404         CkGroupID gID= msg->gid;
3405         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3406         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3407         CpvAccess(_currentObj)=mgr;
3408         mgr->immigrate(msg);
3409 };
3410
3411
3412 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
3413 /*      if(mlogData->objID.type == TypeArray){
3414                 CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
3415         //      TODO: make sure later that atSync has been called and it needs 
3416         //      to be resumed from sync
3417         //
3418                 CpvAccess(_currentObj) = elt;
3419                 elt->ResumeFromSync();
3420         }*/
3421 }
3422
3423 /**
3424  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
3425  */
3426 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3427         if(checkpointBarrierCount == CmiNumPes()){
3428                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3429                 for(int i=0;i<CmiNumPes();i++){
3430                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3431                 }
3432         }
3433 }
3434
3435 /**
3436  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
3437  */ 
3438 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3439         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3440         if(msg->checkpointCount == checkpointCount){
3441                 checkpointBarrierCount++;
3442                 checkAndSendCheckpointBarrierAcks(msg);
3443         }else{
3444                 if(msg->checkpointCount-1 == checkpointCount){
3445                         checkpointBarrierCount++;
3446                         checkAndSendCheckpointBarrierAcks(msg);
3447                 }else{
3448                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3449                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3450                 }
3451         }
3452
3453         // deleting the received message
3454         CmiFree(msg);
3455 }
3456
3457 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3458         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3459         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3460
3461 #if !SYNCHRONIZED_CHECKPOINT
3462         // sending a notice to all senders to remove message logs
3463         sendRemoveLogRequests();
3464 #endif
3465
3466         // resuming LB function pointer
3467         (*resumeLbFnPtr)(centralLb);
3468
3469         // deleting message
3470         CmiFree(msg);
3471 }
3472
3473 /**
3474  * @brief Function to remove all messages in the message log of a particular chare.
3475  */
3476 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
3477         int total;
3478         MlogEntry *logEntry;
3479         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
3480
3481         // traversing the whole message log and removing all elements
3482         total = mlog->length();
3483         for(int i=0; i<total; i++){
3484                 logEntry = mlog->deq();
3485                 delete logEntry;
3486         }
3487
3488 }
3489
3490 /**
3491  * @brief Garbage collects the message log and other data structures.
3492  * In case of synchronized checkpoint, we use an optimization to avoid causal message logging protocol
3493  * to communicate all determinants to the rest of the processors.
3494  */
3495 void garbageCollectMlog(){
3496         CkHashtableIterator *iterator;
3497         CkVec<Determinant> *detArray;
3498
3499         DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
3500
3501         // cleaning up the buffered determinants, since they belong to a previous checkpoint period
3502         _indexBufferedDets = 0;
3503         _numBufferedDets = 0;
3504         _phaseBufferedDets++;
3505
3506         // cleaning up remote determinants, since they belong to a previous checkpoint period
3507         iterator = CpvAccess(_remoteDets)->iterator();
3508         while(iterator->hasNext()){
3509                 detArray = *(CkVec<Determinant> **)iterator->next();
3510                 detArray->removeAll();
3511         }
3512         
3513         // deleting the iterator
3514         delete iterator;
3515
3516         // removing all messages in message log for every chare
3517         forAllCharesDo(garbageCollectMlogForChare, NULL);
3518 }
3519
3520 /**
3521         method that informs an array elements home processor of its current location
3522         It is a converse method to bypass the charm++ message logging framework
3523 */
3524
3525 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3526         double _startTime = CmiWallTimer();
3527         CurrentLocationMsg msg;
3528         msg.mgrID = locMgrID;
3529         msg.idx = idx;
3530         msg.locationPE = currentPE;
3531         msg.fromPE = CkMyPe();
3532
3533         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3534         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3535         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3536         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3537 }
3538
3539
3540 void _receiveLocationHandler(CurrentLocationMsg *data){
3541         double _startTime = CmiWallTimer();
3542         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3543         if(mgr == NULL){
3544                 CmiFree(data);
3545                 return;
3546         }
3547         CkLocRec *rec = mgr->elementNrec(data->idx);
3548         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3549         if(rec != NULL){
3550                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3551                         if(data->fromPE == data->locationPE){
3552                                 CmiAbort("Another processor has the same object");
3553                         }
3554                 }
3555         }
3556         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3557                 int targetPE = data->fromPE;
3558                 data->fromPE = CmiMyPe();
3559                 data->locationPE = CmiMyPe();
3560                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3561                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3562         }else{
3563                 mgr->inform(data->idx,data->locationPE);
3564         }
3565         CmiFree(data);
3566         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3567 }
3568
3569
3570
3571 void getGlobalStep(CkGroupID gID){
3572         LBStepMsg msg;
3573         int destPE = 0;
3574         msg.lbID = gID;
3575         msg.fromPE = CmiMyPe();
3576         msg.step = -1;
3577         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3578         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3579 };
3580
3581 void _getGlobalStepHandler(LBStepMsg *msg){
3582         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3583         msg->step = lb->step();
3584         CmiAssert(msg->fromPE != CmiMyPe());
3585         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3586         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3587         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3588 };
3589
3590 /**
3591  * @brief Receives the global step handler from PE 0
3592  */
3593 void _recvGlobalStepHandler(LBStepMsg *msg){
3594         
3595         // updating restart decision number
3596         restartDecisionNumber = msg->step;
3597         CmiFree(msg);
3598
3599         CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
3600
3601         // sending a dummy message to sendDetsReplyHandler
3602         ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
3603         resendReplyMsg->PE = CkMyPe();
3604         resendReplyMsg->numberObjects = 0;
3605         _sendDetsReplyHandler((char *)resendReplyMsg);
3606 };
3607
3608 /**
3609  * @brief Function to wrap up performance information.
3610  */
3611 void _messageLoggingExit(){
3612         
3613         // printing the signature for causal message logging
3614         if(CkMyPe() == 0)
3615                 printf("[%d] FastMessageLoggingExit \n",CmiMyPe());
3616
3617         //TML: printing some statistics for group approach
3618 #if COLLECT_STATS_TEAM
3619         printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
3620         printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
3621 #endif
3622
3623 #if COLLECT_STATS_MSGS
3624 #if COLLECT_STATS_MSGS_TOTAL
3625         printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
3626         printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
3627 #else
3628         printf("[%d] TARGETS: ",CmiMyPe());
3629         for(int i=0; i<CmiNumPes(); i++){
3630 #if COLLECT_STATS_MSG_COUNT
3631                 printf("%d ",numMsgsTarget[i]);
3632 #else
3633                 printf("%d ",sizeMsgsTarget[i]);
3634 #endif
3635         }
3636         printf("\n");
3637 #endif
3638 #endif
3639
3640 #if COLLECT_STATS_DETS
3641         printf("\n");
3642         printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
3643         printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
3644 #if COLLECT_STATS_DETS_DUP
3645         printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
3646 #endif
3647 #endif
3648
3649 }
3650
/**
        Returns the actual object referred to by this id.
        If the object does not exist on this processor, it returns NULL.
**/
3655
3656 void* CkObjID::getObject(){
3657         
3658                 switch(type){
3659                         case TypeChare: 
3660                                 return CkLocalChare(&data.chare.id);
3661                         case TypeMainChare:
3662                                 return CkLocalChare(&data.chare.id);
3663                         case TypeGroup:
3664         
3665                                 CkAssert(data.group.onPE == CkMyPe());
3666                                 return CkLocalBranch(data.group.id);
3667                         case TypeNodeGroup:
3668                                 CkAssert(data.group.onPE == CkMyNode());
3669                                 //CkLocalNodeBranch(data.group.id);
3670                                 {
3671                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3672                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3673                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3674         
3675                                         return retval;
3676                                 }       
3677                         case TypeArray:
3678                                 {
3679         
3680         
3681                                         CkArrayID aid(data.array.id);
3682         
3683                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3684         
3685