92ef43c2b8ce43126db752a49843fbe9c9d776d3
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
16 #ifdef _FAULT_CAUSAL_
17
// Collects some statistics about message logging. Beware of the high cost of accounting for
// duplicated determinants (for every determinant received, it requires a linear search
// through a potentially big list).
// Each COLLECT_STATS_* switch enables the matching counters and their updates in this file.
#define COLLECT_STATS_MSGS 0
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_TEAM 0

// Labels for the recovery phases (NOTE(review): presumably used in recovery trace output — confirm).
#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug channels. A definition whose body is commented out discards its argument entirely;
// remove the leading "//" in the body to enable that channel.
// DEBUG_NOW and DEBUG_PE_NOW are always enabled.
#define DEBUG_MEM(x)  //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x)  //x
#define DEBUGLB(x)   // x
#define DEBUG_TEAM(x)  // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x
43
// Helpers defined in other translation units.
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Functions of the message-logging protocol defined later in this file.
bool fault_aware(CkObjID &recver);
void sendCheckpointData(int mode);
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(TProcessedLog *log);

int _restartFlag=0;
int _numRestartResponses=0;

//TODO: remove for perf runs
int countHashRefs=0; //count the number of gets
int countHashCollisions=0;

// NOTE(review): a string literal bound to a non-const char* is ill-formed since C++11;
// kept as-is because other translation units may declare it as extern char*.
char *checkpointDirectory=".";
int unAckedCheckpoint=0;

int countLocal=0,countBuffered=0;
int countPiggy=0;
int countClearBufferedLocalCalls=0;

int countUpdateHomeAcks=0;

// Configuration defined in another translation unit.
extern int teamSize;
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault injection: killFile is read by readKillFile(), faultFile by readFaultFile().
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
// absolute wall-clock time at which killLocal() terminates this PE (set by readKillFile())
double killTime=0.0;
// mean time between failures read by readFaultFile()
double faultMean;
int checkpointCount=0;

// object whose entry method is currently executing (NULL outside any object)
CpvDeclare(Chare *,_currentObj);
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
CpvDeclare(Queue, _outOfOrderMessageQueue);
CpvDeclare(Queue, _delayedRemoteMessageQueue);
// per-PE arrays (CkNumPes() entries) allocated in _messageLoggingInit()
CpvDeclare(char **,_bufferedTicketRequests);
CpvDeclare(int *,_numBufferedTicketRequests);
94
/***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
/** @note All the determinants generated by a PE are stored in variable _localDets.
 * As soon as a message is sent, then all the determinants are appended to the message,
 * but those determinants are not deleted. We must wait until an ACK comes from the receiver
 * to delete the determinants. In the meantime the same determinants may be appended
 * to other messages and more determinants can be added to _localDets.
 * A simple solution to this problem was to have a primitive array and keep adding determinants
 * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
 * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
 * determinant and a number of how many valid determinants there are behind it. We do not remove
 * determinants until a checkpoint is made, since these determinants may have to be added to
 * messages in case of a recovery.
 */
// temporary storage for the outgoing determinants: a CmiAlloc'd array of Determinant,
// grown by doubling in addBufferedDeterminant()
CpvDeclare(char *, _localDets);
// number of buffered determinants
int _numBufferedDets;
// index one past the last buffered determinant in _localDets (the next free slot);
// the first valid determinant is at _indexBufferedDets - _numBufferedDets
int _indexBufferedDets;
// current phase of determinants
int _phaseBufferedDets;

// stores the determinants from other nodes, to send them in case of a crash
CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
// capacity of _localDets, counted in determinants
int _maxBufferedDets;

// stores the incarnation number from every other processor (one char per PE)
CpvDeclare(char *, _incarnation);

// storage for remove determinants header
CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);
// storage for store determinants header
CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);
// storage for the sizes in vector-send to store determinants (2 entries)
CpvDeclare(int *, _storeDetsSizes);
// storage for the pointers in vector-send to store determinants (2 entries)
CpvDeclare(char **, _storeDetsPtrs);

/***** *****/

/***** VARIABLES FOR PARALLEL RECOVERY *****/
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
/***** *****/
141
// Statistics counters, compiled in only when the matching COLLECT_STATS_* switch is set.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;
int *sizeMsgsTarget;
int totalMsgsTarget;
float totalMsgsSize;
#endif
#if COLLECT_STATS_DETS
int numPiggyDets;
int numDets;
int numDupDets;
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif
//TML: variables for measuring savings with teams in message logging
#if COLLECT_STATS_TEAM
float MLOGFT_totalLogSize = 0.0;
float MLOGFT_totalMessages = 0.0;
#endif

static double adjustChkptPeriod=0.0; //in ms
static double nextCheckpointTime=0.0;//in seconds
static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);

// Converse handler indices; each is assigned by CkRegisterHandler in _messageLoggingInit().
int _pingHandlerIdx;

char objString[100];
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int     _getGlobalStepHandlerIdx;
int     _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _sendDetsHandlerIdx;
int _sendDetsReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _storeDeterminantsHandlerIdx;
int _removeDeterminantsHandlerIdx;

//TML: handler indices for team-based message logging
int _restartHandlerIdx;
int _getRestartCheckpointHandlerIdx;
int _recvRestartCheckpointHandlerIdx;
void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);

int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;

int _falseRestart =0; /**
	For testing on clusters we might carry out restarts on
	a processor without actually starting it
	1 -> false restart
	0 -> restart after an actual crash
*/

//Load balancing globals
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;

// wall-clock timestamps, both initialized in _messageLoggingInit()
double lastCompletedAlarm=0;
double lastRestart=0;

//update location globals
int _receiveLocationHandlerIdx;
244
245
246 /** 
247  * @brief Initialize message logging data structures and register handlers
248  */
249 void _messageLoggingInit(){
250         //current object
251         CpvInitialize(Chare *,_currentObj);
252         
253         //registering handlers for message logging
254         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
255                 
256         //handlers for checkpointing
257         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
258         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
259         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
260         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
261
262         //handlers for restart
263         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
264         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
265         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
266         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
267         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
268         _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
269         _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
270         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
271         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
272         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
273         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
274         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
275         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
276
277         //TML: handlers for team-based message logging
278         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
279         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
280         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
281
282         // handlers for causal message logging
283         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
284         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
285         
286         //handlers for load balancing
287         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
288         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
289         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
290         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
291         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
292         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
293         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
294         
295         //handlers for updating locations
296         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
297         
298         //Cpv variables for message logging
299         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
300         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
301         CpvInitialize(Queue, _outOfOrderMessageQueue);
302         CpvInitialize(Queue, _delayedRemoteMessageQueue);
303         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
304         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
305         
306         CpvInitialize(char **,_bufferedTicketRequests);
307         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
308         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
309         for(int i=0;i<CkNumPes();i++){
310                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
311                 CpvAccess(_numBufferedTicketRequests)[i]=0;
312         }
313  
314         // Cpv variables for causal protocol
315         _numBufferedDets = 0;
316         _indexBufferedDets = 0;
317         _phaseBufferedDets = 0;
318         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
319         CpvInitialize(char *, _localDets);
320         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
321         CpvInitialize(char *, _incarnation);
322         CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
323         CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
324         CpvInitialize(int *, _storeDetsSizes);
325         CpvInitialize(char **, _storeDetsPtrs);
326         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
327         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
328         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
329         for(int i=0; i<CmiNumPes(); i++){
330                 CpvAccess(_incarnation)[i] = 0;
331         }
332         CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
333         CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
334         CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
335         CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
336
337         // Cpv variables for parallel recovery
338         CpvInitialize(int, _numEmigrantRecObjs);
339     CpvAccess(_numEmigrantRecObjs) = 0;
340     CpvInitialize(int, _numImmigrantRecObjs);
341     CpvAccess(_numImmigrantRecObjs) = 0;
342
343     CpvInitialize(CkVec<CkLocRec_local *> *, _immigrantRecObjs);
344     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocRec_local *>;
345
346         //Cpv variables for checkpoint
347         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
348         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
349
350         // registering user events for projections      
351         traceRegisterUserEvent("Remove Logs", 20);
352         traceRegisterUserEvent("Ticket Request Handler", 21);
353         traceRegisterUserEvent("Ticket Handler", 22);
354         traceRegisterUserEvent("Local Message Copy Handler", 23);
355         traceRegisterUserEvent("Local Message Ack Handler", 24);        
356         traceRegisterUserEvent("Preprocess current message",25);
357         traceRegisterUserEvent("Preprocess past message",26);
358         traceRegisterUserEvent("Preprocess future message",27);
359         traceRegisterUserEvent("Checkpoint",28);
360         traceRegisterUserEvent("Checkpoint Store",29);
361         traceRegisterUserEvent("Checkpoint Ack",30);
362         traceRegisterUserEvent("Send Ticket Request",31);
363         traceRegisterUserEvent("Generalticketrequest1",32);
364         traceRegisterUserEvent("TicketLogLocal",33);
365         traceRegisterUserEvent("next_ticket and SN",34);
366         traceRegisterUserEvent("Timeout for buffered remote messages",35);
367         traceRegisterUserEvent("Timeout for buffered local messages",36);
368         traceRegisterUserEvent("Inform Location Home",37);
369         traceRegisterUserEvent("Receive Location Handler",38);
370         
371         lastCompletedAlarm=CmiWallTimer();
372         lastRestart = CmiWallTimer();
373
374 #if COLLECT_STATS_MSGS
375 #if COLLECT_STATS_MSGS_TOTAL
376         totalMsgsTarget = 0;
377         totalMsgsSize = 0.0;
378 #else
379         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
380         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
381         for(int i=0; i<CmiNumPes(); i++){
382                 numMsgsTarget[i] = 0;
383                 sizeMsgsTarget[i] = 0;
384         }
385 #endif
386 #endif
387 #if COLLECT_STATS_DETS
388         numPiggyDets = 0;
389         numDets = 0;
390         numDupDets = 0;
391 #endif
392 #if COLLECT_STATS_MEMORY
393         msgLogSize = 0;
394         bufferedDetsSize = 0;
395         storedDetsSize = 0;
396 #endif
397
398
399 }
400
401 void killLocal(void *_dummy,double curWallTime);        
402
403 void readKillFile(){
404         FILE *fp=fopen(killFile,"r");
405         if(!fp){
406                 return;
407         }
408         int proc;
409         double sec;
410         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
411                 if(proc == CkMyPe()){
412                         killTime = CmiWallTimer()+sec;
413                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
414                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
415                 }
416         }
417         fclose(fp);
418 }
419
420 /**
421  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
422  * We assume an exponential distribution for the mean-time-between-failures.
423  */
424 void readFaultFile(){
425         FILE *fp=fopen(faultFile,"r");
426         if(!fp){
427                 return;
428         }
429         int proc;
430         double sec;
431         fscanf(fp,"%d %lf",&proc,&sec);
432         faultMean = sec;
433         if(proc == CkMyPe()){
434                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
435                 CcdCallFnAfter(killLocal,NULL,sec*1000);
436         }
437         fclose(fp);
438 }
439
440 void killLocal(void *_dummy,double curWallTime){
441         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
442         if(CmiWallTimer()<killTime-1){
443                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
444         }else{  
445                 kill(getpid(),SIGKILL);
446         }
447 }
448
449 /*** Auxiliary Functions ***/
450
451 /**
452  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
453  * determinants needs to be extended.
454  */
455 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
456         Determinant *det, *auxDet;
457         char *aux;
458
459         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
460
461         // checking if we are overflowing the buffered determinants array
462         if(_indexBufferedDets >= _maxBufferedDets){
463                 aux = CpvAccess(_localDets);
464                 _maxBufferedDets *= 2;
465                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
466                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
467                 CmiFree(aux);
468         }
469
470         // adding the new determinant at the end
471         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
472         det->sender = sender;
473         det->receiver = receiver;
474         det->SN = SN;
475         det->TN = TN;
476         _numBufferedDets++;
477         _indexBufferedDets++;
478
479 #if COLLECT_STATS_MEMORY
480         bufferedDetsSize++;
481 #endif
482 #if COLLECT_STATS_DETS
483         numDets++;
484 #endif
485 }
486
487 /************************ Message logging methods ****************/
488
489 /**
490  * Sends a group message that might be a broadcast.
491  */
492 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
493         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
494                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
495                 void *origMsg = EnvToUsr(env);
496                 for(int i=0;i<CmiNumPes();i++){
497                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
498                                 void *copyMsg = CkCopyMsg(&origMsg);
499                                 envelope *copyEnv = UsrToEnv(copyMsg);
500                                 copyEnv->SN=0;
501                                 copyEnv->TN=0;
502                                 copyEnv->sender.type = TypeInvalid;
503                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
504                                 sendGroupMsg(copyEnv,i,_infoIdx);
505                         }
506                 }
507                 return;
508         }
509
510         // initializing values of envelope
511         env->SN=0;
512         env->TN=0;
513         env->sender.type = TypeInvalid;
514
515         CkObjID recver;
516         recver.type = TypeGroup;
517         recver.data.group.id = env->getGroupNum();
518         recver.data.group.onPE = destPE;
519         sendCommonMsg(recver,env,destPE,_infoIdx);
520 }
521
522 /**
523  * Sends a nodegroup message that might be a broadcast.
524  */
525 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
526         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
527                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
528                 void *origMsg = EnvToUsr(env);
529                 for(int i=0;i<CmiNumNodes();i++){
530                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
531                                 void *copyMsg = CkCopyMsg(&origMsg);
532                                 envelope *copyEnv = UsrToEnv(copyMsg);
533                                 copyEnv->SN=0;
534                                 copyEnv->TN=0;
535                                 copyEnv->sender.type = TypeInvalid;
536                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
537                         }
538                 }
539                 return;
540         }
541
542         // initializing values of envelope
543         env->SN=0;
544         env->TN=0;
545         env->sender.type = TypeInvalid;
546
547         CkObjID recver;
548         recver.type = TypeNodeGroup;
549         recver.data.group.id = env->getGroupNum();
550         recver.data.group.onPE = destNode;
551         sendCommonMsg(recver,env,destNode,_infoIdx);
552 }
553
554 /**
555  * Sends a message to an array element.
556  */
557 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
558         CkObjID recver;
559         recver.type = TypeArray;
560         recver.data.array.id = env->getsetArrayMgr();
561         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
562
563         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
564                 char recverString[100],senderString[100];
565                 
566                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
567         }
568
569         // initializing values of envelope
570         env->SN = 0;
571         env->TN = 0;
572
573         sendCommonMsg(recver,env,destPE,_infoIdx);
574 };
575
576 /**
577  * Sends a message to a singleton chare.
578  */
579 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
580         CkObjID recver;
581         recver.type = TypeChare;
582         recver.data.chare.id = *pCid;
583
584         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
585                 char recverString[100],senderString[100];
586                 
587                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
588         }
589
590         // initializing values of envelope
591         env->SN = 0;
592         env->TN = 0;
593
594         sendCommonMsg(recver,env,destPE,_infoIdx);
595 };
596
597 /**
598  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
599  */
600 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
601         envelope *env = _env;
602         MCount ticketNumber = 0;
603         int resend=0; //is it a resend
604         char recverName[100];
605         char senderString[100];
606         double _startTime=CkWallTimer();
607         
608         DEBUG_MEM(CmiMemoryCheck());
609
610         if(CpvAccess(_currentObj) == NULL){
611 //              CkAssert(0);
612                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
613                 generalCldEnqueue(destPE,env,_infoIdx);
614                 return;
615         }
616
617         // checking if this message should bypass determinants in message-logging
618         if(env->flags & CK_BYPASS_DET_MLOG){
619                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
620                 env->recver = recver;
621                 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
622                 generalCldEnqueue(destPE,env,_infoIdx);
623                 return;
624         }
625         
626         // setting message logging data in the envelope
627         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
628         if(env->sender.type == TypeInvalid){
629                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
630         }else{
631 //              envelope *copyEnv = copyEnvelope(env);
632 //              env = copyEnv;
633                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
634                 env->SN = 0;
635         }
636         
637         DEBUG_MEM(CmiMemoryCheck());
638
639         CkObjID &sender = env->sender;
640         env->recver = recver;
641
642         Chare *obj = (Chare *)env->sender.getObject();
643           
644         if(env->SN == 0){
645                 DEBUG_MEM(CmiMemoryCheck());
646                 env->SN = obj->mlogData->nextSN(recver);
647         }else{
648                 resend = 1;
649         }
650         
651 //      if(env->SN != 1){
652                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
653         //      CmiPrintStackTrace(0);
654 /*      }else{
655                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
656         }*/
657                 
658 //      CkPackMessage(&(mEntry->env));
659 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
660         
661         _startTime = CkWallTimer();
662
663         // uses the proper ticketing mechanism for local, team and general messages
664         if(isLocal(destPE)){
665                 sendLocalMsg(env, _infoIdx);
666         }else{
667                 if((teamSize > 1) && isTeamLocal(destPE)){
668
669                         // look to see if this message already has a ticket in the team-table
670                         Chare *senderObj = (Chare *)sender.getObject();
671                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
672                         if(ticketRow != NULL){
673                                 Ticket ticket = ticketRow->get(env->SN);
674                                 if(ticket.TN != 0){
675                                         ticketNumber = ticket.TN;
676                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
677                                 }
678                         }
679                 }
680                 
681                 // sending the message
682                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
683                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
684                 
685         }
686 }
687
688 /**
689  * Determines if the message is local or not. A message is local if:
690  * 1) Both the destination and origin are the same PE.
691  */
692 inline bool isLocal(int destPE){
693         // both the destination and the origin are the same PE
694         if(destPE == CkMyPe())
695                 return true;
696
697         return false;
698 }
699
700 /**
701  * Determines if the message is group local or not. A message is group local if:
702  * 1) They belong to the same group in the group-based message logging.
703  */
704 inline bool isTeamLocal(int destPE){
705
706         // they belong to the same group
707         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
708                 return true;
709
710         return false;
711 }
712
713 /**
714  * Method that does the actual send by creating a ticket request filling it up and sending it.
715  */
716 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
717         DEBUG_NOW(char recverString[100]);
718         DEBUG_NOW(char senderString[100]);
719
720         int totalSize;
721
722         envelope *env = entry->env;
723         DEBUG_PE(3,printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
724
725         // setting all the information
726         Chare *obj = (Chare *)entry->env->sender.getObject();
727         entry->env->recver = recver;
728         entry->env->SN = SN;
729         entry->env->TN = TN;
730         if(!resend){
731                 //TML: only stores message if either it goes to this processor or to a processor in a different group
732                 if(!isTeamLocal(entry->destPE)){
733                         obj->mlogData->addLogEntry(entry);
734 #if COLLECT_STATS_TEAM
735                         MLOGFT_totalMessages += 1.0;
736                         MLOGFT_totalLogSize += entry->env->getTotalsize();
737 #endif
738                 }else{
739                         // the message has to be deleted after it has been sent
740                         entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
741                 }
742         }
743
744         // sending the determinants that causally affect this message
745         if(_numBufferedDets > 0){
746
747                 // modifying the actual number of determinants sent in message
748                 CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
749                 CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
750                 CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
751                 CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
752         
753                 // sending the message
754                 CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
755                 CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
756                 CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
757                 CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
758                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
759                 CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
760                 CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
761         }
762
763         // updating its message log entry
764         entry->indexBufDets = _indexBufferedDets;
765         entry->numBufDets = _numBufferedDets;
766
767         // sending the message
768         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
769
770         DEBUG_MEM(CmiMemoryCheck());
771 #if COLLECT_STATS_MSGS
772 #if COLLECT_STATS_MSGS_TOTAL
773         totalMsgsTarget++;
774         totalMsgsSize += (float)env->getTotalsize();
775 #else
776         numMsgsTarget[destPE]++;
777         sizeMsgsTarget[destPE] += env->getTotalsize();
778 #endif
779 #endif
780 #if COLLECT_STATS_DETS
781         numPiggyDets += _numBufferedDets;
782 #endif
783 #if COLLECT_STATS_MEMORY
784         msgLogSize += env->getTotalsize();
785 #endif
786 };
787
788
789 /**
790  * @brief Function to send a local message. It first gets a ticket and
791  * then enqueues the message. If we are recovering, then the message 
792  * is enqueued in a delay queue.
793  */
794 void sendLocalMsg(envelope *env, int _infoIdx){
795         DEBUG_PERF(double _startTime=CkWallTimer());
796         DEBUG_MEM(CmiMemoryCheck());
797         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
798         DEBUG(char senderString[100]);
799         DEBUG(char recverString[100]);
800         Ticket ticket;
801
802         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
803
804         // getting the receiver local object
805         Chare *recverObj = (Chare *)env->recver.getObject();
806
807         // if receiver object is not NULL, we will ask it for a ticket
808         if(recverObj){
809
810 /*HERE          // if we are recovery, then we must put this off for later
811                 if(recverObj->mlogData->restartFlag){
812                         CpvAccess(_delayedLocalMsgs)->enq(entry);
813                         return;
814                 }
815
816                 // check if this combination of sender and SN has been already a ticket
817                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
818                         
819                 // assigned a new ticket in case this message is completely new
820                 if(ticket.TN == 0){
821                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
822
823                         // if TN == 0 we enqueue this message since we are recovering from a crash
824                         if(ticket.TN == 0){
825                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
826                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
827                                 
828 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
829                                 return;
830                         }
831                 }
832
833                 //TODO: check for the case when an invalid ticket is returned
834                 //TODO: check for OLD or RECEIVED TICKETS
835                 entry->env->TN = ticket.TN;
836                 CkAssert(entry->env->TN > 0);
837                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
838         
839                 DEBUG_MEM(CmiMemoryCheck());
840
841                 // adding the determinant to _localDets
842                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
843 */
844                 // sends the local message
845                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
846
847                 DEBUG_MEM(CmiMemoryCheck());
848         }else{
849                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
850         }
851 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
852 };
853
854 /****
855         The handler functions
856 *****/
857
858 /*** CAUSAL PROTOCOL HANDLERS ***/
859
860 /**
861  * @brief Removes the determinants after a particular index in the _localDets array.
862  */
863 void _removeDeterminantsHandler(char *buffer){
864         RemoveDeterminantsHeader *header;
865         int index, phase;
866
867         // getting the header from the message
868         header = (RemoveDeterminantsHeader *)buffer;
869         index = header->index;
870         phase = header->phase;
871
872         // fprintf(stderr,"ACK index=%d\n",index);
873
874         // updating the number of buffered determinants
875         if(phase == _phaseBufferedDets){
876                 if(index > _indexBufferedDets)
877                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
878                 CmiAssert(index <= _indexBufferedDets);
879                 _numBufferedDets = _indexBufferedDets - index;
880         }
881
882         // releasing memory
883         CmiFree(buffer);
884
885 }
886
887 /**
888  * @brief Stores the determinants coming from other processor.
889  */
890 void _storeDeterminantsHandler(char *buffer){
891         StoreDeterminantsHeader *header;
892         Determinant *detPtr, det;
893         int i, n, index, phase, destPE;
894         CkVec<Determinant> *vec;
895
896         // getting the header from the message and pointing to the first determinant
897         header = (StoreDeterminantsHeader *)buffer;
898         n = header->number;
899         index = header->index;
900         phase = header->phase;
901         destPE = header->PE;
902         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
903
904         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
905         DEBUG_MEM(CmiMemoryCheck());
906
907         // going through all determinants and storing them into _remoteDets
908         for(i = 0; i < n; i++){
909                 det.sender = detPtr->sender;
910                 det.receiver = detPtr->receiver;
911                 det.SN = detPtr->SN;
912                 det.TN = detPtr->TN;
913
914                 // getting the determinant array
915                 vec = CpvAccess(_remoteDets)->get(det.receiver);
916                 if(vec == NULL){
917                         vec = new CkVec<Determinant>();
918                         CpvAccess(_remoteDets)->put(det.receiver) = vec;
919                 }
920 #if COLLECT_STATS_DETS
921 #if COLLECT_STATS_DETS_DUP
922                 for(int j=0; j<vec->size(); j++){
923                         if(isSameDet(&(*vec)[j],&det)){
924                                 numDupDets++;
925                                 break;
926                         }
927                 }
928 #endif
929 #endif
930 #if COLLECT_STATS_MEMORY
931                 storedDetsSize++;
932 #endif
933                 vec->push_back(det);
934                 detPtr = detPtr++;
935         }
936
937         DEBUG_MEM(CmiMemoryCheck());
938
939         // freeing buffer
940         CmiFree(buffer);
941
942         // sending the ACK back to the sender
943         CpvAccess(_removeDetsHeader)->index = index;
944         CpvAccess(_removeDetsHeader)->phase = phase;
945         CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
946         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
947         
948 }
949
950 /**
951  *  If there are any delayed requests, process them first before 
952  *  processing this request
953  * */
954 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
955         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
956         double  _startTime = CkWallTimer();
957         CmiFree(ticketRequest);
958 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
959 }
960
961
962 /**
963  * @brief Gets a ticket for a recently received message.
964  * @pre env->recver has to be on this processor.
965  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
966  */
967 inline bool _getTicket(envelope *env, int *flag){
968         DEBUG_MEM(CmiMemoryCheck());
969         DEBUG(char recverName[100]);
970         DEBUG(char senderString[100]);
971         Ticket ticket;
972         
973         // getting information from request
974         CkObjID sender = env->sender;
975         CkObjID recver = env->recver;
976         MCount SN = env->SN;
977         MCount TN = env->TN;
978         Chare *recverObj = (Chare *)recver.getObject();
979         
980         DEBUG(recver.toString(recverName);)
981
982         // verifying whether we are recovering from a failure or not
983         if(recverObj->mlogData->restartFlag)
984                 return false;
985
986         // checking ticket table to see if this message already received a ticket
987         ticket = recverObj->mlogData->getTicket(sender, SN);
988         TN = ticket.TN;
989         
990         // checking if the message is team local and if it has a ticket already assigned
991         if(teamSize > 1 && TN != 0){
992                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
993                 recverObj->mlogData->verifyTicket(sender,SN,TN);
994         }
995
996         //check if a ticket for this has been already handed out to an object that used to be local but 
997         // is no longer so.. need for parallel restart
998         if(TN == 0){
999                 ticket = recverObj->mlogData->next_ticket(sender,SN);
1000                 *flag = NEW_TICKET;
1001         } else {
1002                 *flag = OLD_TICKET;
1003         }
1004         if(ticket.TN > recverObj->mlogData->tProcessed){
1005                 ticket.state = NEW_TICKET;
1006         } else {
1007                 ticket.state = OLD_TICKET;
1008         }
1009         // @todo: check for the case when an invalid ticket is returned
1010         if(ticket.TN == 0){
1011                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
1012                 return false;
1013         }
1014                 
1015         // setting the ticket number in the envelope
1016         env->TN = ticket.TN;
1017         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
1018         return true;
1019
1020 };
1021
1022 bool fault_aware(CkObjID &recver){
1023         switch(recver.type){
1024                 case TypeChare:
1025                         return true;
1026                 case TypeMainChare:
1027                         return false;
1028                 case TypeGroup:
1029                 case TypeNodeGroup:
1030                 case TypeArray:
1031                         return true;
1032                 default:
1033                         return false;
1034         }
1035 };
1036
1037 /* Preprocesses a received message */
1038 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1039         DEBUG_NOW(char recverString[100]);
1040         DEBUG_NOW(char senderString[100]);
1041         DEBUG_MEM(CmiMemoryCheck());
1042         int flag;
1043         bool ticketSuccess;
1044
1045         // getting the receiver object
1046         CkObjID recver = env->recver;
1047
1048         // checking for determinants bypass in message logging
1049         if(env->flags & CK_BYPASS_DET_MLOG){
1050                 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
1051                 return 1;       
1052         }
1053
1054         // checking if receiver is fault aware
1055         if(!fault_aware(recver)){
1056                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1057                 return 1;
1058         }
1059
1060         Chare *obj = (Chare *)recver.getObject();
1061         *objPointer = obj;
1062         if(obj == NULL){
1063                 int possiblePE = recver.guessPE();
1064                 if(possiblePE != CkMyPe()){
1065                         int totalSize = env->getTotalsize();
1066                         CmiSyncSend(possiblePE,totalSize,(char *)env);
1067                         
1068                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1069                 }else{
1070                         // this is the case where a message is received and the object has not been initialized
1071                         // we delayed the delivery of the message
1072                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1073                         
1074                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1075                 }
1076                 return 0;
1077         }
1078
1079         // checking if message comes from an old incarnation
1080         // message must be discarded
1081         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1082                 CmiFree(env);
1083                 return 0;
1084         }
1085
1086         DEBUG_MEM(CmiMemoryCheck());
1087         DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1088
1089         // getting a ticket for this message
1090         ticketSuccess = _getTicket(env,&flag);
1091
1092         // we might be during recovery when this message arrives
1093         if(!ticketSuccess){
1094                 
1095                 // adding the message to a delay queue
1096                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1097                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1098
1099                 return 0;
1100         }
1101         
1102         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1103         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1104         
1105         if(flag == NEW_TICKET){
1106                 // storing determinant of message in data structure
1107                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1108         }
1109
1110         DEBUG_MEM(CmiMemoryCheck());
1111
1112         double _startTime = CkWallTimer();
1113 //env->sender.updatePosition(env->getSrcPe());
1114         if(env->TN == obj->mlogData->tProcessed+1){
1115                 //the message that needs to be processed now
1116                 DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1117                 // once we find a message that we can process we put back all the messages in the out of order queue
1118                 // back into the main scheduler queue. 
1119         DEBUG_MEM(CmiMemoryCheck());
1120                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1121                         void *qMsgPtr;
1122                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1123                         envelope *qEnv = (envelope *)qMsgPtr;
1124                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1125         DEBUG_MEM(CmiMemoryCheck());
1126                 }
1127 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1128                 //TODO: this might be a problem.. change made for leanMD
1129 //              CpvAccess(_currentObj) = obj;
1130         DEBUG_MEM(CmiMemoryCheck());
1131                 return 1;
1132         }
1133
1134         // checking if message has already been processed
1135         // message must be discarded
1136         if(env->TN <= obj->mlogData->tProcessed){
1137                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1138                 
1139                 CmiFree(env);
1140                 return 0;
1141         }
1142         //message that needs to be processed in the future
1143
1144         DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1145         //the message cant be processed now put it back in the out of order message Q, 
1146         //It will be transferred to the main queue later
1147         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1148 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1149         DEBUG_MEM(CmiMemoryCheck());
1150         
1151         return 0;
1152 }
1153
1154 /**
1155  * @brief Updates a few variables once a message has been processed.
1156  */
1157 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1158         DEBUG(char senderString[100]);
1159         if(obj){
1160                 if(sender.guessPE() == CkMyPe()){
1161                         if(entry != NULL){
1162                                 entry->env = NULL;
1163                         }
1164                 }
1165                 obj->mlogData->tProcessed++;
1166 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1167                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1168 //              CpvAccess(_currentObj)= NULL;
1169         }
1170         DEBUG_MEM(CmiMemoryCheck());
1171 }
1172
1173 /***
1174         Helpers for the handlers and message logging methods
1175 ***/
1176
1177 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1178 //      double _startTime = CkWallTimer();
1179         if(env->recver.type != TypeNodeGroup){
1180         //This repeats a step performed in skipCldEnq for messages sent to
1181         //other processors. I do this here so that messages to local processors
1182         //undergo the same transformation.. It lets the resend be uniform for 
1183         //all messages
1184 //              CmiSetXHandler(env,CmiGetHandler(env));
1185                 _skipCldEnqueue(destPE,env,_infoIdx);
1186         }else{
1187                 _noCldNodeEnqueue(destPE,env);
1188         }
1189 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1190 }
1191 //extern "C" int CmiGetNonLocalLength();
/**
 * Global counter/flag named after the retry of ticket requests that were queued earlier.
 * The retry method an earlier comment described is not defined in this section, and no
 * code in this section reads or writes the variable.
 * NOTE(review): confirm whether it is still used elsewhere in the file.
 */

int calledRetryTicketRequest=0;
1197
1198 void _pingHandler(CkPingMsg *msg){
1199         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1200         CmiFree(msg);
1201 }
1202
1203
/*****************************************************************************
	Checkpointing methods.
		Pack all the data on a processor and periodically send it to the buddy
		processor (getCheckPointPE()). The information recorded at checkpoint
		time is also used to throw away message logs.
*****************************************************************************/
// Latest processed ticket (tProcessed) of each chare on this processor, rebuilt at every
// checkpoint by buildProcessedTicketLog and shipped by sendRemoveLogRequests.
CkVec<TProcessedLog> processedTicketLog;
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
void clearUpMigratedRetainedLists(int PE);
1212
1213 void checkpointAlarm(void *_dummy,double curWallTime){
1214         double diff = curWallTime-lastCompletedAlarm;
1215         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1216 /*      if(CkWallTimer()-lastRestart < 50){
1217                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1218                 return;
1219         }*/
1220         if(diff < ((chkptPeriod) - 2)){
1221                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1222                 return;
1223         }
1224         CheckpointRequest request;
1225         request.PE = CkMyPe();
1226         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1227         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1228 };
1229
1230 void _checkpointRequestHandler(CheckpointRequest *request){
1231         startMlogCheckpoint(NULL,CmiWallTimer());
1232 }
1233
1234 /**
1235  * @brief Starts the checkpoint phase after migration.
1236  */
1237 void startMlogCheckpoint(void *_dummy, double curWallTime){
1238         double _startTime = CkWallTimer();
1239
1240         // increasing the checkpoint counter
1241         checkpointCount++;
1242         
1243 #if DEBUG_CHECKPOINT
1244         if(CmiMyPe() == 0){
1245                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1246         }
1247 #endif
1248
1249         DEBUG_MEM(CmiMemoryCheck());
1250
1251         PUP::sizer psizer;
1252         psizer | checkpointCount;
1253         for(int i=0; i<CmiNumPes(); i++){
1254                 psizer | CpvAccess(_incarnation)[i];
1255         }
1256         CkPupROData(psizer);
1257         DEBUG_MEM(CmiMemoryCheck());
1258         CkPupGroupData(psizer,CmiTrue);
1259         DEBUG_MEM(CmiMemoryCheck());
1260         CkPupNodeGroupData(psizer,CmiTrue);
1261         DEBUG_MEM(CmiMemoryCheck());
1262         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1263         DEBUG_MEM(CmiMemoryCheck());
1264
1265         int dataSize = psizer.size();
1266         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1267         char *msg = (char *)CmiAlloc(totalSize);
1268         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1269         chkMsg->PE = CkMyPe();
1270         chkMsg->dataSize = dataSize;
1271         
1272         char *buf = &msg[sizeof(CheckPointDataMsg)];
1273         PUP::toMem pBuf(buf);
1274
1275         pBuf | checkpointCount;
1276         for(int i=0; i<CmiNumPes(); i++){
1277                 pBuf | CpvAccess(_incarnation)[i];
1278         }
1279         CkPupROData(pBuf);
1280         CkPupGroupData(pBuf,CmiTrue);
1281         CkPupNodeGroupData(pBuf,CmiTrue);
1282         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1283
1284         unAckedCheckpoint=1;
1285         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1286         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1287         
1288         /*
1289                 Store the highest Ticket number processed for each chare on this processor
1290         */
1291         processedTicketLog.removeAll();
1292         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1293
1294 #if DEBUG_CHECKPOINT
1295         if(CmiMyPe() == 0){
1296                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1297         }
1298 #endif
1299
1300 #if COLLECT_STATS_MEMORY
1301         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1302         msgLogSize = 0;
1303         bufferedDetsSize = 0;
1304         storedDetsSize = 0;
1305 #endif
1306
1307         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1308                 lastCompletedAlarm = curWallTime;
1309                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1310         }
1311         traceUserBracketEvent(28,_startTime,CkWallTimer());
1312 };
1313
1314 /**
1315  * @brief A chare adds the latest ticket number processed.
1316  */
1317 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1318         DEBUG(char objString[100]);
1319
1320         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1321         TProcessedLog logEntry;
1322         logEntry.recver = mlogData->objID;
1323         logEntry.tProcessed = mlogData->tProcessed;
1324         log->push_back(logEntry);
1325
1326         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1327 }
1328
1329 class ElementPacker : public CkLocIterator {
1330 private:
1331         CkLocMgr *locMgr;
1332         PUP::er &p;
1333 public:
1334         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1335         void addLocation(CkLocation &loc) {
1336                 CkArrayIndexMax idx=loc.getIndex();
1337                 CkGroupID gID = locMgr->ckGetGroupID();
1338                 p|gID;      // store loc mgr's GID as well for easier restore
1339                 p|idx;
1340                 p|loc;
1341         }
1342 };
1343
1344 /**
1345  * Pups all the array elements in this processor.
1346  */
1347 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1348         int numElements,i;
1349         int numGroups = CkpvAccess(_groupIDTable)->size();      
1350         if(!p.isUnpacking()){
1351                 numElements = CkCountArrayElements();
1352         }       
1353         p | numElements;
1354         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1355         if(!p.isUnpacking()){
1356                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1357         }else{
1358                 //Flush all recs of all LocMgrs before putting in new elements
1359 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1360                 for(int j=0;j<listsize;j++){
1361                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1362                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1363                                 listToSkip[j].idx.print();
1364                         }
1365                 }
1366                 
1367                 printf("numElements = %d\n",numElements);
1368         
1369                 for (int i=0; i<numElements; i++) {
1370                         CkGroupID gID;
1371                         CkArrayIndexMax idx;
1372                         p|gID;
1373                 p|idx;
1374                         int flag=0;
1375                         int matchedIdx=0;
1376                         for(int j=0;j<listsize;j++){
1377                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1378                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1379                                                 matchedIdx = j;
1380                                                 flag = 1;
1381                                                 break;
1382                                         }
1383                                 }
1384                         }
1385                         if(flag == 1){
1386                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1387                         }else{
1388                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1389                         }
1390                                 
1391                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1392                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1393                         mgr->resume(idx,p,create,flag);
1394                         if(flag == 1){
1395                                 int homePE = mgr->homePe(idx);
1396                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1397                         }
1398                 }
1399         }
1400 };
1401
1402
1403 void writeCheckpointToDisk(int size,char *chkpt){
1404         char fNameTemp[100];
1405         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1406         int fd = creat(fNameTemp,S_IRWXU);
1407         int ret = write(fd,chkpt,size);
1408         CkAssert(ret == size);
1409         close(fd);
1410         
1411         char fName[100];
1412         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1413         unlink(fName);
1414
1415         rename(fNameTemp,fName);
1416 }
1417
1418 //handler that receives the checkpoint from a processor
1419 //it stores it and acks it
1420 void _storeCheckpointHandler(char *msg){
1421         double _startTime=CkWallTimer();
1422                 
1423         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1424         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1425         
1426         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1427         
1428         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1429         if(oldChkpt != NULL){
1430                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1431                 CmiFree(oldmsg);
1432         }
1433         //turning off storing checkpoints
1434         
1435         int sendingPE = chkMsg->PE;
1436         
1437         CpvAccess(_storedCheckpointData)->buf = chkpt;
1438         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1439         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1440
1441         int count=0;
1442         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1443                 if(migratedNoticeList[j].fromPE == sendingPE){
1444                         migratedNoticeList[j].ackFrom = 1;
1445                 }else{
1446                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1447                 }
1448                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1449                         migratedNoticeList.remove(j);
1450                         count++;
1451                 }
1452                 
1453         }
1454         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1455         
1456         CheckPointAck ackMsg;
1457         ackMsg.PE = CkMyPe();
1458         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1459         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1460         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1461         
1462         traceUserBracketEvent(29,_startTime,CkWallTimer());
1463 };
1464
1465
1466 /**
1467  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1468  * @note The remove log request message looks like
1469                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1470  */
1471 void sendRemoveLogRequests(){
1472 #if SYNCHRONIZED_CHECKPOINT
1473         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1474 #endif
1475         double _startTime = CkWallTimer();      
1476
1477         // computing total message size
1478         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1479         char *requestMsg = (char *)CmiAlloc(totalSize);
1480
1481         // filling up the message
1482         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1483         request->PE = CkMyPe();
1484         request->numberObjects = processedTicketLog.size();
1485         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1486         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1487         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1488         int *numDeterminants = (int *)listDeterminants;
1489         numDeterminants[0] = 0; 
1490         listDeterminants = (char *)&numDeterminants[1];
1491
1492         // setting the handler in the message
1493         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1494         
1495         DEBUG_MEM(CmiMemoryCheck());
1496         for(int i=0;i<CkNumPes();i++){
1497                 CmiSyncSend(i,totalSize,requestMsg);
1498         }
1499         CmiFree(requestMsg);
1500
1501         clearUpMigratedRetainedLists(CmiMyPe());
1502         //TODO: clear ticketTable
1503         
1504         traceUserBracketEvent(30,_startTime,CkWallTimer());
1505
1506         DEBUG_MEM(CmiMemoryCheck());
1507 }
1508
1509
1510 void _checkpointAckHandler(CheckPointAck *ackMsg){
1511         DEBUG_MEM(CmiMemoryCheck());
1512         unAckedCheckpoint=0;
1513         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1514         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1515         if(onGoingLoadBalancing){
1516                 onGoingLoadBalancing = 0;
1517                 finishedCheckpointLoadBalancing();
1518         }else{
1519                 sendRemoveLogRequests();
1520         }
1521         CmiFree(ackMsg);
1522         
1523 };
1524
1525
1526 /**
1527  * @brief Inserts all the determinants into a hash table.
1528  */
1529 inline void populateDeterminantTable(char *data){
1530         DEBUG(char recverString[100]);
1531         DEBUG(char senderString[100]);
1532         int numDets, *numDetsPtr;
1533         Determinant *detList;
1534         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1535         SNToTicket *tickets;
1536         Ticket ticket;
1537
1538         RemoveLogRequest *request = (RemoveLogRequest *)data;
1539         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1540         
1541         numDetsPtr = (int *)&list[request->numberObjects];
1542         numDets = numDetsPtr[0];
1543         detList = (Determinant *)&numDetsPtr[1];
1544
1545         // inserting determinants into hashtable
1546         for(int i=0; i<numDets; i++){
1547                 table = detTable.get(detList[i].sender);
1548                 if(table == NULL){
1549                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1550                         detTable.put(detList[i].sender) = table;
1551                 }
1552                 tickets = table->get(detList[i].receiver);
1553                 if(tickets == NULL){
1554                         tickets = new SNToTicket();
1555                         table->put(detList[i].receiver) = tickets; 
1556                 }
1557                 ticket.TN = detList[i].TN;
1558                 tickets->put(detList[i].SN) = ticket;
1559         }
1560
1561         DEBUG_MEM(CmiMemoryCheck());    
1562
1563 }
1564
1565 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1566         int total;
1567         DEBUG(char nameString[100]);
1568         DEBUG_MEM(CmiMemoryCheck());
1569         CmiMemoryCheck();
1570         char *data = (char *)_data;
1571         RemoveLogRequest *request = (RemoveLogRequest *)data;
1572         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1573         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1574         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1575         SNToTicket *tickets;
1576         MCount TN;
1577
1578         int count=0;
1579         total = mlog->length();
1580         for(int i=0; i<total; i++){
1581                 MlogEntry *logEntry = mlog->deq();
1582
1583                 // looking message in determinant table
1584                 table = detTable.get(logEntry->env->sender);
1585                 if(table != NULL){
1586                         tickets = table->get(logEntry->env->recver);
1587                         if(tickets != NULL){
1588                                 TN = tickets->get(logEntry->env->SN).TN;
1589                                 if(TN != 0){
1590                                         logEntry->env->TN = TN;
1591                                 }
1592                         }
1593                 }
1594         
1595                 int match=0;
1596                 for(int j=0;j<request->numberObjects;j++){
1597                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1598                                 //this log Entry should be removed
1599                                 match = 1;
1600                                 break;
1601                         }
1602                 }
1603                 char senderString[100],recverString[100];
1604 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1605                 if(match){
1606                         count++;
1607                         delete logEntry;
1608                 }else{
1609                         mlog->enq(logEntry);
1610                 }
1611         }
1612         if(count > 0){
1613                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1614         }
1615         DEBUG_MEM(CmiMemoryCheck());
1616         CmiMemoryCheck();
1617 }
1618
1619 /**
1620  * @brief Removes messages in the log according to the received ticket numbers
1621  */
1622 void _removeProcessedLogHandler(char *requestMsg){
1623         double start = CkWallTimer();
1624
1625         // building map for determinants
1626         populateDeterminantTable(requestMsg);
1627
1628         // removing processed messages from the message log
1629         forAllCharesDo(removeProcessedLogs,requestMsg);
1630
1631         // @todo: clean determinant table 
1632         
1633         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1634         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1635         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1636         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1637         /*
1638                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1639         */
1640         DEBUG_MEM(CmiMemoryCheck());
1641         clearUpMigratedRetainedLists(request->PE);
1642         
1643         traceUserBracketEvent(20,start,CkWallTimer());
1644         CmiFree(requestMsg);    
1645 };
1646
1647
1648 void clearUpMigratedRetainedLists(int PE){
1649         int count=0;
1650         CmiMemoryCheck();
1651         
1652         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1653                 if(migratedNoticeList[j].toPE == PE){
1654                         migratedNoticeList[j].ackTo = 1;
1655                 }
1656                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1657                         migratedNoticeList.remove(j);
1658                         count++;
1659                 }
1660         }
1661         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1662         
1663         for(int j=retainedObjectList.size()-1;j>=0;j--){
1664                 if(retainedObjectList[j]->migRecord.toPE == PE){
1665                         RetainedMigratedObject *obj = retainedObjectList[j];
1666                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1667                         retainedObjectList.remove(j);
1668                         if(obj->msg != NULL){
1669                                 CmiMemoryCheck();
1670                                 CmiFree(obj->msg);
1671                         }
1672                         delete obj;
1673                 }
1674         }
1675 }
1676
1677 /***************************************************************
1678         Restart Methods and handlers
1679 ***************************************************************/        
1680
1681 /**
1682  * Function for restarting the crashed processor.
1683  * It sets the restart flag and contacts the buddy
1684  * processor to get the latest checkpoint.
1685  */
1686 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1687         RestartRequest msg;
1688
1689         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1690
1691         // setting the restart flag
1692         _restartFlag = 1;
1693         _numRestartResponses = 0;
1694
1695         // if we are using team-based message logging, all members of the group have to be restarted
1696         if(teamSize > 1){
1697                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1698                         if(i != CkMyPe() && i < CkNumPes()){
1699                                 // sending a message to the team member
1700                                 msg.PE = CkMyPe();
1701                             CmiSetHandler(&msg,_restartHandlerIdx);
1702                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1703                         }
1704                 }
1705         }
1706
1707         // requesting the latest checkpoint from its buddy
1708         msg.PE = CkMyPe();
1709         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1710         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1711 };
1712
1713 /**
1714  * Function to restart this processor.
1715  * The handler is invoked by a member of its same team in message logging.
1716  */
1717 void _restartHandler(RestartRequest *restartMsg){
1718         int i;
1719         int numGroups = CkpvAccess(_groupIDTable)->size();
1720         RestartRequest msg;
1721         
1722         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1723
1724     // setting the restart flag
1725         _restartFlag = 1;
1726         _numRestartResponses = 0;
1727
1728         // flushing all buffers
1729         //TEST END
1730 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1731         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1732         for(int i=0;i<numGroups;i++){
1733         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1734                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1735                 obj->flushStates();
1736                 obj->ckJustMigrated();
1737         }*/
1738
1739     // requesting the latest checkpoint from its buddy
1740         msg.PE = CkMyPe();
1741         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1742         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1743 }
1744
1745
1746 /**
1747  * Gets the stored checkpoint but calls another function in the sender.
1748  */
1749 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1750
1751         // retrieving the stored checkpoint
1752         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1753
1754         // making sure it is its buddy who is requesting the checkpoint
1755         CkAssert(restartMsg->PE == storedChkpt->PE);
1756
1757         storedRequest = restartMsg;
1758         verifyAckTotal = 0;
1759
1760         for(int i=0;i<migratedNoticeList.size();i++){
1761                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1762 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1763                         if(migratedNoticeList[i].ackFrom == 0){
1764                                 //need to verify if the object actually exists .. it might not
1765                                 //have been acked but it might exist on it
1766                                 VerifyAckMsg msg;
1767                                 msg.migRecord = migratedNoticeList[i];
1768                                 msg.index = i;
1769                                 msg.fromPE = CmiMyPe();
1770                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1771                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1772                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1773                                 verifyAckTotal++;
1774                         }
1775                 }
1776         }
1777
1778         // sending the checkpoint back to its buddy     
1779         if(verifyAckTotal == 0){
1780                 sendCheckpointData(MLOG_RESTARTED);
1781         }
1782         verifyAckCount = 0;
1783 }
1784
1785 /**
1786  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1787  */
1788 void _recvRestartCheckpointHandler(char *_restartData){
1789         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1790         MigrationRecord *migratedAwayElements;
1791
1792         globalLBID = restartData->lbGroupID;
1793         
1794         restartData->restartWallTime *= 1000;
1795         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1796         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1797         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1798
1799         
1800         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1801         char *buf = &_restartData[sizeof(RestartProcessorData)];
1802         
1803         if(restartData->numMigratedAwayElements != 0){
1804                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1805                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1806                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1807                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1808         }
1809
1810         // turning on the team recovery flag
1811         forAllCharesDo(setTeamRecovery,NULL);
1812         
1813         PUP::fromMem pBuf(buf);
1814         pBuf | checkpointCount;
1815         for(int i=0; i<CmiNumPes(); i++){
1816                 pBuf | CpvAccess(_incarnation)[i];
1817         }
1818         CkPupROData(pBuf);
1819         CkPupGroupData(pBuf,CmiFalse);
1820         CkPupNodeGroupData(pBuf,CmiFalse);
1821         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1822         CkAssert(pBuf.size() == restartData->checkPointSize);
1823         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1824
1825         // increasing the incarnation number of all the team members
1826         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1827                 CpvAccess(_incarnation)[i]++;
1828         }
1829         
1830         // turning off the team recovery flag
1831         forAllCharesDo(unsetTeamRecovery,NULL);
1832
1833         // initializing a few variables for handling local messages
1834         forAllCharesDo(initializeRestart,NULL);
1835         
1836         CmiFree(_restartData);  
1837
1838         /*HERE _initDone();
1839
1840         getGlobalStep(globalLBID);
1841         
1842         countUpdateHomeAcks = 0;
1843         RestartRequest updateHomeRequest;
1844         updateHomeRequest.PE = CmiMyPe();
1845         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1846         for(int i=0;i<CmiNumPes();i++){
1847                 if(i != CmiMyPe()){
1848                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1849                 }
1850         }
1851 */
1852
1853
1854         // Send out the request to resend logged messages to all other processors
1855         CkVec<TProcessedLog> objectVec;
1856         forAllCharesDo(createObjIDList, (void *)&objectVec);
1857         int numberObjects = objectVec.size();
1858         
1859         /*
1860                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1861         */
1862         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1863         char *resendMsg = (char *)CmiAlloc(totalSize);  
1864
1865         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1866         resendReq->PE =CkMyPe(); 
1867         resendReq->numberObjects = numberObjects;
1868         char *objList = &resendMsg[sizeof(ResendRequest)];
1869         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1870         
1871
1872         /* test for parallel restart migrate away object**/
1873 //      if(fastRecovery){
1874 //              distributeRestartedObjects();
1875 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1876 //      }
1877         
1878         /*      To make restart work for load balancing.. should only
1879         be used when checkpoint happens along with load balancing
1880         **/
1881 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1882
1883         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1884         CpvAccess(_currentObj) = lb;
1885         lb->ReceiveDummyMigration(restartDecisionNumber);
1886
1887         sleep(10);
1888         
1889         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1890         for(int i=0;i<CkNumPes();i++){
1891                 if(i != CkMyPe()){
1892                         CmiSyncSend(i,totalSize,resendMsg);
1893                 }       
1894         }
1895         _resendMessagesHandler(resendMsg);
1896
1897 }
1898
1899
1900 void CkMlogRestartDouble(void *,double){
1901         CkMlogRestart(NULL,NULL);
1902 };
1903
1904 //TML: restarting from local (group) failure
1905 void CkMlogRestartLocal(){
1906     CkMlogRestart(NULL,NULL);
1907 };
1908
1909 /**
1910  * Gets the stored checkpoint for its buddy processor.
1911  */
1912 void _getCheckpointHandler(RestartRequest *restartMsg){
1913
1914         // retrieving the stored checkpoint
1915         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1916
1917         // making sure it is its buddy who is requesting the checkpoint
1918         CkAssert(restartMsg->PE == storedChkpt->PE);
1919
1920         storedRequest = restartMsg;
1921         verifyAckTotal = 0;
1922
1923         for(int i=0;i<migratedNoticeList.size();i++){
1924                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1925 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1926                         if(migratedNoticeList[i].ackFrom == 0){
1927                                 //need to verify if the object actually exists .. it might not
1928                                 //have been acked but it might exist on it
1929                                 VerifyAckMsg msg;
1930                                 msg.migRecord = migratedNoticeList[i];
1931                                 msg.index = i;
1932                                 msg.fromPE = CmiMyPe();
1933                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1934                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1935                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1936                                 verifyAckTotal++;
1937                         }
1938                 }
1939         }
1940
1941         // sending the checkpoint back to its buddy     
1942         if(verifyAckTotal == 0){
1943                 sendCheckpointData(MLOG_CRASHED);
1944         }
1945         verifyAckCount = 0;
1946 }
1947
1948
1949 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1950         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1951         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1952         if(rec != NULL && rec->type() == CkLocRec::local){
1953                         //this location exists on this processor
1954                         //and needs to be removed       
1955                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1956                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1957                         
1958                         int localIdx = localRec->getLocalIndex();
1959                         LBDatabase *lbdb = localRec->getLBDB();
1960                         LDObjHandle ldHandle = localRec->getLdHandle();
1961                                 
1962                         locMgr->setDuringMigration(true);
1963                         
1964                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1965                         lbdb->UnregisterObj(ldHandle);
1966                         
1967                         locMgr->setDuringMigration(false);
1968                         
1969                         verifyAckedRequests++;
1970
1971         }
1972         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1973         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1974 };
1975
1976
1977 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1978         int index =     verifyReply->index;
1979         migratedNoticeList[index] = verifyReply->migRecord;
1980         verifyAckCount++;
1981         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1982         if(verifyAckCount == verifyAckTotal){
1983                 sendCheckpointData(MLOG_CRASHED);
1984         }
1985 }
1986
1987
1988 /**
1989  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1990  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1991  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1992  */
1993 void sendCheckpointData(int mode){      
1994         RestartRequest *restartMsg = storedRequest;
1995         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1996         int numMigratedAwayElements = migratedNoticeList.size();
1997         if(migratedNoticeList.size() != 0){
1998                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
1999 //                      CkAssert(migratedNoticeList.size() == 0);
2000         }
2001         
2002         
2003         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2004         
2005         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2006         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2007         
2008         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2009         
2010         char *msg = (char *)CmiAlloc(totalSize);
2011         
2012         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2013         dataMsg->PE = CkMyPe();
2014         dataMsg->restartWallTime = CmiTimer();
2015         dataMsg->checkPointSize = storedChkpt->bufSize;
2016         
2017         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2018 //      dataMsg->numMigratedAwayElements = 0;
2019         
2020         dataMsg->numMigratedInElements = 0;
2021         dataMsg->migratedElementSize = 0;
2022         dataMsg->lbGroupID = globalLBID;
2023         /*msg layout 
2024                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2025                 Local MessageLog|
2026         */
2027         //store checkpoint data
2028         char *buf = &msg[sizeof(RestartProcessorData)];
2029
2030         if(dataMsg->numMigratedAwayElements != 0){
2031                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2032                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2033         }
2034         
2035
2036         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2037         buf = &buf[storedChkpt->bufSize];
2038
2039         if(mode == MLOG_RESTARTED){
2040                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2041                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2042                 CmiFree(restartMsg);
2043         }else{
2044                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2045                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2046                 CmiFree(restartMsg);
2047         }
2048 };
2049
2050
2051 // this list is used to create a vector of the object ids of all
2052 //the chares on this processor currently and the highest TN processed by them 
2053 //the first argument is actually a CkVec<TProcessedLog> *
2054 void createObjIDList(void *data,ChareMlogData *mlogData){
2055         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2056         TProcessedLog entry;
2057         entry.recver = mlogData->objID;
2058         entry.tProcessed = mlogData->tProcessed;
2059         list->push_back(entry);
2060         DEBUG_TEAM(char objString[100]);
2061         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2062         DEBUG_RECOVERY(printLog(&entry));
2063 }
2064
2065
2066 /**
2067  * Receives the checkpoint data from its buddy, restores the state of all the objects
2068  * and asks everyone else to update its home.
2069  */
2070 void _recvCheckpointHandler(char *_restartData){
2071         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2072         MigrationRecord *migratedAwayElements;
2073
2074         globalLBID = restartData->lbGroupID;
2075         
2076         restartData->restartWallTime *= 1000;
2077         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2078         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2079         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2080
2081         
2082         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2083         char *buf = &_restartData[sizeof(RestartProcessorData)];
2084         
2085         if(restartData->numMigratedAwayElements != 0){
2086                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2087                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2088                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2089                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2090         }
2091         
2092         PUP::fromMem pBuf(buf);
2093
2094         pBuf | checkpointCount;
2095         for(int i=0; i<CmiNumPes(); i++){
2096                 pBuf | CpvAccess(_incarnation)[i];
2097         }
2098         CkPupROData(pBuf);
2099         CkPupGroupData(pBuf,CmiTrue);
2100         CkPupNodeGroupData(pBuf,CmiTrue);
2101         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2102         CkAssert(pBuf.size() == restartData->checkPointSize);
2103         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2104
2105         // increases the incarnation number
2106         CpvAccess(_incarnation)[CmiMyPe()]++;
2107         
2108         forAllCharesDo(initializeRestart,NULL);
2109         
2110         CmiFree(_restartData);
2111         
2112         _initDone();
2113
2114         getGlobalStep(globalLBID);
2115
2116         // sending request to send determinants
2117         _numRestartResponses = 0;
2118         
2119         // Send out the request to resend logged determinants to all other processors
2120         CkVec<TProcessedLog> objectVec;
2121         forAllCharesDo(createObjIDList, (void *)&objectVec);
2122         int numberObjects = objectVec.size();
2123         
2124         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2125         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2126         char *resendMsg = (char *)CmiAlloc(totalSize);  
2127
2128         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2129         resendReq->PE =CkMyPe(); 
2130         resendReq->numberObjects = numberObjects;
2131         char *objList = &resendMsg[sizeof(ResendRequest)];
2132         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2133
2134         CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
2135         for(int i=0;i<CkNumPes();i++){
2136                 if(i != CkMyPe()){
2137                         CmiSyncSend(i,totalSize,resendMsg);
2138                 }
2139         }
2140         CmiFree(resendMsg);
2141         
2142 }
2143
/**
 * Receives the updateHome ACKs from all other processors. Once everybody
 * has replied, it sends a request to resend the logged messages.
 *
 * Every call frees the ACK it receives. Only the call that brings
 * countUpdateHomeAcks to CmiNumPes() continues (the counter is not reset
 * here). That call:
 *  1. builds a request |ResendRequest|TProcessedLog x numberObjects| listing
 *     the local chares (as produced by createObjIDList);
 *  2. redistributes restarted objects when fastRecovery is set;
 *  3. calls ReceiveDummyMigration(restartDecisionNumber) on the CentralLB
 *     group globalLBID;
 *  4. sends the request to every other PE (handler _resendMessagesHandlerIdx)
 *     and also processes it locally.
 */
void _updateHomeAckHandler(RestartRequest *updateHomeAck){
	countUpdateHomeAcks++;
	CmiFree(updateHomeAck);
	// one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
	if(countUpdateHomeAcks != CmiNumPes()){
		return;
	}

	// Send out the request to resend logged messages to all other processors
	CkVec<TProcessedLog> objectVec;
	forAllCharesDo(createObjIDList, (void *)&objectVec);
	int numberObjects = objectVec.size();
	
	//	resendMsg layout: |ResendRequest|Array of TProcessedLog|
	int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
	char *resendMsg = (char *)CmiAlloc(totalSize);	

	ResendRequest *resendReq = (ResendRequest *)resendMsg;
	resendReq->PE =CkMyPe(); 
	resendReq->numberObjects = numberObjects;
	char *objList = &resendMsg[sizeof(ResendRequest)];
	memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 

	/* test for parallel restart migrate away object**/
	if(fastRecovery){
		distributeRestartedObjects();
		printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
	}
	
	/*	To make restart work for load balancing.. should only
	be used when checkpoint happens along with load balancing
	**/
//	forAllCharesDo(resumeFromSyncRestart,NULL);

	// replay the recorded load-balancing decision before messages are resent
	CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
	CpvAccess(_currentObj) = lb;
	lb->ReceiveDummyMigration(restartDecisionNumber);

//HERE	sleep(10);
	
	CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
	for(int i=0;i<CkNumPes();i++){
		if(i != CkMyPe()){
			CmiSyncSend(i,totalSize,resendMsg);
		}
	}
	// handle our own request too; _resendMessagesHandler does not free a
	// request whose PE is this PE, so the buffer is released right after
	_resendMessagesHandler(resendMsg);
	CmiFree(resendMsg);

};
2198
2199 /**
2200  * @brief Initializes variables and flags for restarting procedure.
2201  */
2202 void initializeRestart(void *data, ChareMlogData *mlogData){
2203         mlogData->resendReplyRecvd = 0;
2204         mlogData->receivedTNs = new CkVec<MCount>;
2205         mlogData->restartFlag = 1;
2206 };
2207
2208 /**
2209  * Updates the homePe of chare array elements.
2210  */
2211 void updateHomePE(void *data,ChareMlogData *mlogData){
2212         RestartRequest *updateRequest = (RestartRequest *)data;
2213         int PE = updateRequest->PE; //restarted PE
2214         //if this object is an array Element and its home is the restarted processor
2215         // the home processor needs to know its current location
2216         if(mlogData->objID.type == TypeArray){
2217                 //it is an array element
2218                 CkGroupID myGID = mlogData->objID.data.array.id;
2219                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2220                 CkArrayID aid(mlogData->objID.data.array.id);           
2221                 //check if the restarted processor is the home processor for this object
2222                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2223                 if(locMgr->homePe(myIdx) == PE){
2224                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2225                         DEBUG_RESTART(myIdx.print());
2226                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2227                 }
2228         }
2229 };
2230
2231
2232 /**
2233  * Updates the homePe for all chares in this processor.
2234  */
2235 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2236         int sender = updateRequest->PE;
2237         
2238         forAllCharesDo(updateHomePE,updateRequest);
2239         
2240         updateRequest->PE = CmiMyPe();
2241         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2242         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2243         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2244                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2245                 checkpointCount--;
2246                 startMlogCheckpoint(NULL,0);
2247         }
2248         if(sender == getCheckPointPE()){
2249                 for(int i=0;i<retainedObjectList.size();i++){
2250                         if(retainedObjectList[i]->acked == 0){
2251                                 MigrationNotice migMsg;
2252                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2253                                 migMsg.record = retainedObjectList[i];
2254                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2255                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2256                         }
2257                 }
2258         }
2259 }
2260
2261 /**
2262  * @brief Fills up the ticket vector for each chare.
2263  */
2264 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2265         DEBUG(char name[100]);
2266         ResendData *resendData = (ResendData *)data;
2267         int PE = resendData->PE; //restarted PE
2268         int count=0;
2269         CkHashtableIterator *iterator;
2270         void *objp;
2271         void *objkey;
2272         CkObjID *objID;
2273         SNToTicket *snToTicket;
2274         Ticket ticket;
2275         
2276         // traversing the team table looking up for the maximum TN received     
2277         iterator = mlogData->teamTable.iterator();
2278         while( (objp = iterator->next(&objkey)) != NULL ){
2279                 objID = (CkObjID *)objkey;
2280         
2281                 // traversing the resendData structure to add ticket numbers
2282                 for(int j=0;j<resendData->numberObjects;j++){
2283                         if((*objID) == (resendData->listObjects)[j].recver){
2284                                 snToTicket = *(SNToTicket **)objp;
2285 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2286                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2287                                         ticket = snToTicket->get(snIndex);      
2288                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2289                                                 //store the TNs that have been since the recver last checkpointed
2290                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2291                                         }
2292                                 }
2293                         }
2294                 }
2295         }
2296
2297         //releasing the memory for the iterator
2298         delete iterator;
2299 }
2300
2301
2302 /**
2303  * @brief Turns on the flag for team recovery that selectively restores
2304  * particular metadata information.
2305  */
2306 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2307         char name[100];
2308         mlogData->teamRecoveryFlag = 1; 
2309 }
2310
2311 /**
2312  * @brief Turns off the flag for team recovery.
2313  */
2314 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2315         mlogData->teamRecoveryFlag = 0;
2316 }
2317
2318 /**
2319  * Prints a processed log.
2320  */
2321 void printLog(TProcessedLog *log){
2322         char recverString[100];
2323         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2324 }
2325
2326 /**
2327  * Prints information about a message.
2328  */
2329 void printMsg(envelope *env, const char* par){
2330         char senderString[100];
2331         char recverString[100];
2332         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2333 }
2334
2335 /**
2336  * Prints information about a determinant.
2337  */
2338 void printDet(Determinant *det, const char* par){
2339         char senderString[100];
2340         char recverString[100];
2341         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2342 }
2343
/**
 * @brief Resends all the logged messages to a particular chare list.
 * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
 * @param mlogData a particular chare living in this processor.
 *
 * forAllCharesDo callback. A log entry is resent when all of these hold:
 *  - its envelope is non-NULL;
 *  - its receiver type is not TypeInvalid;
 *  - its receiver matches an object in resendData->listObjects.
 *
 * How it is resent depends on the destination:
 *  - remote PE, node-group receiver: sent with CmiSyncNodeSend;
 *  - remote PE, other receivers: the handler is first reset to
 *    CmiGetXHandler(env), overwriting the logged envelope's handler in place;
 *  - the restarted PE is this PE: a copy is enqueued on the local scheduler queue.
 *
 * Log entries are not removed. The count and ticketRequests locals are only
 * reported by DEBUG_RESTART output.
 */
void resendMessageForChare(void *data, ChareMlogData *mlogData){
	DEBUG_RESTART(char nameString[100]);
	DEBUG_RESTART(char recverString[100]);
	DEBUG_RESTART(char senderString[100]);

	ResendData *resendData = (ResendData *)data;
	int PE = resendData->PE; //restarted PE
	int count=0;
	int ticketRequests=0;
	CkQ<MlogEntry *> *log = mlogData->getMlog();

	DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)

	// traversing the message log to see if we must resend a message	
	for(int i=0;i<log->length();i++){
		MlogEntry *logEntry = (*log)[i];
		
		// if we sent out the logs of a local message to buddy and it crashed
		//before acknowledging 
		envelope *env = logEntry->env;
		if(env == NULL){
			continue;
		}
	
		// resend if type is not invalid	
		if(env->recver.type != TypeInvalid){
			for(int j=0;j<resendData->numberObjects;j++){
				if(env->recver == (resendData->listObjects)[j].recver){
					if(PE != CkMyPe()){
						DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
						// NOTE(review): CmiSyncNodeSend addresses a node, but a PE
						// number is passed; these coincide only when there is one
						// PE per node -- verify for SMP builds.
						if(env->recver.type == TypeNodeGroup){
							CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
						}else{
							CmiSetHandler(env,CmiGetXHandler(env));
							CmiSyncSend(PE,env->getTotalsize(),(char *)env);
						}
					}else{
						// restarted PE is this one: requeue a copy, keep the logged original
						envelope *copyEnv = copyEnvelope(env);
						CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
					}
					DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
					count++;
				}
			}//end of for loop of objects
			
		}	
	}
	DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)	
}
2398
/**
 * Send all remote determinants to a particular failed PE.
 * It only sends determinants to those objects on the list.
 *
 * Request layout (msg): |ResendRequest|TProcessedLog x numberObjects|.
 * For each listed receiver, every determinant stored in
 * CpvAccess(_remoteDets) with TN > tProcessed is selected. Both its TN and
 * the determinant itself are returned to resendReq->PE with handler
 * _sendDetsReplyHandlerIdx.
 *
 * Reply layout:
 *   |ResendRequest|CkObjID x numberObjects|
 *   per object, in list order: |int count|MCount x count|       (selected TNs)
 *   per object, in list order: |int count|Determinant x count|  (selected determinants)
 *
 * The request is freed unless its PE is this PE.
 */
void _sendDetsHandler(char *msg){
	ResendData d;
	CkVec<Determinant> *detVec;
	ResendRequest *resendReq = (ResendRequest *)msg;

	// CkPrintf("[%d] Sending determinants\n",CkMyPe());

	// building the reply message
	char *listObjects = &msg[sizeof(ResendRequest)];
	d.numberObjects = resendReq->numberObjects;
	d.PE = resendReq->PE;
	d.listObjects = (TProcessedLog *)listObjects;
	d.ticketVecs = new CkVec<MCount>[d.numberObjects];
	detVec = new CkVec<Determinant>[d.numberObjects];

	// adding the remote determinants to the resendReplyMsg
	// traversing all the remote determinants
	CkVec<Determinant> *vec;
	for(int i=0; i<d.numberObjects; i++){
		vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
		if(vec != NULL){
			for(int j=0; j<vec->size(); j++){

				// only relevant determinants are to be sent
				if((*vec)[j].TN > d.listObjects[i].tProcessed){

					// adding the ticket in the ticket vector
					d.ticketVecs[i].push_back((*vec)[j].TN);

					// adding the determinant in the determinant vector
					detVec[i].push_back((*vec)[j]);

					DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
				}
			}
		}
	}

	int totalDetStored = 0;
	for(int i=0;i<d.numberObjects;i++){
		totalDetStored += detVec[i].size();
	}

	//send back the maximum ticket number for a message sent to each object on the 
	//restarted processor
	//Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
	
	int totalTNStored=0;
	for(int i=0;i<d.numberObjects;i++){
		totalTNStored += d.ticketVecs[i].size();
	}
	
	// per object: one CkObjID plus two int counts (TN list + determinant list)
	int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
	char *resendReplyMsg = (char *)CmiAlloc(totalSize);
	
	ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
	resendReply->PE = CkMyPe();
	resendReply->numberObjects = d.numberObjects;
	
	char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
	CkObjID *replyObjects = (CkObjID *)replyListObjects;
	for(int i=0;i<d.numberObjects;i++){
		replyObjects[i] = d.listObjects[i].recver;
	}
	
	// ticketList is a write cursor that advances over both variable-length sections
	char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = d.ticketVecs[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
		ticketList = &ticketList[sizeof(MCount)*vecsize];
	}

	// adding the stored remote determinants to the message
	for(int i=0;i<d.numberObjects;i++){
		int vecsize = detVec[i].size();
		memcpy(ticketList,&vecsize,sizeof(int));
		ticketList = &ticketList[sizeof(int)];
		memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
		ticketList = &ticketList[sizeof(Determinant)*vecsize];
	}

	CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
	CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);

	delete [] detVec;
	delete [] d.ticketVecs;

	DEBUG_MEM(CmiMemoryCheck());

	if(resendReq->PE != CkMyPe()){
		CmiFree(msg);
	}	
//	CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
	lastRestart = CmiWallTimer();

}
2501
2502 /**
2503  * Resends messages since last checkpoint to the list of objects included in the 
2504  * request. It also sends stored remote determinants to the particular failed PE.
2505  */
2506 void _resendMessagesHandler(char *msg){
2507         ResendData d;
2508         ResendRequest *resendReq = (ResendRequest *)msg;
2509
2510         //CkPrintf("[%d] Resending messages\n",CkMyPe());
2511
2512         // building the reply message
2513         char *listObjects = &msg[sizeof(ResendRequest)];
2514         d.numberObjects = resendReq->numberObjects;
2515         d.PE = resendReq->PE;
2516         d.listObjects = (TProcessedLog *)listObjects;
2517         
2518         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2519
2520         //TML: examines the origin processor to determine if it belongs to the same group.
2521         // In that case, it only returns the maximum ticket received for each object in the list.
2522         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2523                 forAllCharesDo(fillTicketForChare,&d);
2524         else
2525                 forAllCharesDo(resendMessageForChare,&d);
2526
2527         DEBUG_MEM(CmiMemoryCheck());
2528
2529         if(resendReq->PE != CkMyPe()){
2530                 CmiFree(msg);
2531         }       
2532 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2533         lastRestart = CmiWallTimer();
2534 }
2535
// Helpers used by processReceivedTN on the collected ticket numbers:
// maxVec sets tCount in the default build; sortVec/searchVec are used in the
// VERIFY_DETS hole-detection path.
MCount maxVec(CkVec<MCount> *TNvec);
void sortVec(CkVec<MCount> *TNvec);
int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2539
2540 /**
2541  * @brief Processes the messages in the delayed remote message queue
2542  */
2543 void processDelayedRemoteMsgQueue(){
2544         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2545         
2546         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2547                 void *qMsgPtr;
2548                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2549                 envelope *qEnv = (envelope *)qMsgPtr;
2550                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2551                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2552                 DEBUG_MEM(CmiMemoryCheck());
2553         }
2554
2555 }
2556
2557 /**
2558  * @brief Receives determinants stored on remote nodes.
2559  * Message format: |Header|ObjID list|TN list|Determinant list|
2560  * TN list = |number of TNs|list of TNs|...|
2561  */
2562 void _sendDetsReplyHandler(char *msg){
2563         ResendRequest *resendReply = (ResendRequest *)msg;
2564         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2565         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2566         
2567 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2568         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2569         
2570         for(int i =0; i< resendReply->numberObjects;i++){
2571                 Chare *obj = (Chare *)listObjects[i].getObject();
2572                 
2573                 int vecsize;
2574                 memcpy(&vecsize,listTickets,sizeof(int));
2575                 listTickets = &listTickets[sizeof(int)];
2576                 MCount *listTNs = (MCount *)listTickets;
2577                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2578         
2579                 if(obj != NULL){
2580                         //the object was restarted on the processor on which it existed
2581                         processReceivedTN(obj,vecsize,listTNs);
2582                 }else{
2583                         //pack up objID vecsize and listTNs and send it to the correct processor
2584                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2585                         char *TNMsg = (char *)CmiAlloc(totalSize);
2586                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2587                         receivedTNData->recver = listObjects[i];
2588                         receivedTNData->numTNs = vecsize;
2589                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2590                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2591                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2592                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2593                 }
2594
2595         }
2596
2597         // traversing all the retrieved determinants
2598         for(int i = 0; i < resendReply->numberObjects; i++){
2599                 Chare *obj = (Chare *)listObjects[i].getObject();
2600                 
2601                 int vecsize;
2602                 memcpy(&vecsize,listTickets,sizeof(int));
2603                 listTickets = &listTickets[sizeof(int)];
2604                 Determinant *listDets = (Determinant *)listTickets;     
2605                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2606         
2607                 if(obj != NULL){
2608                         //the object was restarted on the processor on which it existed
2609                         processReceivedDet(obj,vecsize,listDets);
2610                 } else {
2611                         // pack the determinants and ship them to the other processor
2612                         // pack up objID vecsize and listDets and send it to the correct processor
2613                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2614                         char *detMsg = (char *)CmiAlloc(totalSize);
2615                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2616                         receivedDetData->recver = listObjects[i];
2617                         receivedDetData->numDets = vecsize;
2618                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2619                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2620                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2621                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2622                 }
2623
2624         }
2625
2626         // checking if we have received all replies
2627         _numRestartResponses++;
2628         if(_numRestartResponses != CkNumPes())
2629                 return;
2630         else 
2631                 _numRestartResponses = 0;
2632
2633         // continuing with restart process; send out the request to resend logged messages to all other processors
2634         CkVec<TProcessedLog> objectVec;
2635         forAllCharesDo(createObjIDList, (void *)&objectVec);
2636         int numberObjects = objectVec.size();
2637         
2638         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2639         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2640         char *resendMsg = (char *)CmiAlloc(totalSize);  
2641
2642         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2643         resendReq->PE =CkMyPe(); 
2644         resendReq->numberObjects = numberObjects;
2645         char *objList = &resendMsg[sizeof(ResendRequest)];
2646         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2647
2648         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2649         CpvAccess(_currentObj) = lb;
2650         lb->ReceiveDummyMigration(restartDecisionNumber);
2651
2652 //HERE  sleep(10);
2653 //      CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
2654         
2655         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2656         for(int i=0;i<CkNumPes();i++){
2657                 if(i != CkMyPe()){
2658                         CmiSyncSend(i,totalSize,resendMsg);
2659                 }
2660         }
2661         _resendMessagesHandler(resendMsg);
2662         CmiFree(resendMsg);
2663
2664         /* test for parallel restart migrate away object**/
2665         if(fastRecovery){
2666                 distributeRestartedObjects();
2667                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2668         }
2669
2670 //      processDelayedRemoteMsgQueue();
2671
2672 };
2673
2674 /**
2675  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2676  */
2677 void _receivedDetDataHandler(ReceivedDetData *msg){
2678         DEBUG_NOW(char objName[100]);
2679         Chare *obj = (Chare *) msg->recver.getObject();
2680         if(obj){                
2681                 char *_msg = (char *)msg;
2682                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2683                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2684                 processReceivedDet(obj,msg->numDets,listDets);
2685                 CmiFree(msg);
2686         }else{
2687                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2688                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2689         }
2690 }
2691
2692 /**
2693  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2694  */
2695 void _receivedTNDataHandler(ReceivedTNData *msg){
2696         DEBUG_NOW(char objName[100]);
2697         Chare *obj = (Chare *) msg->recver.getObject();
2698         if(obj){                
2699                 char *_msg = (char *)msg;
2700                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2701                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2702                 processReceivedTN(obj,msg->numTNs,listTNs);
2703                 CmiFree(msg);
2704         }else{
2705                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2706                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2707         }
2708 };
2709
2710 /**
2711  * @brief Processes the received list of determinants from a particular PE.
2712  */
2713 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2714         Determinant *det;
2715
2716         // traversing the whole list of determinants
2717         for(int i=0; i<listSize; i++){
2718                 det = &listDets[i];
2719                 if(CkMyPe() == 4) printDet(det,"RECOVERY");
2720                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2721                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2722         }
2723         
2724         DEBUG_MEM(CmiMemoryCheck());
2725 }
2726         
2727 /**
2728  * @brief Processes the received list of tickets from a particular PE.
2729  */
2730 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2731         // increases the number of resendReply received
2732         obj->mlogData->resendReplyRecvd++;
2733
2734         DEBUG(char objName[100]);
2735         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2736         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2737         //if(obj->mlogData->receivedTNs == NULL)
2738         //      CkPrintf("NULL\n");     
2739         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2740
2741         // includes the tickets into the receivedTN structure
2742         for(int j=0;j<listSize;j++){
2743                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2744         }
2745         
2746         //if this object has received all the replies find the ticket numbers
2747         //that senders know about. Those less than the ticket number processed 
2748         //by the receiver can be thrown away. The rest need not be consecutive
2749         // ie there can be holes in the list of ticket numbers seen by senders
2750         if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
2751                 obj->mlogData->resendReplyRecvd = 0;
2752
2753 #if VERIFY_DETS
2754                 //sort the received TNS
2755                 sortVec(obj->mlogData->receivedTNs);
2756         
2757                 //after all the received tickets are in we need to sort them and then 
2758                 // calculate the holes  
2759                 if(obj->mlogData->receivedTNs->size() > 0){
2760                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2761                         int vecsize = obj->mlogData->receivedTNs->size();
2762                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2763                         
2764                         // updating tCount with the highest ticket handed out
2765                         if(teamSize > 1){
2766                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2767                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2768                         }else{
2769                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2770                         }
2771                         
2772                         if(numberHoles == 0){
2773                         }else{
2774                                 char objName[100];                                      
2775                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2776                                 obj->mlogData->numberHoles = numberHoles;
2777                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2778                                 int countHoles=0;
2779                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2780                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2781                                                 //the TNs are not consecutive at this point
2782                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2783                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2784                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2785                                                         countHoles++;
2786                                                 }       
2787                                         }
2788                                 }
2789                                 //Holes have been given new TN
2790                                 if(countHoles != numberHoles){
2791                                         char str[100];
2792                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2793                                 }
2794                                 CkAssert(countHoles == numberHoles);                                    
2795                                 obj->mlogData->currentHoles = numberHoles;
2796                         }
2797                 }
2798 #else
2799                 if(obj->mlogData->receivedTNs->size() > 0){
2800                         obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
2801                 }
2802 #endif
2803                 // cleaning up structures and getting ready to continue execution       
2804                 delete obj->mlogData->receivedTNs;
2805                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2806                 obj->mlogData->receivedTNs = NULL;
2807                 obj->mlogData->restartFlag = 0;
2808
2809                 // processDelayedRemoteMsgQueue();
2810
2811                 DEBUG_RESTART(char objString[100]);
2812                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2813         }
2814
2815 }
2816
2817 /** @brief Returns the maximum ticket from a vector */
2818 MCount maxVec(CkVec<MCount> *TNvec){
2819         MCount max = 0;
2820         for(int i=0; i<TNvec->size(); i++){
2821                 if((*TNvec)[i] > max)
2822                         max = (*TNvec)[i];
2823         }
2824         return max;
2825 }
2826
2827 void sortVec(CkVec<MCount> *TNvec){
2828         //sort it ->its bloddy bubble sort
2829         //TODO: use quicksort
2830         for(int i=0;i<TNvec->size();i++){
2831                 for(int j=i+1;j<TNvec->size();j++){
2832                         if((*TNvec)[j] < (*TNvec)[i]){
2833                                 MCount temp;
2834                                 temp = (*TNvec)[i];
2835                                 (*TNvec)[i] = (*TNvec)[j];
2836                                 (*TNvec)[j] = temp;
2837                         }
2838                 }
2839         }
2840         //make it unique .. since its sorted all equal units will be consecutive
2841         MCount *tempArray = new MCount[TNvec->size()];
2842         int     uniqueCount=-1;
2843         for(int i=0;i<TNvec->size();i++){
2844                 tempArray[i] = 0;
2845                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2846                         uniqueCount++;
2847                         tempArray[uniqueCount] = (*TNvec)[i];
2848                 }
2849         }
2850         uniqueCount++;
2851         TNvec->removeAll();
2852         for(int i=0;i<uniqueCount;i++){
2853                 TNvec->push_back(tempArray[i]);
2854         }
2855         delete [] tempArray;
2856 }       
2857
2858 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2859         if(TNVec->size() == 0){
2860                 return -1; //not found in an empty vec
2861         }
2862         //binary search to find 
2863         int left=0;
2864         int right = TNVec->size();
2865         int mid = (left +right)/2;
2866         while(searchTN != (*TNVec)[mid] && left < right){
2867                 if((*TNVec)[mid] > searchTN){
2868                         right = mid-1;
2869                 }else{
2870                         left = mid+1;
2871                 }
2872                 mid = (left + right)/2;
2873         }
2874         if(left < right){
2875                 //mid is the element to be returned
2876                 return mid;
2877         }else{
2878                 if(mid < TNVec->size() && mid >=0){
2879                         if((*TNVec)[mid] == searchTN){
2880                                 return mid;
2881                         }else{
2882                                 return -1;
2883                         }
2884                 }else{
2885                         return -1;
2886                 }
2887         }
2888 };
2889
2890
2891 /*
2892         Method to do parallel restart. Distribute some of the array elements to other processors.
2893         The problem is that we cant use to charm entry methods to do migration as it will get
2894         stuck in the protocol that is going to restart
2895         Note: in order to avoid interference between the objects being recovered, the current PE
2896     will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
2897     objects end up stepping into each other's shoes (interference).
2898 */
2899
2900 class ElementDistributor: public CkLocIterator{
2901         CkLocMgr *locMgr;
2902         int *targetPE;
2903
2904         void pupLocation(CkLocation &loc,PUP::er &p){
2905                 CkArrayIndexMax idx=loc.getIndex();
2906                 CkGroupID gID = locMgr->ckGetGroupID();
2907                 p|gID;      // store loc mgr's GID as well for easier restore
2908                 p|idx;
2909                 p|loc;
2910         };
2911 public:
2912         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2913
2914         void addLocation(CkLocation &loc){
2915
2916                 // leaving object on this PE
2917                 if(*targetPE == CkMyPe()){
2918                         *targetPE = (*targetPE +1)%CkNumPes();
2919                         return;
2920                 }
2921                         
2922                 CkArrayIndexMax idx = loc.getIndex();
2923                 CkLocRec_local *rec = loc.getLocalRecord();
2924                         
2925                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2926                 idx.print();
2927
2928                 // incrementing number of emigrant objects
2929                 CpvAccess(_numEmigrantRecObjs)++;
2930                         
2931                 //pack up this location and send it across
2932                 PUP::sizer psizer;
2933                 pupLocation(loc,psizer);
2934                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
2935                 char *msg = (char *)CmiAlloc(totalSize);
2936                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
2937                 distributeMsg->PE = CkMyPe();
2938                 char *buf = &msg[sizeof(DistributeObjectMsg)];
2939                 PUP::toMem pmem(buf);
2940                 pmem.becomeDeleting();
2941                 pupLocation(loc,pmem);
2942                         
2943                 locMgr->setDuringMigration(CmiTrue);
2944                 delete rec;
2945                 locMgr->setDuringMigration(CmiFalse);
2946                 locMgr->inform(idx,*targetPE);
2947
2948                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2949                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2950
2951                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2952
2953                 //decide on the target processor for the next object
2954                 *targetPE = *targetPE + 1;
2955                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2956                         *targetPE = CkMyPe() + 1;
2957                 }
2958         }
2959
2960 };
2961
2962 /**
2963  * Distributes objects to accelerate recovery after a failure.
2964  */
2965 void distributeRestartedObjects(){
2966         int numGroups = CkpvAccess(_groupIDTable)->size();      
2967         int i;
2968         int targetPE=CkMyPe()+1;
2969         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2970 };
2971
2972 /**
2973  * Handler to update information about an object just received.
2974  */
2975 void _distributedLocationHandler(char *receivedMsg){
2976         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
2977         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
2978         int sourcePE = distributeMsg->PE;
2979         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
2980         PUP::fromMem pmem(buf);
2981         CkGroupID gID;
2982         CkArrayIndexMax idx;
2983         pmem |gID;
2984         pmem |idx;
2985         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
2986         donotCountMigration=1;
2987         mgr->resume(idx,pmem,CmiTrue);
2988         donotCountMigration=0;
2989         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
2990         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
2991         idx.print();
2992
2993         CkLocRec *rec = mgr->elementRec(idx);
2994         CmiAssert(rec->type() == CkLocRec::local);
2995
2996         // adding object to the list of immigrant recovery objects
2997         CpvAccess(_immigrantRecObjs)->push_back((CkLocRec_local *)rec);
2998         CpvAccess(_numImmigrantRecObjs)++;
2999         
3000         CkVec<CkMigratable *> eltList;
3001         mgr->migratableList((CkLocRec_local *)rec,eltList);
3002         for(int i=0;i<eltList.size();i++){
3003                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
3004                         CpvAccess(_currentObj) = eltList[i];
3005                         eltList[i]->mlogData->immigrantRecFlag = 1;
3006                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
3007                         eltList[i]->ResumeFromSync();
3008                 }
3009         }
3010 }
3011
3012
3013 /** this method is used to send messages to a restarted processor to tell
3014  * it that a particular expected object is not going to get to it */
3015 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
3016         DummyMigrationMsg buf;
3017         buf.flag = MLOG_OBJECT;
3018         buf.lbID = lbID;
3019         buf.mgrID = locMgrID;
3020         buf.idx = idx;
3021         buf.locationPE = locationPE;
3022         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3023         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3024 };
3025
3026
3027 /**this method is used by a restarted processor to tell other processors
3028  * that they are not going to receive these many objects.. just the count
3029  * not the objects themselves ***/
3030
3031 void sendDummyMigrationCounts(int *dummyCounts){
3032         DummyMigrationMsg buf;
3033         buf.flag = MLOG_COUNT;
3034         buf.lbID = globalLBID;
3035         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3036         for(int i=0;i<CmiNumPes();i++){
3037                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3038                         buf.count = dummyCounts[i];
3039                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3040                 }
3041         }
3042 }
3043
3044
3045 /** this handler is used to process a dummy migration msg.
3046  * it looks up the load balancer and calls migrated for it */
3047
3048 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3049         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3050         if(msg->flag == MLOG_OBJECT){
3051                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3052                 LDObjHandle h;
3053                 lb->Migrated(h,1);
3054         }
3055         if(msg->flag == MLOG_COUNT){
3056                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3057                 msg->count -= verifyAckedRequests;
3058                 for(int i=0;i<msg->count;i++){
3059                         LDObjHandle h;
3060                         lb->Migrated(h,1);
3061                 }
3062         }
3063         verifyAckedRequests=0;
3064         CmiFree(msg);
3065 };
3066
3067 /*****************************************************
3068         Implementation of a method that can be used to call
3069         any method on the ChareMlogData of all the chares on
3070         a processor currently
3071 ******************************************************/
3072
3073
3074 class ElementCaller :  public CkLocIterator {
3075 private:
3076         CkLocMgr *locMgr;
3077         MlogFn fnPointer;
3078         void *data;
3079 public:
3080         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3081                 locMgr = _locMgr;
3082                 fnPointer = _fnPointer;
3083                 data = _data;
3084         };
3085         void addLocation(CkLocation &loc){
3086                 CkVec<CkMigratable *> list;
3087                 CkLocRec_local *local = loc.getLocalRecord();
3088                 locMgr->migratableList (local,list);
3089                 for(int i=0;i<list.size();i++){
3090                         CkMigratable *migratableElement = list[i];
3091                         fnPointer(data,migratableElement->mlogData);
3092                 }
3093         }
3094 };
3095
3096 /**
3097  * Map function pointed by fnPointer over all the chares living in this processor.
3098  */
3099 void forAllCharesDo(MlogFn fnPointer, void *data){
3100         int numGroups = CkpvAccess(_groupIDTable)->size();
3101         for(int i=0;i<numGroups;i++){
3102                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3103                 fnPointer(data,obj->mlogData);
3104         }
3105         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3106         for(int i=0;i<numNodeGroups;i++){
3107                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3108                 fnPointer(data,obj->mlogData);
3109         }
3110         int i;
3111         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3112 };
3113
3114
3115 /******************************************************************
3116  Load Balancing
3117 ******************************************************************/
3118
3119 /**
3120  * This is the first time Converse is called after AtSync method has been called by every local object.
3121  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3122  * message logging, we can take advantage of this situation and garbage collect at this point.
3123  */
3124 void initMlogLBStep(CkGroupID gid){
3125         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3126         countLBMigratedAway = 0;
3127         countLBToMigrate=0;
3128         onGoingLoadBalancing=1;
3129         migrationDoneCalled=0;
3130         checkpointBarrierCount=0;
3131         if(globalLBID.idx != 0){
3132                 CmiAssert(globalLBID.idx == gid.idx);
3133         }
3134         globalLBID = gid;
3135 #if SYNCHRONIZED_CHECKPOINT
3136         garbageCollectMlog();
3137 #endif
3138 }
3139
3140 void startLoadBalancingMlog(void (*_fnPtr)(void *),void *_centralLb){
3141         DEBUGLB(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3142         DEBUG_TEAM(printf("[%d] start Load balancing section of message logging \n",CmiMyPe()));
3143         
3144         resumeLbFnPtr = _fnPtr;
3145         centralLb = _centralLb;
3146         migrationDoneCalled = 1;
3147         if(countLBToMigrate == countLBMigratedAway){
3148                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in startLoadBalancingMlog countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3149                 startMlogCheckpoint(NULL,CmiWallTimer());       
3150         }
3151 };
3152
3153 void finishedCheckpointLoadBalancing(){
3154         DEBUGLB(printf("[%d] finished checkpoint after lb \n",CmiMyPe());)
3155         CheckpointBarrierMsg msg;
3156         msg.fromPE = CmiMyPe();
3157         msg.checkpointCount = checkpointCount;
3158
3159         CmiSetHandler(&msg,_checkpointBarrierHandlerIdx);
3160         CmiSyncSend(0,sizeof(CheckpointBarrierMsg),(char *)&msg);
3161         
3162 };
3163
3164
3165 void sendMlogLocation(int targetPE, envelope *env){
3166 #if !SYNCHRONIZED_CHECKPOINT
3167         void *_msg = EnvToUsr(env);
3168         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3169
3170         int existing = 0;
3171         //if this object is already in the retainedobjectlust destined for this
3172         //processor it should not be sent
3173         
3174         for(int i=0;i<retainedObjectList.size();i++){
3175                 MigrationRecord &migRecord = retainedObjectList[i]->migRecord;
3176                 if(migRecord.gID == msg->gid && migRecord.idx == msg->idx){
3177                         DEBUG(CmiPrintf("[%d] gid %d idx %s being sent to %d exists in retainedObjectList with toPE %d\n",CmiMyPe(),msg->gid.idx,idx2str(msg->idx),targetPE,migRecord.toPE));
3178                         existing = 1;
3179                         break;
3180                 }
3181         }
3182
3183         if(existing){
3184                 return;
3185         }
3186         
3187         countLBToMigrate++;
3188         
3189         MigrationNotice migMsg;
3190         migMsg.migRecord.gID = msg->gid;
3191         migMsg.migRecord.idx = msg->idx;
3192         migMsg.migRecord.fromPE = CkMyPe();
3193         migMsg.migRecord.toPE =  targetPE;
3194         
3195         DEBUGLB(printf("[%d] Sending array to proc %d gid %d idx %s\n",CmiMyPe(),targetPE,msg->gid.idx,idx2str(msg->idx)));
3196         
3197         RetainedMigratedObject  *retainedObject = new RetainedMigratedObject;
3198         retainedObject->migRecord = migMsg.migRecord;
3199         retainedObject->acked  = 0;
3200         
3201         CkPackMessage(&env);
3202         
3203         migMsg.record = retainedObject;
3204         retainedObject->msg = env;
3205         int size = retainedObject->size = env->getTotalsize();
3206         
3207         retainedObjectList.push_back(retainedObject);
3208         
3209         CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
3210         CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
3211         
3212         DEBUGLB(printf("[%d] Location in message of size %d being sent to PE %d\n",CkMyPe(),size,targetPE));
3213 #endif
3214 }
3215
3216 void _receiveMigrationNoticeHandler(MigrationNotice *msg){
3217         msg->migRecord.ackFrom = msg->migRecord.ackTo = 0;
3218         migratedNoticeList.push_back(msg->migRecord);
3219
3220         MigrationNoticeAck buf;
3221         buf.record = msg->record;
3222         CmiSetHandler((void *)&buf,_receiveMigrationNoticeAckHandlerIdx);
3223         CmiSyncSend(getCheckPointPE(),sizeof(MigrationNoticeAck),(char *)&buf);
3224 }
3225
3226 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg){
3227         
3228         RetainedMigratedObject *retainedObject = (RetainedMigratedObject *)(msg->record);
3229         retainedObject->acked = 1;
3230
3231         CmiSetHandler(retainedObject->msg,_receiveMlogLocationHandlerIdx);
3232         CmiSyncSend(retainedObject->migRecord.toPE,retainedObject->size,(char *)retainedObject->msg);
3233
3234         //inform home about the new location of this object
3235         CkGroupID gID = retainedObject->migRecord.gID ;
3236         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3237         informLocationHome(gID,retainedObject->migRecord.idx, mgr->homePe(retainedObject->migRecord.idx),retainedObject->migRecord.toPE);
3238         
3239         countLBMigratedAway++;
3240         if(countLBMigratedAway == countLBToMigrate && migrationDoneCalled == 1){
3241                 DEBUGLB(printf("[%d] calling startMlogCheckpoint in _receiveMigrationNoticeAckHandler countLBToMigrate %d countLBMigratedAway %d \n",CmiMyPe(),countLBToMigrate,countLBMigratedAway));
3242                 startMlogCheckpoint(NULL,CmiWallTimer());
3243         }
3244 };
3245
3246 void _receiveMlogLocationHandler(void *buf){
3247         envelope *env = (envelope *)buf;
3248         DEBUG(printf("[%d] Location received in message of size %d\n",CkMyPe(),env->getTotalsize()));
3249         CkUnpackMessage(&env);
3250         void *_msg = EnvToUsr(env);
3251         CkArrayElementMigrateMessage *msg = (CkArrayElementMigrateMessage *)_msg;
3252         CkGroupID gID= msg->gid;
3253         DEBUG(printf("[%d] Object to be inserted into location manager %d\n",CkMyPe(),gID.idx));
3254         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3255         CpvAccess(_currentObj)=mgr;
3256         mgr->immigrate(msg);
3257 };
3258
3259
/**
 * MlogFn-style callback (data, mlogData) intended to resume array elements after a restart.
 * It is currently a no-op: the whole body is commented out.
 * NOTE(review): the disabled code would call ResumeFromSync() on array elements; confirm
 * whether it is still needed before re-enabling it.
 */
void resumeFromSyncRestart(void *data,ChareMlogData *mlogData){
/*	if(mlogData->objID.type == TypeArray){
		CkMigratable *elt = (CkMigratable *)mlogData->objID.getObject();
	//	TODO: make sure later that atSync has been called and it needs 
	//	to be resumed from sync
	//
		CpvAccess(_currentObj) = elt;
		elt->ResumeFromSync();
	}*/
}
3270
3271 /**
3272  * @brief Processor 0 sends a broadcast to every other processor after checkpoint barrier.
3273  */
3274 inline void checkAndSendCheckpointBarrierAcks(CheckpointBarrierMsg *msg){
3275         if(checkpointBarrierCount == CmiNumPes()){
3276                 CmiSetHandler(msg,_checkpointBarrierAckHandlerIdx);
3277                 for(int i=0;i<CmiNumPes();i++){
3278                         CmiSyncSend(i,sizeof(CheckpointBarrierMsg),(char *)msg);
3279                 }
3280         }
3281 }
3282
3283 /**
3284  * @brief Processor 0 receives a contribution from every other processor after checkpoint.
3285  */ 
3286 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg){
3287         DEBUG(CmiPrintf("[%d] msg->checkpointCount %d pe %d checkpointCount %d checkpointBarrierCount %d \n",CmiMyPe(),msg->checkpointCount,msg->fromPE,checkpointCount,checkpointBarrierCount));
3288         if(msg->checkpointCount == checkpointCount){
3289                 checkpointBarrierCount++;
3290                 checkAndSendCheckpointBarrierAcks(msg);
3291         }else{
3292                 if(msg->checkpointCount-1 == checkpointCount){
3293                         checkpointBarrierCount++;
3294                         checkAndSendCheckpointBarrierAcks(msg);
3295                 }else{
3296                         printf("[%d] msg->checkpointCount %d checkpointCount %d\n",CmiMyPe(),msg->checkpointCount,checkpointCount);
3297                         CmiAbort("msg->checkpointCount and checkpointCount differ by more than 1");
3298                 }
3299         }
3300
3301         // deleting the received message
3302         CmiFree(msg);
3303 }
3304
3305 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg){
3306         DEBUG(CmiPrintf("[%d] _checkpointBarrierAckHandler \n",CmiMyPe()));
3307         DEBUGLB(CkPrintf("[%d] Reaching this point\n",CkMyPe()));
3308
3309 #if !SYNCHRONIZED_CHECKPOINT
3310         // sending a notice to all senders to remove message logs
3311         sendRemoveLogRequests();
3312 #endif
3313
3314         // resuming LB function pointer
3315         (*resumeLbFnPtr)(centralLb);
3316
3317         // deleting message
3318         CmiFree(msg);
3319 }
3320
3321 /**
3322  * @brief Function to remove all messages in the message log of a particular chare.
3323  */
3324 void garbageCollectMlogForChare(void *data, ChareMlogData *mlogData){
3325         int total;
3326         MlogEntry *logEntry;
3327         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
3328
3329         // traversing the whole message log and removing all elements
3330         total = mlog->length();
3331         for(int i=0; i<total; i++){
3332                 logEntry = mlog->deq();
3333                 delete logEntry;
3334         }
3335
3336 }
3337
3338 /**
3339  * @brief Garbage collects the message log and other data structures.
3340  * In case of synchronized checkpoint, we use an optimization to avoid causal message logging protocol
3341  * to communicate all determinants to the rest of the processors.
3342  */
3343 void garbageCollectMlog(){
3344         CkHashtableIterator *iterator;
3345         CkVec<Determinant> *detArray;
3346
3347         DEBUG(CkPrintf("[%d] Garbage collecting message log and data structures\n", CkMyPe()));
3348
3349         // cleaning up the buffered determinants, since they belong to a previous checkpoint period
3350         _indexBufferedDets = 0;
3351         _numBufferedDets = 0;
3352         _phaseBufferedDets++;
3353
3354         // cleaning up remote determinants, since they belong to a previous checkpoint period
3355         iterator = CpvAccess(_remoteDets)->iterator();
3356         while(iterator->hasNext()){
3357                 detArray = *(CkVec<Determinant> **)iterator->next();
3358                 detArray->removeAll();
3359         }
3360         
3361         // deleting the iterator
3362         delete iterator;
3363
3364         // removing all messages in message log for every chare
3365         forAllCharesDo(garbageCollectMlogForChare, NULL);
3366 }
3367
3368 /**
3369         method that informs an array elements home processor of its current location
3370         It is a converse method to bypass the charm++ message logging framework
3371 */
3372
3373 void informLocationHome(CkGroupID locMgrID,CkArrayIndexMax idx,int homePE,int currentPE){
3374         double _startTime = CmiWallTimer();
3375         CurrentLocationMsg msg;
3376         msg.mgrID = locMgrID;
3377         msg.idx = idx;
3378         msg.locationPE = currentPE;
3379         msg.fromPE = CkMyPe();
3380
3381         DEBUG(CmiPrintf("[%d] informing home %d of location %d of gid %d idx %s \n",CmiMyPe(),homePE,currentPE,locMgrID.idx,idx2str(idx)));
3382         CmiSetHandler(&msg,_receiveLocationHandlerIdx);
3383         CmiSyncSend(homePE,sizeof(CurrentLocationMsg),(char *)&msg);
3384         traceUserBracketEvent(37,_startTime,CmiWallTimer());
3385 }
3386
3387
3388 void _receiveLocationHandler(CurrentLocationMsg *data){
3389         double _startTime = CmiWallTimer();
3390         CkLocMgr *mgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(data->mgrID).getObj();
3391         if(mgr == NULL){
3392                 CmiFree(data);
3393                 return;
3394         }
3395         CkLocRec *rec = mgr->elementNrec(data->idx);
3396         DEBUG(CmiPrintf("[%d] location from %d is %d for gid %d idx %s rec %p \n",CkMyPe(),data->fromPE,data->locationPE,data->mgrID,idx2str(data->idx),rec));
3397         if(rec != NULL){
3398                 if(mgr->lastKnown(data->idx) == CmiMyPe() && data->locationPE != CmiMyPe() && rec->type() == CkLocRec::local){
3399                         if(data->fromPE == data->locationPE){
3400                                 CmiAbort("Another processor has the same object");
3401                         }
3402                 }
3403         }
3404         if(rec!= NULL && rec->type() == CkLocRec::local && data->fromPE != CmiMyPe()){
3405                 int targetPE = data->fromPE;
3406                 data->fromPE = CmiMyPe();
3407                 data->locationPE = CmiMyPe();
3408                 DEBUG(printf("[%d] WARNING!! informing proc %d of current location\n",CmiMyPe(),targetPE));
3409                 CmiSyncSend(targetPE,sizeof(CurrentLocationMsg),(char *)data);
3410         }else{
3411                 mgr->inform(data->idx,data->locationPE);
3412         }
3413         CmiFree(data);
3414         traceUserBracketEvent(38,_startTime,CmiWallTimer());
3415 }
3416
3417
3418
3419 void getGlobalStep(CkGroupID gID){
3420         LBStepMsg msg;
3421         int destPE = 0;
3422         msg.lbID = gID;
3423         msg.fromPE = CmiMyPe();
3424         msg.step = -1;
3425         CmiSetHandler(&msg,_getGlobalStepHandlerIdx);
3426         CmiSyncSend(destPE,sizeof(LBStepMsg),(char *)&msg);
3427 };
3428
3429 void _getGlobalStepHandler(LBStepMsg *msg){
3430         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3431         msg->step = lb->step();
3432         CmiAssert(msg->fromPE != CmiMyPe());
3433         CmiPrintf("[%d] getGlobalStep called from %d step %d gid %d \n",CmiMyPe(),msg->fromPE,lb->step(),msg->lbID.idx);
3434         CmiSetHandler(msg,_recvGlobalStepHandlerIdx);
3435         CmiSyncSend(msg->fromPE,sizeof(LBStepMsg),(char *)msg);
3436 };
3437
3438 /**
3439  * @brief Receives the global step handler from PE 0
3440  */
3441 void _recvGlobalStepHandler(LBStepMsg *msg){
3442         
3443         // updating restart decision number
3444         restartDecisionNumber = msg->step;
3445         CmiFree(msg);
3446
3447         CmiPrintf("[%d] recvGlobalStepHandler \n",CmiMyPe());
3448
3449         // sending a dummy message to sendDetsReplyHandler
3450         ResendRequest *resendReplyMsg = (ResendRequest *)CmiAlloc(sizeof(ResendRequest));
3451         resendReplyMsg->PE = CkMyPe();
3452         resendReplyMsg->numberObjects = 0;
3453         _sendDetsReplyHandler((char *)resendReplyMsg);
3454 };
3455
3456 /**
3457  * @brief Function to wrap up performance information.
3458  */
3459 void _messageLoggingExit(){
3460         
3461         // printing the signature for causal message logging
3462         if(CkMyPe() == 0)
3463                 printf("[%d] _causalMessageLoggingExit \n",CmiMyPe());
3464
3465         //TML: printing some statistics for group approach
3466 #if COLLECT_STATS_TEAM
3467         printf("[%d] LOGGED MESSAGES: %.0f\n",CkMyPe(),MLOGFT_totalMessages);
3468         printf("[%d] MESSAGE LOG SIZE: %.2f MB\n",CkMyPe(),MLOGFT_totalLogSize/(float)MEGABYTE);
3469 #endif
3470
3471 #if COLLECT_STATS_MSGS
3472 #if COLLECT_STATS_MSGS_TOTAL
3473         printf("[%d] TOTAL MESSAGES SENT: %d\n",CmiMyPe(),totalMsgsTarget);
3474         printf("[%d] TOTAL MESSAGES SENT SIZE: %.2f MB\n",CmiMyPe(),totalMsgsSize/(float)MEGABYTE);
3475 #else
3476         printf("[%d] TARGETS: ",CmiMyPe());
3477         for(int i=0; i<CmiNumPes(); i++){
3478 #if COLLECT_STATS_MSG_COUNT
3479                 printf("%d ",numMsgsTarget[i]);
3480 #else
3481                 printf("%d ",sizeMsgsTarget[i]);
3482 #endif
3483         }
3484         printf("\n");
3485 #endif
3486 #endif
3487
3488 #if COLLECT_STATS_DETS
3489         printf("\n");
3490         printf("[%d] DETS: %d\n",CmiMyPe(),numDets);
3491         printf("[%d] PIGGYBACKED DETS: %d\n",CmiMyPe(),numPiggyDets);
3492 #if COLLECT_STATS_DETS_DUP
3493         printf("[%d] DUPLICATED DETS: %d\n",CmiMyPe(),numDupDets);
3494 #endif
3495 #endif
3496
3497 }
3498
3499 /**
3500         The method for returning the actual object pointed to by an id
3501         If the object doesnot exist on the processor it returns NULL
3502 **/
3503
3504 void* CkObjID::getObject(){
3505         
3506                 switch(type){
3507                         case TypeChare: 
3508                                 return CkLocalChare(&data.chare.id);
3509                         case TypeMainChare:
3510                                 return CkLocalChare(&data.chare.id);
3511                         case TypeGroup:
3512         
3513                                 CkAssert(data.group.onPE == CkMyPe());
3514                                 return CkLocalBranch(data.group.id);
3515                         case TypeNodeGroup:
3516                                 CkAssert(data.group.onPE == CkMyNode());
3517                                 //CkLocalNodeBranch(data.group.id);
3518                                 {
3519                                         CmiImmediateLock(CksvAccess(_nodeGroupTableImmLock));
3520                                   void *retval = CksvAccess(_nodeGroupTable)->find(data.group.id).getObj();
3521                                   CmiImmediateUnlock(CksvAccess(_nodeGroupTableImmLock));                                       
3522         
3523                                         return retval;
3524                                 }       
3525                         case TypeArray:
3526                                 {
3527         
3528         
3529                                         CkArrayID aid(data.array.id);
3530         
3531                                         if(aid.ckLocalBranch() == NULL){ return NULL;}
3532         
3533                                         CProxyElement_ArrayBase aProxy(aid,data.array.idx.asChild());
3534         
3535                                         return aProxy.ckLocal();
3536                                 }
3537                         default:
3538                                 CkAssert(0);
3539                 }
3540 }
3541
3542
3543 int CkObjID::guessPE(){
3544                 switch(type){
3545                         case TypeChare:
3546                         case TypeMainChare:
3547                                 return data.chare.id.onPE;
3548                         case TypeGroup:
3549                         case TypeNodeGroup:
3550                                 return data.group.onPE;
3551                         case TypeArray:
3552                                 {
3553                                         CkArrayID aid(data.array.id);
3554                                         if(aid.ckLocalBranch() == NULL){
3555                                                 return -1;
3556                                         }
3557                                         return aid.ckLocalBranch()->lastKnown(data.array.idx.asChild());
3558                                 }
3559                         default:
3560                                 CkAssert(0);
3561                 }
3562 };
3563
3564 char *CkObjID::toString(char *buf) const {
3565         
3566         switch(type){
3567                 case TypeChare:
3568                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);
3569                         break;
3570                 case TypeMainChare:
3571                         sprintf(buf,"Chare %p PE %d \0",data.chare.id.objPtr,data.chare.id.onPE);       
3572                         break;
3573                 case TypeGroup:
3574                         sprintf(buf,"Group %d   PE %d \0",data.group.id.idx,data.group.onPE);
3575                         break;
3576                 case TypeNodeGroup:
3577                         sprintf(buf,"NodeGroup %d       Node %d \0",data.group.id.idx,data.group.onPE);
3578                         break;
3579                 case TypeArray:
3580                         {
3581                                 const CkArrayIndexMax &idx = data.array.idx.asChild();
3582                                 const int *indexData = idx.data();
3583                                 sprintf(buf,"Array |%d %d %d| id %d \0",indexData[0],indexData[1],indexData[2],data.array.id.idx);
3584                                 break;
3585                         }
3586                 default:
3587                         CkAssert(0);
3588         }
3589         
3590         return buf;
3591 };
3592
3593 void CkObjID::updatePosition(int PE){
3594         if(guessPE() == PE){
3595                 return;
3596         }
3597         switch(type){
3598                 case TypeArray:
3599                         {
3600                                         CkArrayID aid(data.array.id);
3601                                         if(aid.ckLocalBranch() == NULL){
3602                                                 
3603                                         }else{
3604                                                 char str[100];
3605                                                 CkLocMgr *mgr = aid.ckLocalBranch()->getLocMgr();
3606 //                                              CmiPrintf("[%d] location for object %s is %d\n",CmiMyPe(),toString(str),PE);
3607                                                 CkLocRec *rec = mgr->elementNrec(data.array.idx.asChild());
3608                                                 if(rec != NULL){
3609                                                         if(rec->type() == CkLocRec::local){
3610                                                                 CmiPrintf("[%d] local object %s can not exist on another processor %d\n",CmiMyPe(),str,PE);
3611                                                                 return;
3612                                                         }
3613                                                 }
3614                                                 mgr->inform(data.array.idx.asChild(),PE);
3615                                         }       
3616                                 }
3617
3618                         break;
3619                 case TypeChare:
3620                 case TypeMainChare:
3621                         CkAssert(data.chare.id.onPE == PE);
3622                         break;
3623                 case TypeGroup:
3624                 case TypeNodeGroup:
3625                         CkAssert(data.group.onPE == PE);
3626                         break;
3627                 default:
3628                         CkAssert(0);
3629         }
3630 }
3631
3632
3633
3634 void MlogEntry::pup(PUP::er &p){
3635         p | destPE;
3636         p | _infoIdx;
3637         int size;
3638         if(!p.isUnpacking()){
3639 /*              CkAssert(env);
3640                 if(!env->isPacked()){
3641                         CkPackMessage(&env);
3642                 }*/
3643                 if(env == NULL){
3644                         //message was probably local and has been removed from logs
3645                         size = 0;
3646                 }else{
3647                         size = env->getTotalsize();
3648                 }       
3649         }
3650         p | size;
3651         if(p.isUnpacking()){
3652                 if(size > 0){
3653                         env = (envelope *)_allocEnv(ForChareMsg,size);
3654                 }else{
3655                         env = NULL;
3656                 }
3657         }
3658         if(size > 0){
3659                 p((char *)env,size);
3660         }
3661 };
3662
3663 void RestoredLocalMap::pup(PUP::er &p){
3664         p | minSN;
3665         p | maxSN;
3666         p | count;
3667         if(p.isUnpacking()){
3668                 TNArray = new MCount[count];
3669         }
3670         p(TNArray,count);
3671 };
3672
3673 /**********************************
3674         * The methods of the message logging
3675         * data structure stored in each chare
3676         ********************************/
3677
3678 MCount ChareMlogData::nextSN(const CkObjID &recver){
3679         MCount *SN = snTable.getPointer(recver);
3680         if(SN==NULL){
3681                 snTable.put(recver) = 1;
3682                 return 1;
3683         }else{
3684                 (*SN)++;
3685                 return *SN;
3686         }
3687 };
3688  
3689 /**
3690  * @brief Gets a new ticket for a particular object.
3691  */
3692 MCount ChareMlogData::newTN(){
3693         MCount TN;
3694         if(currentHoles > 0){
3695                 int holeidx = numberHoles-currentHoles;
3696                 TN = ticketHoles[holeidx];
3697                 currentHoles--;
3698                 if(currentHoles == 0){
3699                         delete []ticketHoles;
3700                         numberHoles = 0;
3701                 }
3702         }else{
3703                 TN = ++tCount;
3704         }
3705         return TN;
3706 };
3707
3708 /**
3709  * @brief Get the ticket associated with a combination of sender and SN, if any.
3710  */
3711 inline Ticket ChareMlogData::getTicket(CkObjID &sender, MCount SN){
3712         Ticket ticket;
3713
3714         SNToTicket *ticketRow = ticketTable.get(sender);
3715         if(ticketRow != NULL){
3716                 return ticketRow->get(SN);
3717         }
3718         return ticket;
3719 }
3720
3721 /**
3722  * Inserts a ticket in the ticketTable if it is not already there.
3723  */
3724 inline void ChareMlogData::verifyTicket(CkObjID &sender, MCount SN, MCount TN){
3725         Ticket ticket;
3726
3727         SNToTicket *ticketRow = ticketTable.get(sender);
3728         if(ticketRow != NULL){
3729                 Ticket earlierTicket = ticketRow->get(SN);
3730                 if(earlierTicket.TN != 0){
3731                         CkAssert(earlierTicket.TN == TN);
3732                         return;
3733                 }