Adding initial support for optimized message-logging protocol for collectives.
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
16 #ifdef _FAULT_CAUSAL_
17
18 // Collects some statistics about message logging. Beware of the high cost of accounting for
19 // duplicated determinants (every determinant received requires a linear search
20 // through a potentially big list).
21 #define COLLECT_STATS_MSGS 0
22 #define COLLECT_STATS_MSGS_TOTAL 0
23 #define COLLECT_STATS_MSG_COUNT 0
24 #define COLLECT_STATS_DETS 0
25 #define COLLECT_STATS_DETS_DUP 0
26 #define COLLECT_STATS_MEMORY 0
27 #define COLLECT_STATS_LOGGING 0
28
29 #define RECOVERY_SEND "SEND"
30 #define RECOVERY_PROCESS "PROCESS"
31
32 #define DEBUG_MEM(x)  //x
33 #define DEBUG(x) // x
34 #define DEBUG_RESTART(x)  //x
35 #define DEBUGLB(x)   // x
36 #define DEBUG_TEAM(x)  // x
37 #define DEBUG_PERF(x) // x
38 #define DEBUG_CHECKPOINT 1
39 #define DEBUG_NOW(x) x
40 #define DEBUG_PE(x,y) // if(CkMyPe() == x) y
41 #define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
42 #define DEBUG_RECOVERY(x) //x
43
44 extern const char *idx2str(const CkArrayIndex &ind);
45 extern const char *idx2str(const ArrayElement *el);
46
47 void getGlobalStep(CkGroupID gID);
48
49 bool fault_aware(CkObjID &recver);
50 void sendCheckpointData(int mode);
51 void createObjIDList(void *data,ChareMlogData *mlogData);
52 inline bool isLocal(int destPE);
53 inline bool isTeamLocal(int destPE);
54 void printLog(TProcessedLog *log);
55
56 int _restartFlag=0;
57 int _numRestartResponses=0;
58
59 //TODO: remove for perf runs
60 int countHashRefs=0; //count the number of gets
61 int countHashCollisions=0;
62
63 char *checkpointDirectory=".";
64 int unAckedCheckpoint=0;
65
66 int countLocal=0,countBuffered=0;
67 int countPiggy=0;
68 int countClearBufferedLocalCalls=0;
69
70 int countUpdateHomeAcks=0;
71
72 extern int teamSize;
73 extern int chkptPeriod;
74 extern bool fastRecovery;
75 extern int parallelRecovery;
76
77 char *killFile;
78 char *faultFile;
79 int killFlag=0;
80 int faultFlag=0;
81 int restartingMlogFlag=0;
82 void readKillFile();
83 double killTime=0.0;
84 double faultMean;
85 int checkpointCount=0;
86
87 CpvDeclare(Chare *,_currentObj);
88 CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
89 CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
90 CpvDeclare(Queue, _outOfOrderMessageQueue);
91 CpvDeclare(Queue, _delayedRemoteMessageQueue);
92 CpvDeclare(char **,_bufferedTicketRequests);
93 CpvDeclare(int *,_numBufferedTicketRequests);
94
95 /***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
96 /** @note All the determinants generated by a PE are stored in variable _localDets.
97  * As soon as a message is sent, then all the determinants are appended to the message,
98  * but those determinants are not deleted. We must wait until an ACK comes from the receiver
99  * to delete the determinants. In the meantime the same determinants may be appended
100  * to other messages and more determinants can be added to _localDets.
101  * A simple solution to this problem was to have a primitive array and keep adding determinants
102  * at the end. However, to avoid multiple copies of determinants, we will keep a pointer
103  * to the first 'valid' determinant in the array. Alternatively, we can keep a pointer to the latest
104  * determinant and a number of how many valid determinants there are behind it. We do not remove
105  * determinants until a checkpoint is made, since these determinants may have to be added to
106  * messages in case of a recovery.
107  */
108 // temporal storage for the outgoing determinants
109 CpvDeclare(char *, _localDets);
110 // number of buffered determinants
111 int _numBufferedDets;
112 // current index of first determinant in _localDets
113 int _indexBufferedDets;
114 // current phase of determinants
115 int _phaseBufferedDets;
116
117 // stores the determinants from other nodes, to send them in case of a crash
118 CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
119 // max number of buffered determinants
120 int _maxBufferedDets;
121
122 // stores the incarnation number from every other processor
123 CpvDeclare(char *, _incarnation);
124
125 // storage for remove determinants header
126 CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);
127 // storage for store determinants header
128 CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);
129 // storage for the sizes in vector-send to store determinants
130 CpvDeclare(int *, _storeDetsSizes);
131 // storage for the pointers in vector-send to store determinants
132 CpvDeclare(char **, _storeDetsPtrs);
133
134 /***** *****/
135
136 /***** VARIABLES FOR PARALLEL RECOVERY *****/
137 CpvDeclare(int, _numEmigrantRecObjs);
138 CpvDeclare(int, _numImmigrantRecObjs);
139 CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
140 /***** *****/
141
142 #if COLLECT_STATS_MSGS
143 int *numMsgsTarget;
144 int *sizeMsgsTarget;
145 int totalMsgsTarget;
146 float totalMsgsSize;
147 #endif
148 #if COLLECT_STATS_DETS
149 int numPiggyDets;
150 int numDets;
151 int numDupDets;
152 #endif
153 #if COLLECT_STATS_MEMORY
154 int msgLogSize;
155 int bufferedDetsSize;
156 int storedDetsSize;
157 #endif
158 //TML: variables for measuring savings with teams in message logging
159 #if COLLECT_STATS_LOGGING
160 float MLOGFT_totalLogSize = 0.0;
161 float MLOGFT_totalMessages = 0.0;
162 float MLOGFT_totalMcastLogSize = 0.0;
163 #endif
164
165 static double adjustChkptPeriod=0.0; //in ms
166 static double nextCheckpointTime=0.0;//in seconds
167 static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
168
169 int _pingHandlerIdx;
170
171 char objString[100];
172 int _checkpointRequestHandlerIdx;
173 int _storeCheckpointHandlerIdx;
174 int _checkpointAckHandlerIdx;
175 int _getCheckpointHandlerIdx;
176 int _recvCheckpointHandlerIdx;
177 int _removeProcessedLogHandlerIdx;
178
179 int _verifyAckRequestHandlerIdx;
180 int _verifyAckHandlerIdx;
181 int _dummyMigrationHandlerIdx;
182
183
184 int     _getGlobalStepHandlerIdx;
185 int     _recvGlobalStepHandlerIdx;
186
187 int _updateHomeRequestHandlerIdx;
188 int _updateHomeAckHandlerIdx;
189 int _resendMessagesHandlerIdx;
190 int _sendDetsHandlerIdx;
191 int _sendDetsReplyHandlerIdx;
192 int _receivedTNDataHandlerIdx;
193 int _receivedDetDataHandlerIdx;
194 int _distributedLocationHandlerIdx;
195 int _sendBackLocationHandlerIdx;
196 int _storeDeterminantsHandlerIdx;
197 int _removeDeterminantsHandlerIdx;
198
199 //TML: integer constants for team-based message logging
200 int _restartHandlerIdx;
201 int _getRestartCheckpointHandlerIdx;
202 int _recvRestartCheckpointHandlerIdx;
203 void setTeamRecovery(void *data, ChareMlogData *mlogData);
204 void unsetTeamRecovery(void *data, ChareMlogData *mlogData);
205
206 int verifyAckTotal;
207 int verifyAckCount;
208
209 int verifyAckedRequests=0;
210
211 RestartRequest *storedRequest;
212
213 int _falseRestart =0; /**
214                                                                                                         For testing on clusters we might carry out restarts on 
215                                                                                                         a processor without actually starting it
216                                                                                                         1 -> false restart
217                                                                                                         0 -> restart after an actual crash
218                                                                                                 */
219
220 //Load balancing globals
221 int onGoingLoadBalancing=0;
222 void *centralLb;
223 void (*resumeLbFnPtr)(void *);
224 int _receiveMlogLocationHandlerIdx;
225 int _receiveMigrationNoticeHandlerIdx;
226 int _receiveMigrationNoticeAckHandlerIdx;
227 int _checkpointBarrierHandlerIdx;
228 int _checkpointBarrierAckHandlerIdx;
229
230 CkVec<MigrationRecord> migratedNoticeList;
231 CkVec<RetainedMigratedObject *> retainedObjectList;
232 int donotCountMigration=0;
233 int countLBMigratedAway=0;
234 int countLBToMigrate=0;
235 int migrationDoneCalled=0;
236 int checkpointBarrierCount=0;
237 int globalResumeCount=0;
238 CkGroupID globalLBID;
239 int restartDecisionNumber=-1;
240
241 double lastCompletedAlarm=0;
242 double lastRestart=0;
243
244 //update location globals
245 int _receiveLocationHandlerIdx;
246
247
248 /** 
249  * @brief Initialize message logging data structures and register handlers
250  */
251 void _messageLoggingInit(){
252         //current object
253         CpvInitialize(Chare *,_currentObj);
254         
255         //registering handlers for message logging
256         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
257                 
258         //handlers for checkpointing
259         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
260         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
261         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
262         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
263
264         //handlers for restart
265         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
266         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
267         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
268         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
269         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
270         _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
271         _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
272         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
273         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
274         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
275         _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
276         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
277         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
278         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
279
280         //TML: handlers for team-based message logging
281         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
282         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
283         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
284
285         // handlers for causal message logging
286         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
287         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
288         
289         //handlers for load balancing
290         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
291         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
292         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
293         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
294         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
295         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
296         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
297         
298         //handlers for updating locations
299         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
300         
301         //Cpv variables for message logging
302         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
303         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
304         CpvInitialize(Queue, _outOfOrderMessageQueue);
305         CpvInitialize(Queue, _delayedRemoteMessageQueue);
306         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
307         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
308         
309         CpvInitialize(char **,_bufferedTicketRequests);
310         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
311         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
312         for(int i=0;i<CkNumPes();i++){
313                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
314                 CpvAccess(_numBufferedTicketRequests)[i]=0;
315         }
316  
317         // Cpv variables for causal protocol
318         _numBufferedDets = 0;
319         _indexBufferedDets = 0;
320         _phaseBufferedDets = 0;
321         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
322         CpvInitialize(char *, _localDets);
323         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
324         CpvInitialize(char *, _incarnation);
325         CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
326         CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
327         CpvInitialize(int *, _storeDetsSizes);
328         CpvInitialize(char **, _storeDetsPtrs);
329         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
330         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
331         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
332         for(int i=0; i<CmiNumPes(); i++){
333                 CpvAccess(_incarnation)[i] = 0;
334         }
335         CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
336         CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
337         CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
338         CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
339
340         // Cpv variables for parallel recovery
341         CpvInitialize(int, _numEmigrantRecObjs);
342     CpvAccess(_numEmigrantRecObjs) = 0;
343     CpvInitialize(int, _numImmigrantRecObjs);
344     CpvAccess(_numImmigrantRecObjs) = 0;
345
346     CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
347     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
348
349         //Cpv variables for checkpoint
350         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
351         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
352
353         // registering user events for projections      
354         traceRegisterUserEvent("Remove Logs", 20);
355         traceRegisterUserEvent("Ticket Request Handler", 21);
356         traceRegisterUserEvent("Ticket Handler", 22);
357         traceRegisterUserEvent("Local Message Copy Handler", 23);
358         traceRegisterUserEvent("Local Message Ack Handler", 24);        
359         traceRegisterUserEvent("Preprocess current message",25);
360         traceRegisterUserEvent("Preprocess past message",26);
361         traceRegisterUserEvent("Preprocess future message",27);
362         traceRegisterUserEvent("Checkpoint",28);
363         traceRegisterUserEvent("Checkpoint Store",29);
364         traceRegisterUserEvent("Checkpoint Ack",30);
365         traceRegisterUserEvent("Send Ticket Request",31);
366         traceRegisterUserEvent("Generalticketrequest1",32);
367         traceRegisterUserEvent("TicketLogLocal",33);
368         traceRegisterUserEvent("next_ticket and SN",34);
369         traceRegisterUserEvent("Timeout for buffered remote messages",35);
370         traceRegisterUserEvent("Timeout for buffered local messages",36);
371         traceRegisterUserEvent("Inform Location Home",37);
372         traceRegisterUserEvent("Receive Location Handler",38);
373         
374         lastCompletedAlarm=CmiWallTimer();
375         lastRestart = CmiWallTimer();
376
377 #if COLLECT_STATS_MSGS
378 #if COLLECT_STATS_MSGS_TOTAL
379         totalMsgsTarget = 0;
380         totalMsgsSize = 0.0;
381 #else
382         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
383         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
384         for(int i=0; i<CmiNumPes(); i++){
385                 numMsgsTarget[i] = 0;
386                 sizeMsgsTarget[i] = 0;
387         }
388 #endif
389 #endif
390 #if COLLECT_STATS_DETS
391         numPiggyDets = 0;
392         numDets = 0;
393         numDupDets = 0;
394 #endif
395 #if COLLECT_STATS_MEMORY
396         msgLogSize = 0;
397         bufferedDetsSize = 0;
398         storedDetsSize = 0;
399 #endif
400
401
402 }
403
404 void killLocal(void *_dummy,double curWallTime);        
405
406 void readKillFile(){
407         FILE *fp=fopen(killFile,"r");
408         if(!fp){
409                 return;
410         }
411         int proc;
412         double sec;
413         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
414                 if(proc == CkMyPe()){
415                         killTime = CmiWallTimer()+sec;
416                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
417                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
418                 }
419         }
420         fclose(fp);
421 }
422
423 /**
424  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
425  * We assume an exponential distribution for the mean-time-between-failures.
426  */
427 void readFaultFile(){
428         FILE *fp=fopen(faultFile,"r");
429         if(!fp){
430                 return;
431         }
432         int proc;
433         double sec;
434         fscanf(fp,"%d %lf",&proc,&sec);
435         faultMean = sec;
436         if(proc == CkMyPe()){
437                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
438                 CcdCallFnAfter(killLocal,NULL,sec*1000);
439         }
440         fclose(fp);
441 }
442
443 void killLocal(void *_dummy,double curWallTime){
444         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
445         if(CmiWallTimer()<killTime-1){
446                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
447         }else{  
448                 kill(getpid(),SIGKILL);
449         }
450 }
451
452 /*** Auxiliary Functions ***/
453
454 /**
455  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
456  * determinants needs to be extended.
457  */
458 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
459         Determinant *det, *auxDet;
460         char *aux;
461
462         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
463
464         // checking if we are overflowing the buffered determinants array
465         if(_indexBufferedDets >= _maxBufferedDets){
466                 aux = CpvAccess(_localDets);
467                 _maxBufferedDets *= 2;
468                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
469                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
470                 CmiFree(aux);
471         }
472
473         // adding the new determinant at the end
474         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
475         det->sender = sender;
476         det->receiver = receiver;
477         det->SN = SN;
478         det->TN = TN;
479         _numBufferedDets++;
480         _indexBufferedDets++;
481
482 #if COLLECT_STATS_MEMORY
483         bufferedDetsSize++;
484 #endif
485 #if COLLECT_STATS_DETS
486         numDets++;
487 #endif
488 }
489
490 /************************ Message logging methods ****************/
491
492 /**
493  * Sends a group message that might be a broadcast.
494  */
495 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
496         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
497                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
498                 void *origMsg = EnvToUsr(env);
499                 for(int i=0;i<CmiNumPes();i++){
500                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
501                                 void *copyMsg = CkCopyMsg(&origMsg);
502                                 envelope *copyEnv = UsrToEnv(copyMsg);
503                                 copyEnv->SN=0;
504                                 copyEnv->TN=0;
505                                 copyEnv->sender.type = TypeInvalid;
506                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
507                                 sendGroupMsg(copyEnv,i,_infoIdx);
508                         }
509                 }
510                 return;
511         }
512
513         // initializing values of envelope
514         env->SN=0;
515         env->TN=0;
516         env->sender.type = TypeInvalid;
517
518         CkObjID recver;
519         recver.type = TypeGroup;
520         recver.data.group.id = env->getGroupNum();
521         recver.data.group.onPE = destPE;
522         sendCommonMsg(recver,env,destPE,_infoIdx);
523 }
524
525 /**
526  * Sends a nodegroup message that might be a broadcast.
527  */
528 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
529         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
530                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
531                 void *origMsg = EnvToUsr(env);
532                 for(int i=0;i<CmiNumNodes();i++){
533                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
534                                 void *copyMsg = CkCopyMsg(&origMsg);
535                                 envelope *copyEnv = UsrToEnv(copyMsg);
536                                 copyEnv->SN=0;
537                                 copyEnv->TN=0;
538                                 copyEnv->sender.type = TypeInvalid;
539                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
540                         }
541                 }
542                 return;
543         }
544
545         // initializing values of envelope
546         env->SN=0;
547         env->TN=0;
548         env->sender.type = TypeInvalid;
549
550         CkObjID recver;
551         recver.type = TypeNodeGroup;
552         recver.data.group.id = env->getGroupNum();
553         recver.data.group.onPE = destNode;
554         sendCommonMsg(recver,env,destNode,_infoIdx);
555 }
556
557 /**
558  * Sends a message to an array element.
559  */
560 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
561         CkObjID recver;
562         recver.type = TypeArray;
563         recver.data.array.id = env->getsetArrayMgr();
564         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
565
566         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
567                 char recverString[100],senderString[100];
568                 
569                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
570         }
571
572         // initializing values of envelope
573         env->SN = 0;
574         env->TN = 0;
575
576         sendCommonMsg(recver,env,destPE,_infoIdx);
577 };
578
579 /**
580  * Sends a message to a singleton chare.
581  */
582 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
583         CkObjID recver;
584         recver.type = TypeChare;
585         recver.data.chare.id = *pCid;
586
587         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
588                 char recverString[100],senderString[100];
589                 
590                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
591         }
592
593         // initializing values of envelope
594         env->SN = 0;
595         env->TN = 0;
596
597         sendCommonMsg(recver,env,destPE,_infoIdx);
598 };
599
600 /**
601  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
602  */
603 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
604         envelope *env = _env;
605         MCount ticketNumber = 0;
606         int resend=0; //is it a resend
607         char recverName[100];
608         char senderString[100];
609         double _startTime=CkWallTimer();
610         
611         DEBUG_MEM(CmiMemoryCheck());
612
613         if(CpvAccess(_currentObj) == NULL){
614 //              CkAssert(0);
615                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
616                 generalCldEnqueue(destPE,env,_infoIdx);
617                 return;
618         }
619
620         // checking if this message should bypass determinants in message-logging
621         if(env->flags & CK_BYPASS_DET_MLOG){
622                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
623                 env->recver = recver;
624                 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
625                 generalCldEnqueue(destPE,env,_infoIdx);
626                 return;
627         }
628         
629         // setting message logging data in the envelope
630         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
631         if(env->sender.type == TypeInvalid){
632                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
633         }else{
634 //              envelope *copyEnv = copyEnvelope(env);
635 //              env = copyEnv;
636                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
637                 env->SN = 0;
638         }
639         
640         DEBUG_MEM(CmiMemoryCheck());
641
642         CkObjID &sender = env->sender;
643         env->recver = recver;
644
645         Chare *obj = (Chare *)env->sender.getObject();
646           
647         if(env->SN == 0){
648                 DEBUG_MEM(CmiMemoryCheck());
649                 env->SN = obj->mlogData->nextSN(recver);
650         }else{
651                 resend = 1;
652         }
653         
654 //      if(env->SN != 1){
655                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
656         //      CmiPrintStackTrace(0);
657 /*      }else{
658                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
659         }*/
660                 
661 //      CkPackMessage(&(mEntry->env));
662 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
663         
664         _startTime = CkWallTimer();
665
666         // uses the proper ticketing mechanism for local, team and general messages
667         if(isLocal(destPE)){
668                 sendLocalMsg(env, _infoIdx);
669         }else{
670                 if((teamSize > 1) && isTeamLocal(destPE)){
671
672                         // look to see if this message already has a ticket in the team-table
673                         Chare *senderObj = (Chare *)sender.getObject();
674                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
675                         if(ticketRow != NULL){
676                                 Ticket ticket = ticketRow->get(env->SN);
677                                 if(ticket.TN != 0){
678                                         ticketNumber = ticket.TN;
679                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
680                                 }
681                         }
682                 }
683                 
684                 // sending the message
685                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
686                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
687                 
688         }
689 }
690
691 /**
692  * Determines if the message is local or not. A message is local if:
693  * 1) Both the destination and origin are the same PE.
694  */
695 inline bool isLocal(int destPE){
696         // both the destination and the origin are the same PE
697         if(destPE == CkMyPe())
698                 return true;
699
700         return false;
701 }
702
703 /**
704  * Determines if the message is group local or not. A message is group local if:
705  * 1) They belong to the same group in the group-based message logging.
706  */
707 inline bool isTeamLocal(int destPE){
708
709         // they belong to the same group
710         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
711                 return true;
712
713         return false;
714 }
715
716 /**
717  * Method that does the actual send by creating a ticket request filling it up and sending it.
718  */
719 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
720         DEBUG_NOW(char recverString[100]);
721         DEBUG_NOW(char senderString[100]);
722
723         int totalSize;
724
725         envelope *env = entry->env;
726         DEBUG(printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
727
728         // setting all the information
729         Chare *obj = (Chare *)entry->env->sender.getObject();
730         entry->env->recver = recver;
731         entry->env->SN = SN;
732         entry->env->TN = TN;
733         if(!resend){
734                 //TML: only stores message if either it goes to this processor or to a processor in a different group
735                 if(!isTeamLocal(entry->destPE)){
736                         obj->mlogData->addLogEntry(entry);
737 #if COLLECT_STATS_LOGGING
738                         MLOGFT_totalMessages += 1.0;
739                         MLOGFT_totalLogSize += entry->env->getTotalsize();
740                         if(entry->env->flags & CK_MULTICAST_MSG_MLOG){
741                                 MLOGFT_totalMcastLogSize += entry->env->getTotalsize(); 
742                                 CkPrintf("[%d] Adding %f to the log\n",CkMyPe(),entry->env->getTotalsize());
743                         }
744 #endif
745                 }else{
746                         // the message has to be deleted after it has been sent
747                         entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
748                 }
749         }
750
751         // sending the determinants that causally affect this message
752         if(_numBufferedDets > 0){
753
754                 // modifying the actual number of determinants sent in message
755                 CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
756                 CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
757                 CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
758                 CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
759         
760                 // sending the message
761                 CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
762                 CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
763                 CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
764                 CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
765                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
766                 CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
767                 CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
768         }
769
770         // updating its message log entry
771         entry->indexBufDets = _indexBufferedDets;
772         entry->numBufDets = _numBufferedDets;
773
774         // sending the message
775         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
776
777         DEBUG_MEM(CmiMemoryCheck());
778 #if COLLECT_STATS_MSGS
779 #if COLLECT_STATS_MSGS_TOTAL
780         totalMsgsTarget++;
781         totalMsgsSize += (float)env->getTotalsize();
782 #else
783         numMsgsTarget[destPE]++;
784         sizeMsgsTarget[destPE] += env->getTotalsize();
785 #endif
786 #endif
787 #if COLLECT_STATS_DETS
788         numPiggyDets += _numBufferedDets;
789 #endif
790 #if COLLECT_STATS_MEMORY
791         msgLogSize += env->getTotalsize();
792 #endif
793 };
794
795
796 /**
797  * @brief Function to send a local message. It first gets a ticket and
798  * then enqueues the message. If we are recovering, then the message 
799  * is enqueued in a delay queue.
800  */
801 void sendLocalMsg(envelope *env, int _infoIdx){
802         DEBUG_PERF(double _startTime=CkWallTimer());
803         DEBUG_MEM(CmiMemoryCheck());
804         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
805         DEBUG(char senderString[100]);
806         DEBUG(char recverString[100]);
807         Ticket ticket;
808
809         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
810
811         // getting the receiver local object
812         Chare *recverObj = (Chare *)env->recver.getObject();
813
814         // if receiver object is not NULL, we will ask it for a ticket
815         if(recverObj){
816
817 /*HERE          // if we are recovery, then we must put this off for later
818                 if(recverObj->mlogData->restartFlag){
819                         CpvAccess(_delayedLocalMsgs)->enq(entry);
820                         return;
821                 }
822
823                 // check if this combination of sender and SN has been already a ticket
824                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
825                         
826                 // assigned a new ticket in case this message is completely new
827                 if(ticket.TN == 0){
828                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
829
830                         // if TN == 0 we enqueue this message since we are recovering from a crash
831                         if(ticket.TN == 0){
832                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
833                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
834                                 
835 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
836                                 return;
837                         }
838                 }
839
840                 //TODO: check for the case when an invalid ticket is returned
841                 //TODO: check for OLD or RECEIVED TICKETS
842                 entry->env->TN = ticket.TN;
843                 CkAssert(entry->env->TN > 0);
844                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
845         
846                 DEBUG_MEM(CmiMemoryCheck());
847
848                 // adding the determinant to _localDets
849                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
850 */
851                 // sends the local message
852                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
853
854                 DEBUG_MEM(CmiMemoryCheck());
855         }else{
856                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
857         }
858 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
859 };
860
861 /****
862         The handler functions
863 *****/
864
865 /*** CAUSAL PROTOCOL HANDLERS ***/
866
867 /**
868  * @brief Removes the determinants after a particular index in the _localDets array.
869  */
870 void _removeDeterminantsHandler(char *buffer){
871         RemoveDeterminantsHeader *header;
872         int index, phase;
873
874         // getting the header from the message
875         header = (RemoveDeterminantsHeader *)buffer;
876         index = header->index;
877         phase = header->phase;
878
879         // fprintf(stderr,"ACK index=%d\n",index);
880
881         // updating the number of buffered determinants
882         if(phase == _phaseBufferedDets){
883                 if(index > _indexBufferedDets)
884                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
885                 CmiAssert(index <= _indexBufferedDets);
886                 _numBufferedDets = _indexBufferedDets - index;
887         }
888
889         // releasing memory
890         CmiFree(buffer);
891
892 }
893
894 /**
895  * @brief Stores the determinants coming from other processor.
896  */
897 void _storeDeterminantsHandler(char *buffer){
898         StoreDeterminantsHeader *header;
899         Determinant *detPtr, det;
900         int i, n, index, phase, destPE;
901         CkVec<Determinant> *vec;
902
903         // getting the header from the message and pointing to the first determinant
904         header = (StoreDeterminantsHeader *)buffer;
905         n = header->number;
906         index = header->index;
907         phase = header->phase;
908         destPE = header->PE;
909         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
910
911         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
912         DEBUG_MEM(CmiMemoryCheck());
913
914         // going through all determinants and storing them into _remoteDets
915         for(i = 0; i < n; i++){
916                 det.sender = detPtr->sender;
917                 det.receiver = detPtr->receiver;
918                 det.SN = detPtr->SN;
919                 det.TN = detPtr->TN;
920
921                 // getting the determinant array
922                 vec = CpvAccess(_remoteDets)->get(det.receiver);
923                 if(vec == NULL){
924                         vec = new CkVec<Determinant>();
925                         CpvAccess(_remoteDets)->put(det.receiver) = vec;
926                 }
927 #if COLLECT_STATS_DETS
928 #if COLLECT_STATS_DETS_DUP
929                 for(int j=0; j<vec->size(); j++){
930                         if(isSameDet(&(*vec)[j],&det)){
931                                 numDupDets++;
932                                 break;
933                         }
934                 }
935 #endif
936 #endif
937 #if COLLECT_STATS_MEMORY
938                 storedDetsSize++;
939 #endif
940                 vec->push_back(det);
941                 detPtr = detPtr++;
942         }
943
944         DEBUG_MEM(CmiMemoryCheck());
945
946         // freeing buffer
947         CmiFree(buffer);
948
949         // sending the ACK back to the sender
950         CpvAccess(_removeDetsHeader)->index = index;
951         CpvAccess(_removeDetsHeader)->phase = phase;
952         CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
953         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
954         
955 }
956
957 /**
958  *  If there are any delayed requests, process them first before 
959  *  processing this request
960  * */
961 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
962         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
963         double  _startTime = CkWallTimer();
964         CmiFree(ticketRequest);
965 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
966 }
967
968
969 /**
970  * @brief Gets a ticket for a recently received message.
971  * @pre env->recver has to be on this processor.
972  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
973  */
974 inline bool _getTicket(envelope *env, int *flag){
975         DEBUG_MEM(CmiMemoryCheck());
976         DEBUG(char recverName[100]);
977         DEBUG(char senderString[100]);
978         Ticket ticket;
979         
980         // getting information from request
981         CkObjID sender = env->sender;
982         CkObjID recver = env->recver;
983         MCount SN = env->SN;
984         MCount TN = env->TN;
985         Chare *recverObj = (Chare *)recver.getObject();
986         
987         DEBUG(recver.toString(recverName);)
988
989         // verifying whether we are recovering from a failure or not
990         if(recverObj->mlogData->restartFlag)
991                 return false;
992
993         // checking ticket table to see if this message already received a ticket
994         ticket = recverObj->mlogData->getTicket(sender, SN);
995         TN = ticket.TN;
996         
997         // checking if the message is team local and if it has a ticket already assigned
998         if(teamSize > 1 && TN != 0){
999                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
1000                 recverObj->mlogData->verifyTicket(sender,SN,TN);
1001         }
1002
1003         //check if a ticket for this has been already handed out to an object that used to be local but 
1004         // is no longer so.. need for parallel restart
1005         if(TN == 0){
1006                 ticket = recverObj->mlogData->next_ticket(sender,SN);
1007                 *flag = NEW_TICKET;
1008         } else {
1009                 *flag = OLD_TICKET;
1010         }
1011         if(ticket.TN > recverObj->mlogData->tProcessed){
1012                 ticket.state = NEW_TICKET;
1013         } else {
1014                 ticket.state = OLD_TICKET;
1015         }
1016         // @todo: check for the case when an invalid ticket is returned
1017         if(ticket.TN == 0){
1018                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
1019                 return false;
1020         }
1021                 
1022         // setting the ticket number in the envelope
1023         env->TN = ticket.TN;
1024         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
1025         return true;
1026
1027 };
1028
1029 bool fault_aware(CkObjID &recver){
1030         switch(recver.type){
1031                 case TypeChare:
1032                         return true;
1033                 case TypeMainChare:
1034                         return false;
1035                 case TypeGroup:
1036                 case TypeNodeGroup:
1037                 case TypeArray:
1038                         return true;
1039                 default:
1040                         return false;
1041         }
1042 };
1043
1044 /* Preprocesses a received message */
1045 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1046         DEBUG_NOW(char recverString[100]);
1047         DEBUG_NOW(char senderString[100]);
1048         DEBUG_MEM(CmiMemoryCheck());
1049         int flag;
1050         bool ticketSuccess;
1051
1052         // getting the receiver object
1053         CkObjID recver = env->recver;
1054
1055         // checking for determinants bypass in message logging
1056         if(env->flags & CK_BYPASS_DET_MLOG){
1057                 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
1058                 return 1;       
1059         }
1060
1061         // checking if receiver is fault aware
1062         if(!fault_aware(recver)){
1063                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1064                 return 1;
1065         }
1066
1067         Chare *obj = (Chare *)recver.getObject();
1068         *objPointer = obj;
1069         if(obj == NULL){
1070                 int possiblePE = recver.guessPE();
1071                 if(possiblePE != CkMyPe()){
1072                         int totalSize = env->getTotalsize();
1073                         CmiSyncSendAndFree(possiblePE,totalSize,(char *)env);
1074                         
1075                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1076                 }else{
1077                         // this is the case where a message is received and the object has not been initialized
1078                         // we delayed the delivery of the message
1079                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1080                         
1081                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1082                 }
1083                 return 0;
1084         }
1085
1086         // checking if message comes from an old incarnation
1087         // message must be discarded
1088         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1089                 CmiFree(env);
1090                 return 0;
1091         }
1092
1093         DEBUG_MEM(CmiMemoryCheck());
1094         DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1095
1096         // getting a ticket for this message
1097         ticketSuccess = _getTicket(env,&flag);
1098
1099         // we might be during recovery when this message arrives
1100         if(!ticketSuccess){
1101                 
1102                 // adding the message to a delay queue
1103                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1104                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1105
1106                 return 0;
1107         }
1108         
1109         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1110         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1111         
1112         if(flag == NEW_TICKET){
1113                 // storing determinant of message in data structure
1114                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1115         }
1116
1117         DEBUG_MEM(CmiMemoryCheck());
1118
1119         double _startTime = CkWallTimer();
1120 //env->sender.updatePosition(env->getSrcPe());
1121         if(env->TN == obj->mlogData->tProcessed+1){
1122                 //the message that needs to be processed now
1123                 DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1124                 // once we find a message that we can process we put back all the messages in the out of order queue
1125                 // back into the main scheduler queue. 
1126         DEBUG_MEM(CmiMemoryCheck());
1127                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1128                         void *qMsgPtr;
1129                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1130                         envelope *qEnv = (envelope *)qMsgPtr;
1131                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1132         DEBUG_MEM(CmiMemoryCheck());
1133                 }
1134 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1135                 //TODO: this might be a problem.. change made for leanMD
1136 //              CpvAccess(_currentObj) = obj;
1137         DEBUG_MEM(CmiMemoryCheck());
1138                 return 1;
1139         }
1140
1141         // checking if message has already been processed
1142         // message must be discarded
1143         if(env->TN <= obj->mlogData->tProcessed){
1144                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1145                 
1146                 CmiFree(env);
1147                 return 0;
1148         }
1149         //message that needs to be processed in the future
1150
1151         DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1152         //the message cant be processed now put it back in the out of order message Q, 
1153         //It will be transferred to the main queue later
1154         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1155 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1156         DEBUG_MEM(CmiMemoryCheck());
1157         
1158         return 0;
1159 }
1160
1161 /**
1162  * @brief Updates a few variables once a message has been processed.
1163  */
1164 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1165         DEBUG(char senderString[100]);
1166         if(obj){
1167                 if(sender.guessPE() == CkMyPe()){
1168                         if(entry != NULL){
1169                                 entry->env = NULL;
1170                         }
1171                 }
1172                 obj->mlogData->tProcessed++;
1173 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1174                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1175 //              CpvAccess(_currentObj)= NULL;
1176         }
1177         DEBUG_MEM(CmiMemoryCheck());
1178 }
1179
1180 /***
1181         Helpers for the handlers and message logging methods
1182 ***/
1183
1184 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1185 //      double _startTime = CkWallTimer();
1186         if(env->recver.type != TypeNodeGroup){
1187         //This repeats a step performed in skipCldEnq for messages sent to
1188         //other processors. I do this here so that messages to local processors
1189         //undergo the same transformation.. It lets the resend be uniform for 
1190         //all messages
1191 //              CmiSetXHandler(env,CmiGetHandler(env));
1192                 _skipCldEnqueue(destPE,env,_infoIdx);
1193         }else{
1194                 _noCldNodeEnqueue(destPE,env);
1195         }
1196 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1197 }
//extern "C" int CmiGetNonLocalLength();
/** NOTE(review): this comment used to introduce a method that retried
 * ticket requests queued up earlier; no such method follows it here.
 * */

// Starts at 0. Presumably a flag or counter for that retry mechanism; it is
// not referenced in the surrounding code -- TODO confirm it is still needed.
int calledRetryTicketRequest=0;
1204
1205 void _pingHandler(CkPingMsg *msg){
1206         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1207         CmiFree(msg);
1208 }
1209
1210
1211 /*****************************************************************************
1212         Checkpointing methods..
1213                 Pack all the data on a processor and send it to the buddy periodically
1214                 Also used to throw away message logs
1215 *****************************************************************************/
// One TProcessedLog (object id + tProcessed) per chare on this processor;
// emptied and rebuilt by startMlogCheckpoint on every checkpoint.
CkVec<TProcessedLog> processedTicketLog;
// Per-chare callback passed to forAllCharesDo: appends the chare's entry to the CkVec passed in data.
void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
// Forward declaration; the definition is not in this part of the file.
void clearUpMigratedRetainedLists(int PE);
1219
1220 void checkpointAlarm(void *_dummy,double curWallTime){
1221         double diff = curWallTime-lastCompletedAlarm;
1222         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1223 /*      if(CkWallTimer()-lastRestart < 50){
1224                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1225                 return;
1226         }*/
1227         if(diff < ((chkptPeriod) - 2)){
1228                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1229                 return;
1230         }
1231         CheckpointRequest request;
1232         request.PE = CkMyPe();
1233         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1234         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1235 };
1236
1237 void _checkpointRequestHandler(CheckpointRequest *request){
1238         startMlogCheckpoint(NULL,CmiWallTimer());
1239 }
1240
1241 /**
1242  * @brief Starts the checkpoint phase after migration.
1243  */
1244 void startMlogCheckpoint(void *_dummy, double curWallTime){
1245         double _startTime = CkWallTimer();
1246
1247         // increasing the checkpoint counter
1248         checkpointCount++;
1249         
1250 #if DEBUG_CHECKPOINT
1251         if(CmiMyPe() == 0){
1252                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1253         }
1254 #endif
1255
1256         DEBUG_MEM(CmiMemoryCheck());
1257
1258         PUP::sizer psizer;
1259         psizer | checkpointCount;
1260         for(int i=0; i<CmiNumPes(); i++){
1261                 psizer | CpvAccess(_incarnation)[i];
1262         }
1263         CkPupROData(psizer);
1264         DEBUG_MEM(CmiMemoryCheck());
1265         CkPupGroupData(psizer,CmiTrue);
1266         DEBUG_MEM(CmiMemoryCheck());
1267         CkPupNodeGroupData(psizer,CmiTrue);
1268         DEBUG_MEM(CmiMemoryCheck());
1269         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1270         DEBUG_MEM(CmiMemoryCheck());
1271
1272         int dataSize = psizer.size();
1273         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1274         char *msg = (char *)CmiAlloc(totalSize);
1275         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1276         chkMsg->PE = CkMyPe();
1277         chkMsg->dataSize = dataSize;
1278         
1279         char *buf = &msg[sizeof(CheckPointDataMsg)];
1280         PUP::toMem pBuf(buf);
1281
1282         pBuf | checkpointCount;
1283         for(int i=0; i<CmiNumPes(); i++){
1284                 pBuf | CpvAccess(_incarnation)[i];
1285         }
1286         CkPupROData(pBuf);
1287         CkPupGroupData(pBuf,CmiTrue);
1288         CkPupNodeGroupData(pBuf,CmiTrue);
1289         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1290
1291         unAckedCheckpoint=1;
1292         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1293         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1294         
1295         /*
1296                 Store the highest Ticket number processed for each chare on this processor
1297         */
1298         processedTicketLog.removeAll();
1299         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1300
1301 #if DEBUG_CHECKPOINT
1302         if(CmiMyPe() == 0){
1303                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1304         }
1305 #endif
1306
1307 #if COLLECT_STATS_MEMORY
1308         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1309         msgLogSize = 0;
1310         bufferedDetsSize = 0;
1311         storedDetsSize = 0;
1312 #endif
1313
1314         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1315                 lastCompletedAlarm = curWallTime;
1316                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1317         }
1318         traceUserBracketEvent(28,_startTime,CkWallTimer());
1319 };
1320
1321 /**
1322  * @brief A chare adds the latest ticket number processed.
1323  */
1324 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1325         DEBUG(char objString[100]);
1326
1327         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1328         TProcessedLog logEntry;
1329         logEntry.recver = mlogData->objID;
1330         logEntry.tProcessed = mlogData->tProcessed;
1331         log->push_back(logEntry);
1332
1333         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1334 }
1335
1336 class ElementPacker : public CkLocIterator {
1337 private:
1338         CkLocMgr *locMgr;
1339         PUP::er &p;
1340 public:
1341         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1342         void addLocation(CkLocation &loc) {
1343                 CkArrayIndexMax idx=loc.getIndex();
1344                 CkGroupID gID = locMgr->ckGetGroupID();
1345                 p|gID;      // store loc mgr's GID as well for easier restore
1346                 p|idx;
1347                 p|loc;
1348         }
1349 };
1350
1351 /**
1352  * Pups all the array elements in this processor.
1353  */
1354 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
1355         int numElements,i;
1356         int numGroups = CkpvAccess(_groupIDTable)->size();      
1357         if(!p.isUnpacking()){
1358                 numElements = CkCountArrayElements();
1359         }       
1360         p | numElements;
1361         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1362         if(!p.isUnpacking()){
1363                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1364         }else{
1365                 //Flush all recs of all LocMgrs before putting in new elements
1366 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
1367                 for(int j=0;j<listsize;j++){
1368                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1369                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1370                                 listToSkip[j].idx.print();
1371                         }
1372                 }
1373                 
1374                 printf("numElements = %d\n",numElements);
1375         
1376                 for (int i=0; i<numElements; i++) {
1377                         CkGroupID gID;
1378                         CkArrayIndexMax idx;
1379                         p|gID;
1380                 p|idx;
1381                         int flag=0;
1382                         int matchedIdx=0;
1383                         for(int j=0;j<listsize;j++){
1384                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1385                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1386                                                 matchedIdx = j;
1387                                                 flag = 1;
1388                                                 break;
1389                                         }
1390                                 }
1391                         }
1392                         if(flag == 1){
1393                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1394                         }else{
1395                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1396                         }
1397                                 
1398                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1399                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1400                         mgr->resume(idx,p,create,flag);
1401                         if(flag == 1){
1402                                 int homePE = mgr->homePe(idx);
1403                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1404                         }
1405                 }
1406         }
1407 };
1408
1409
1410 void writeCheckpointToDisk(int size,char *chkpt){
1411         char fNameTemp[100];
1412         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1413         int fd = creat(fNameTemp,S_IRWXU);
1414         int ret = write(fd,chkpt,size);
1415         CkAssert(ret == size);
1416         close(fd);
1417         
1418         char fName[100];
1419         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1420         unlink(fName);
1421
1422         rename(fNameTemp,fName);
1423 }
1424
1425 //handler that receives the checkpoint from a processor
1426 //it stores it and acks it
1427 void _storeCheckpointHandler(char *msg){
1428         double _startTime=CkWallTimer();
1429                 
1430         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1431         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1432         
1433         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1434         
1435         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1436         if(oldChkpt != NULL){
1437                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1438                 CmiFree(oldmsg);
1439         }
1440         //turning off storing checkpoints
1441         
1442         int sendingPE = chkMsg->PE;
1443         
1444         CpvAccess(_storedCheckpointData)->buf = chkpt;
1445         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1446         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1447
1448         int count=0;
1449         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1450                 if(migratedNoticeList[j].fromPE == sendingPE){
1451                         migratedNoticeList[j].ackFrom = 1;
1452                 }else{
1453                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1454                 }
1455                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1456                         migratedNoticeList.remove(j);
1457                         count++;
1458                 }
1459                 
1460         }
1461         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1462         
1463         CheckPointAck ackMsg;
1464         ackMsg.PE = CkMyPe();
1465         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1466         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1467         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1468         
1469         traceUserBracketEvent(29,_startTime,CkWallTimer());
1470 };
1471
1472
1473 /**
1474  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1475  * @note The remove log request message looks like
1476                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1477  */
1478 void sendRemoveLogRequests(){
1479 #if SYNCHRONIZED_CHECKPOINT
1480         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1481 #endif
1482         double _startTime = CkWallTimer();      
1483
1484         // computing total message size
1485         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1486         char *requestMsg = (char *)CmiAlloc(totalSize);
1487
1488         // filling up the message
1489         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1490         request->PE = CkMyPe();
1491         request->numberObjects = processedTicketLog.size();
1492         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1493         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1494         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1495         int *numDeterminants = (int *)listDeterminants;
1496         numDeterminants[0] = 0; 
1497         listDeterminants = (char *)&numDeterminants[1];
1498
1499         // setting the handler in the message
1500         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1501         
1502         DEBUG_MEM(CmiMemoryCheck());
1503         for(int i=0;i<CkNumPes();i++){
1504                 CmiSyncSend(i,totalSize,requestMsg);
1505         }
1506         CmiFree(requestMsg);
1507
1508         clearUpMigratedRetainedLists(CmiMyPe());
1509         //TODO: clear ticketTable
1510         
1511         traceUserBracketEvent(30,_startTime,CkWallTimer());
1512
1513         DEBUG_MEM(CmiMemoryCheck());
1514 }
1515
1516
1517 void _checkpointAckHandler(CheckPointAck *ackMsg){
1518         DEBUG_MEM(CmiMemoryCheck());
1519         unAckedCheckpoint=0;
1520         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1521         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1522         if(onGoingLoadBalancing){
1523                 onGoingLoadBalancing = 0;
1524                 finishedCheckpointLoadBalancing();
1525         }else{
1526                 sendRemoveLogRequests();
1527         }
1528         CmiFree(ackMsg);
1529         
1530 };
1531
1532
1533 /**
1534  * @brief Inserts all the determinants into a hash table.
1535  */
1536 inline void populateDeterminantTable(char *data){
1537         DEBUG(char recverString[100]);
1538         DEBUG(char senderString[100]);
1539         int numDets, *numDetsPtr;
1540         Determinant *detList;
1541         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1542         SNToTicket *tickets;
1543         Ticket ticket;
1544
1545         RemoveLogRequest *request = (RemoveLogRequest *)data;
1546         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1547         
1548         numDetsPtr = (int *)&list[request->numberObjects];
1549         numDets = numDetsPtr[0];
1550         detList = (Determinant *)&numDetsPtr[1];
1551
1552         // inserting determinants into hashtable
1553         for(int i=0; i<numDets; i++){
1554                 table = detTable.get(detList[i].sender);
1555                 if(table == NULL){
1556                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1557                         detTable.put(detList[i].sender) = table;
1558                 }
1559                 tickets = table->get(detList[i].receiver);
1560                 if(tickets == NULL){
1561                         tickets = new SNToTicket();
1562                         table->put(detList[i].receiver) = tickets; 
1563                 }
1564                 ticket.TN = detList[i].TN;
1565                 tickets->put(detList[i].SN) = ticket;
1566         }
1567
1568         DEBUG_MEM(CmiMemoryCheck());    
1569
1570 }
1571
1572 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1573         int total;
1574         DEBUG(char nameString[100]);
1575         DEBUG_MEM(CmiMemoryCheck());
1576         CmiMemoryCheck();
1577         char *data = (char *)_data;
1578         RemoveLogRequest *request = (RemoveLogRequest *)data;
1579         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1580         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1581         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1582         SNToTicket *tickets;
1583         MCount TN;
1584
1585         int count=0;
1586         total = mlog->length();
1587         for(int i=0; i<total; i++){
1588                 MlogEntry *logEntry = mlog->deq();
1589
1590                 // looking message in determinant table
1591                 table = detTable.get(logEntry->env->sender);
1592                 if(table != NULL){
1593                         tickets = table->get(logEntry->env->recver);
1594                         if(tickets != NULL){
1595                                 TN = tickets->get(logEntry->env->SN).TN;
1596                                 if(TN != 0){
1597                                         logEntry->env->TN = TN;
1598                                 }
1599                         }
1600                 }
1601         
1602                 int match=0;
1603                 for(int j=0;j<request->numberObjects;j++){
1604                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1605                                 //this log Entry should be removed
1606                                 match = 1;
1607                                 break;
1608                         }
1609                 }
1610                 char senderString[100],recverString[100];
1611 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1612                 if(match){
1613                         count++;
1614                         delete logEntry;
1615                 }else{
1616                         mlog->enq(logEntry);
1617                 }
1618         }
1619         if(count > 0){
1620                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1621         }
1622         DEBUG_MEM(CmiMemoryCheck());
1623         CmiMemoryCheck();
1624 }
1625
1626 /**
1627  * @brief Removes messages in the log according to the received ticket numbers
1628  */
1629 void _removeProcessedLogHandler(char *requestMsg){
1630         double start = CkWallTimer();
1631
1632         // building map for determinants
1633         populateDeterminantTable(requestMsg);
1634
1635         // removing processed messages from the message log
1636         forAllCharesDo(removeProcessedLogs,requestMsg);
1637
1638         // @todo: clean determinant table 
1639         
1640         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1641         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1642         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1643         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1644         /*
1645                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1646         */
1647         DEBUG_MEM(CmiMemoryCheck());
1648         clearUpMigratedRetainedLists(request->PE);
1649         
1650         traceUserBracketEvent(20,start,CkWallTimer());
1651         CmiFree(requestMsg);    
1652 };
1653
1654
1655 void clearUpMigratedRetainedLists(int PE){
1656         int count=0;
1657         CmiMemoryCheck();
1658         
1659         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1660                 if(migratedNoticeList[j].toPE == PE){
1661                         migratedNoticeList[j].ackTo = 1;
1662                 }
1663                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1664                         migratedNoticeList.remove(j);
1665                         count++;
1666                 }
1667         }
1668         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1669         
1670         for(int j=retainedObjectList.size()-1;j>=0;j--){
1671                 if(retainedObjectList[j]->migRecord.toPE == PE){
1672                         RetainedMigratedObject *obj = retainedObjectList[j];
1673                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1674                         retainedObjectList.remove(j);
1675                         if(obj->msg != NULL){
1676                                 CmiMemoryCheck();
1677                                 CmiFree(obj->msg);
1678                         }
1679                         delete obj;
1680                 }
1681         }
1682 }
1683
1684 /***************************************************************
1685         Restart Methods and handlers
1686 ***************************************************************/        
1687
1688 /**
1689  * Function for restarting the crashed processor.
1690  * It sets the restart flag and contacts the buddy
1691  * processor to get the latest checkpoint.
1692  */
1693 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1694         RestartRequest msg;
1695
1696         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1697
1698         // setting the restart flag
1699         _restartFlag = 1;
1700         _numRestartResponses = 0;
1701
1702         // if we are using team-based message logging, all members of the group have to be restarted
1703         if(teamSize > 1){
1704                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1705                         if(i != CkMyPe() && i < CkNumPes()){
1706                                 // sending a message to the team member
1707                                 msg.PE = CkMyPe();
1708                             CmiSetHandler(&msg,_restartHandlerIdx);
1709                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1710                         }
1711                 }
1712         }
1713
1714         // requesting the latest checkpoint from its buddy
1715         msg.PE = CkMyPe();
1716         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1717         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1718 };
1719
1720 /**
1721  * Function to restart this processor.
1722  * The handler is invoked by a member of its same team in message logging.
1723  */
1724 void _restartHandler(RestartRequest *restartMsg){
1725         int i;
1726         int numGroups = CkpvAccess(_groupIDTable)->size();
1727         RestartRequest msg;
1728         
1729         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1730
1731     // setting the restart flag
1732         _restartFlag = 1;
1733         _numRestartResponses = 0;
1734
1735         // flushing all buffers
1736         //TEST END
1737 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1738         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1739         for(int i=0;i<numGroups;i++){
1740         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1741                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1742                 obj->flushStates();
1743                 obj->ckJustMigrated();
1744         }*/
1745
1746     // requesting the latest checkpoint from its buddy
1747         msg.PE = CkMyPe();
1748         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1749         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1750 }
1751
1752
1753 /**
1754  * Gets the stored checkpoint but calls another function in the sender.
1755  */
1756 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1757
1758         // retrieving the stored checkpoint
1759         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1760
1761         // making sure it is its buddy who is requesting the checkpoint
1762         CkAssert(restartMsg->PE == storedChkpt->PE);
1763
1764         storedRequest = restartMsg;
1765         verifyAckTotal = 0;
1766
1767         for(int i=0;i<migratedNoticeList.size();i++){
1768                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1769 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1770                         if(migratedNoticeList[i].ackFrom == 0){
1771                                 //need to verify if the object actually exists .. it might not
1772                                 //have been acked but it might exist on it
1773                                 VerifyAckMsg msg;
1774                                 msg.migRecord = migratedNoticeList[i];
1775                                 msg.index = i;
1776                                 msg.fromPE = CmiMyPe();
1777                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1778                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1779                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1780                                 verifyAckTotal++;
1781                         }
1782                 }
1783         }
1784
1785         // sending the checkpoint back to its buddy     
1786         if(verifyAckTotal == 0){
1787                 sendCheckpointData(MLOG_RESTARTED);
1788         }
1789         verifyAckCount = 0;
1790 }
1791
1792 /**
1793  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1794  */
1795 void _recvRestartCheckpointHandler(char *_restartData){
1796         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1797         MigrationRecord *migratedAwayElements;
1798
1799         globalLBID = restartData->lbGroupID;
1800         
1801         restartData->restartWallTime *= 1000;
1802         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1803         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1804         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1805
1806         
1807         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1808         char *buf = &_restartData[sizeof(RestartProcessorData)];
1809         
1810         if(restartData->numMigratedAwayElements != 0){
1811                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1812                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1813                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1814                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1815         }
1816
1817         // turning on the team recovery flag
1818         forAllCharesDo(setTeamRecovery,NULL);
1819         
1820         PUP::fromMem pBuf(buf);
1821         pBuf | checkpointCount;
1822         for(int i=0; i<CmiNumPes(); i++){
1823                 pBuf | CpvAccess(_incarnation)[i];
1824         }
1825         CkPupROData(pBuf);
1826         CkPupGroupData(pBuf,CmiFalse);
1827         CkPupNodeGroupData(pBuf,CmiFalse);
1828         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1829         CkAssert(pBuf.size() == restartData->checkPointSize);
1830         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1831
1832         // increasing the incarnation number of all the team members
1833         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1834                 CpvAccess(_incarnation)[i]++;
1835         }
1836         
1837         // turning off the team recovery flag
1838         forAllCharesDo(unsetTeamRecovery,NULL);
1839
1840         // initializing a few variables for handling local messages
1841         forAllCharesDo(initializeRestart,NULL);
1842         
1843         CmiFree(_restartData);  
1844
1845         /*HERE _initDone();
1846
1847         getGlobalStep(globalLBID);
1848         
1849         countUpdateHomeAcks = 0;
1850         RestartRequest updateHomeRequest;
1851         updateHomeRequest.PE = CmiMyPe();
1852         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1853         for(int i=0;i<CmiNumPes();i++){
1854                 if(i != CmiMyPe()){
1855                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1856                 }
1857         }
1858 */
1859
1860
1861         // Send out the request to resend logged messages to all other processors
1862         CkVec<TProcessedLog> objectVec;
1863         forAllCharesDo(createObjIDList, (void *)&objectVec);
1864         int numberObjects = objectVec.size();
1865         
1866         /*
1867                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1868         */
1869         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1870         char *resendMsg = (char *)CmiAlloc(totalSize);  
1871
1872         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1873         resendReq->PE =CkMyPe(); 
1874         resendReq->numberObjects = numberObjects;
1875         char *objList = &resendMsg[sizeof(ResendRequest)];
1876         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1877         
1878
1879         /* test for parallel restart migrate away object**/
1880 //      if(fastRecovery){
1881 //              distributeRestartedObjects();
1882 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1883 //      }
1884         
1885         /*      To make restart work for load balancing.. should only
1886         be used when checkpoint happens along with load balancing
1887         **/
1888 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1889
1890         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1891         CpvAccess(_currentObj) = lb;
1892         lb->ReceiveDummyMigration(restartDecisionNumber);
1893
1894         sleep(10);
1895         
1896         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1897         for(int i=0;i<CkNumPes();i++){
1898                 if(i != CkMyPe()){
1899                         CmiSyncSend(i,totalSize,resendMsg);
1900                 }       
1901         }
1902         _resendMessagesHandler(resendMsg);
1903
1904 }
1905
1906
1907 void CkMlogRestartDouble(void *,double){
1908         CkMlogRestart(NULL,NULL);
1909 };
1910
1911 //TML: restarting from local (group) failure
1912 void CkMlogRestartLocal(){
1913     CkMlogRestart(NULL,NULL);
1914 };
1915
1916 /**
1917  * Gets the stored checkpoint for its buddy processor.
1918  */
1919 void _getCheckpointHandler(RestartRequest *restartMsg){
1920
1921         // retrieving the stored checkpoint
1922         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1923
1924         // making sure it is its buddy who is requesting the checkpoint
1925         CkAssert(restartMsg->PE == storedChkpt->PE);
1926
1927         storedRequest = restartMsg;
1928         verifyAckTotal = 0;
1929
1930         for(int i=0;i<migratedNoticeList.size();i++){
1931                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1932 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1933                         if(migratedNoticeList[i].ackFrom == 0){
1934                                 //need to verify if the object actually exists .. it might not
1935                                 //have been acked but it might exist on it
1936                                 VerifyAckMsg msg;
1937                                 msg.migRecord = migratedNoticeList[i];
1938                                 msg.index = i;
1939                                 msg.fromPE = CmiMyPe();
1940                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1941                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1942                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1943                                 verifyAckTotal++;
1944                         }
1945                 }
1946         }
1947
1948         // sending the checkpoint back to its buddy     
1949         if(verifyAckTotal == 0){
1950                 sendCheckpointData(MLOG_CRASHED);
1951         }
1952         verifyAckCount = 0;
1953 }
1954
1955
1956 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1957         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1958         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1959         if(rec != NULL && rec->type() == CkLocRec::local){
1960                         //this location exists on this processor
1961                         //and needs to be removed       
1962                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1963                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1964                         
1965                         int localIdx = localRec->getLocalIndex();
1966                         LBDatabase *lbdb = localRec->getLBDB();
1967                         LDObjHandle ldHandle = localRec->getLdHandle();
1968                                 
1969                         locMgr->setDuringMigration(true);
1970                         
1971                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1972                         lbdb->UnregisterObj(ldHandle);
1973                         
1974                         locMgr->setDuringMigration(false);
1975                         
1976                         verifyAckedRequests++;
1977
1978         }
1979         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1980         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1981 };
1982
1983
1984 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1985         int index =     verifyReply->index;
1986         migratedNoticeList[index] = verifyReply->migRecord;
1987         verifyAckCount++;
1988         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1989         if(verifyAckCount == verifyAckTotal){
1990                 sendCheckpointData(MLOG_CRASHED);
1991         }
1992 }
1993
1994
1995 /**
1996  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
1997  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
1998  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
1999  */
2000 void sendCheckpointData(int mode){      
2001         RestartRequest *restartMsg = storedRequest;
2002         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
2003         int numMigratedAwayElements = migratedNoticeList.size();
2004         if(migratedNoticeList.size() != 0){
2005                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2006 //                      CkAssert(migratedNoticeList.size() == 0);
2007         }
2008         
2009         
2010         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2011         
2012         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2013         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2014         
2015         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2016         
2017         char *msg = (char *)CmiAlloc(totalSize);
2018         
2019         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2020         dataMsg->PE = CkMyPe();
2021         dataMsg->restartWallTime = CmiTimer();
2022         dataMsg->checkPointSize = storedChkpt->bufSize;
2023         
2024         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2025 //      dataMsg->numMigratedAwayElements = 0;
2026         
2027         dataMsg->numMigratedInElements = 0;
2028         dataMsg->migratedElementSize = 0;
2029         dataMsg->lbGroupID = globalLBID;
2030         /*msg layout 
2031                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2032                 Local MessageLog|
2033         */
2034         //store checkpoint data
2035         char *buf = &msg[sizeof(RestartProcessorData)];
2036
2037         if(dataMsg->numMigratedAwayElements != 0){
2038                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2039                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2040         }
2041         
2042
2043         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2044         buf = &buf[storedChkpt->bufSize];
2045
2046         if(mode == MLOG_RESTARTED){
2047                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2048                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2049                 CmiFree(restartMsg);
2050         }else{
2051                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2052                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2053                 CmiFree(restartMsg);
2054         }
2055 };
2056
2057
2058 // this list is used to create a vector of the object ids of all
2059 //the chares on this processor currently and the highest TN processed by them 
2060 //the first argument is actually a CkVec<TProcessedLog> *
2061 void createObjIDList(void *data,ChareMlogData *mlogData){
2062         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2063         TProcessedLog entry;
2064         entry.recver = mlogData->objID;
2065         entry.tProcessed = mlogData->tProcessed;
2066         list->push_back(entry);
2067         DEBUG_TEAM(char objString[100]);
2068         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2069         DEBUG_RECOVERY(printLog(&entry));
2070 }
2071
2072
2073 /**
2074  * Receives the checkpoint data from its buddy, restores the state of all the objects
2075  * and asks everyone else to update its home.
2076  */
2077 void _recvCheckpointHandler(char *_restartData){
2078         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2079         MigrationRecord *migratedAwayElements;
2080
2081         globalLBID = restartData->lbGroupID;
2082         
2083         restartData->restartWallTime *= 1000;
2084         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2085         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2086         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2087
2088         
2089         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2090         char *buf = &_restartData[sizeof(RestartProcessorData)];
2091         
2092         if(restartData->numMigratedAwayElements != 0){
2093                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2094                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2095                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2096                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2097         }
2098         
2099         PUP::fromMem pBuf(buf);
2100
2101         pBuf | checkpointCount;
2102         for(int i=0; i<CmiNumPes(); i++){
2103                 pBuf | CpvAccess(_incarnation)[i];
2104         }
2105         CkPupROData(pBuf);
2106         CkPupGroupData(pBuf,CmiTrue);
2107         CkPupNodeGroupData(pBuf,CmiTrue);
2108         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2109         CkAssert(pBuf.size() == restartData->checkPointSize);
2110         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2111
2112         // increases the incarnation number
2113         CpvAccess(_incarnation)[CmiMyPe()]++;
2114         
2115         forAllCharesDo(initializeRestart,NULL);
2116         
2117         CmiFree(_restartData);
2118         
2119         _initDone();
2120
2121         getGlobalStep(globalLBID);
2122
2123         // sending request to send determinants
2124         _numRestartResponses = 0;
2125         
2126         // Send out the request to resend logged determinants to all other processors
2127         CkVec<TProcessedLog> objectVec;
2128         forAllCharesDo(createObjIDList, (void *)&objectVec);
2129         int numberObjects = objectVec.size();
2130         
2131         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2132         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2133         char *resendMsg = (char *)CmiAlloc(totalSize);  
2134
2135         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2136         resendReq->PE =CkMyPe(); 
2137         resendReq->numberObjects = numberObjects;
2138         char *objList = &resendMsg[sizeof(ResendRequest)];
2139         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2140
2141         CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
2142         for(int i=0;i<CkNumPes();i++){
2143                 if(i != CkMyPe()){
2144                         CmiSyncSend(i,totalSize,resendMsg);
2145                 }
2146         }
2147         CmiFree(resendMsg);
2148         
2149 }
2150
2151 /**
2152  * Receives the updateHome ACKs from all other processors. Once everybody
2153  * has replied, it sends a request to resend the logged messages.
2154  */
2155 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2156         countUpdateHomeAcks++;
2157         CmiFree(updateHomeAck);
2158         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2159         if(countUpdateHomeAcks != CmiNumPes()){
2160                 return;
2161         }
2162
2163         // Send out the request to resend logged messages to all other processors
2164         CkVec<TProcessedLog> objectVec;
2165         forAllCharesDo(createObjIDList, (void *)&objectVec);
2166         int numberObjects = objectVec.size();
2167         
2168         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2169         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2170         char *resendMsg = (char *)CmiAlloc(totalSize);  
2171
2172         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2173         resendReq->PE =CkMyPe(); 
2174         resendReq->numberObjects = numberObjects;
2175         char *objList = &resendMsg[sizeof(ResendRequest)];
2176         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2177
2178         /* test for parallel restart migrate away object**/
2179         if(fastRecovery){
2180                 distributeRestartedObjects();
2181                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2182         }
2183         
2184         /*      To make restart work for load balancing.. should only
2185         be used when checkpoint happens along with load balancing
2186         **/
2187 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2188
2189         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2190         CpvAccess(_currentObj) = lb;
2191         lb->ReceiveDummyMigration(restartDecisionNumber);
2192
2193 //HERE  sleep(10);
2194         
2195         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2196         for(int i=0;i<CkNumPes();i++){
2197                 if(i != CkMyPe()){
2198                         CmiSyncSend(i,totalSize,resendMsg);
2199                 }
2200         }
2201         _resendMessagesHandler(resendMsg);
2202         CmiFree(resendMsg);
2203
2204 };
2205
2206 /**
2207  * @brief Initializes variables and flags for restarting procedure.
2208  */
2209 void initializeRestart(void *data, ChareMlogData *mlogData){
2210         mlogData->resendReplyRecvd = 0;
2211         mlogData->receivedTNs = new CkVec<MCount>;
2212         mlogData->restartFlag = 1;
2213 };
2214
2215 /**
2216  * Updates the homePe of chare array elements.
2217  */
2218 void updateHomePE(void *data,ChareMlogData *mlogData){
2219         RestartRequest *updateRequest = (RestartRequest *)data;
2220         int PE = updateRequest->PE; //restarted PE
2221         //if this object is an array Element and its home is the restarted processor
2222         // the home processor needs to know its current location
2223         if(mlogData->objID.type == TypeArray){
2224                 //it is an array element
2225                 CkGroupID myGID = mlogData->objID.data.array.id;
2226                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2227                 CkArrayID aid(mlogData->objID.data.array.id);           
2228                 //check if the restarted processor is the home processor for this object
2229                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2230                 if(locMgr->homePe(myIdx) == PE){
2231                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2232                         DEBUG_RESTART(myIdx.print());
2233                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2234                 }
2235         }
2236 };
2237
2238
2239 /**
2240  * Updates the homePe for all chares in this processor.
2241  */
2242 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2243         int sender = updateRequest->PE;
2244         
2245         forAllCharesDo(updateHomePE,updateRequest);
2246         
2247         updateRequest->PE = CmiMyPe();
2248         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2249         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2250         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2251                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2252                 checkpointCount--;
2253                 startMlogCheckpoint(NULL,0);
2254         }
2255         if(sender == getCheckPointPE()){
2256                 for(int i=0;i<retainedObjectList.size();i++){
2257                         if(retainedObjectList[i]->acked == 0){
2258                                 MigrationNotice migMsg;
2259                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2260                                 migMsg.record = retainedObjectList[i];
2261                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2262                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2263                         }
2264                 }
2265         }
2266 }
2267
2268 /**
2269  * @brief Fills up the ticket vector for each chare.
2270  */
2271 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2272         DEBUG(char name[100]);
2273         ResendData *resendData = (ResendData *)data;
2274         int PE = resendData->PE; //restarted PE
2275         int count=0;
2276         CkHashtableIterator *iterator;
2277         void *objp;
2278         void *objkey;
2279         CkObjID *objID;
2280         SNToTicket *snToTicket;
2281         Ticket ticket;
2282         
2283         // traversing the team table looking up for the maximum TN received     
2284         iterator = mlogData->teamTable.iterator();
2285         while( (objp = iterator->next(&objkey)) != NULL ){
2286                 objID = (CkObjID *)objkey;
2287         
2288                 // traversing the resendData structure to add ticket numbers
2289                 for(int j=0;j<resendData->numberObjects;j++){
2290                         if((*objID) == (resendData->listObjects)[j].recver){
2291                                 snToTicket = *(SNToTicket **)objp;
2292 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2293                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2294                                         ticket = snToTicket->get(snIndex);      
2295                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2296                                                 //store the TNs that have been since the recver last checkpointed
2297                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2298                                         }
2299                                 }
2300                         }
2301                 }
2302         }
2303
2304         //releasing the memory for the iterator
2305         delete iterator;
2306 }
2307
2308
2309 /**
2310  * @brief Turns on the flag for team recovery that selectively restores
2311  * particular metadata information.
2312  */
2313 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2314         char name[100];
2315         mlogData->teamRecoveryFlag = 1; 
2316 }
2317
2318 /**
2319  * @brief Turns off the flag for team recovery.
2320  */
2321 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2322         mlogData->teamRecoveryFlag = 0;
2323 }
2324
2325 /**
2326  * Prints a processed log.
2327  */
2328 void printLog(TProcessedLog *log){
2329         char recverString[100];
2330         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2331 }
2332
2333 /**
2334  * Prints information about a message.
2335  */
2336 void printMsg(envelope *env, const char* par){
2337         char senderString[100];
2338         char recverString[100];
2339         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2340 }
2341
2342 /**
2343  * Prints information about a determinant.
2344  */
2345 void printDet(Determinant *det, const char* par){
2346         char senderString[100];
2347         char recverString[100];
2348         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2349 }
2350
2351 /**
2352  * @brief Resends all the logged messages to a particular chare list.
2353  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2354  * @param mlogData a particular chare living in this processor.
2355  */
2356 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2357         DEBUG_RESTART(char nameString[100]);
2358         DEBUG_RESTART(char recverString[100]);
2359         DEBUG_RESTART(char senderString[100]);
2360
2361         ResendData *resendData = (ResendData *)data;
2362         int PE = resendData->PE; //restarted PE
2363         int count=0;
2364         int ticketRequests=0;
2365         CkQ<MlogEntry *> *log = mlogData->getMlog();
2366
2367         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2368
2369         // traversing the message log to see if we must resend a message        
2370         for(int i=0;i<log->length();i++){
2371                 MlogEntry *logEntry = (*log)[i];
2372                 
2373                 // if we sent out the logs of a local message to buddy and it crashed
2374                 //before acknowledging 
2375                 envelope *env = logEntry->env;
2376                 if(env == NULL){
2377                         continue;
2378                 }
2379         
2380                 // resend if type is not invalid        
2381                 if(env->recver.type != TypeInvalid){
2382                         for(int j=0;j<resendData->numberObjects;j++){
2383                                 if(env->recver == (resendData->listObjects)[j].recver){
2384                                         if(PE != CkMyPe()){
2385                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2386                                                 if(env->recver.type == TypeNodeGroup){
2387                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2388                                                 }else{
2389                                                         CmiSetHandler(env,CmiGetXHandler(env));
2390                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2391                                                 }
2392                                         }else{
2393                                                 envelope *copyEnv = copyEnvelope(env);
2394                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2395                                         }
2396                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2397                                         count++;
2398                                 }
2399                         }//end of for loop of objects
2400                         
2401                 }       
2402         }
2403         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2404 }
2405
2406 /**
2407  * Send all remote determinants to a particular failed PE.
2408  * It only sends determinants to those objects on the list.
2409  */
2410 void _sendDetsHandler(char *msg){
2411         ResendData d;
2412         CkVec<Determinant> *detVec;
2413         ResendRequest *resendReq = (ResendRequest *)msg;
2414
2415         // CkPrintf("[%d] Sending determinants\n",CkMyPe());
2416
2417         // building the reply message
2418         char *listObjects = &msg[sizeof(ResendRequest)];
2419         d.numberObjects = resendReq->numberObjects;
2420         d.PE = resendReq->PE;
2421         d.listObjects = (TProcessedLog *)listObjects;
2422         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2423         detVec = new CkVec<Determinant>[d.numberObjects];
2424
2425         // adding the remote determinants to the resendReplyMsg
2426         // traversing all the remote determinants
2427         CkVec<Determinant> *vec;
2428         for(int i=0; i<d.numberObjects; i++){
2429                 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
2430                 if(vec != NULL){
2431                         for(int j=0; j<vec->size(); j++){
2432
2433                                 // only relevant determinants are to be sent
2434                                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
2435
2436                                         // adding the ticket in the ticket vector
2437                                         d.ticketVecs[i].push_back((*vec)[j].TN);
2438
2439                                         // adding the determinant in the determinant vector
2440                                         detVec[i].push_back((*vec)[j]);
2441
2442                                         DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
2443                                 }
2444                         }
2445                 }
2446         }
2447
2448         int totalDetStored = 0;
2449         for(int i=0;i<d.numberObjects;i++){
2450                 totalDetStored += detVec[i].size();
2451         }
2452
2453         //send back the maximum ticket number for a message sent to each object on the 
2454         //restarted processor
2455         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2456         
2457         int totalTNStored=0;
2458         for(int i=0;i<d.numberObjects;i++){
2459                 totalTNStored += d.ticketVecs[i].size();
2460         }
2461         
2462         int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
2463         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2464         
2465         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2466         resendReply->PE = CkMyPe();
2467         resendReply->numberObjects = d.numberObjects;
2468         
2469         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2470         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2471         for(int i=0;i<d.numberObjects;i++){
2472                 replyObjects[i] = d.listObjects[i].recver;
2473         }
2474         
2475         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2476         for(int i=0;i<d.numberObjects;i++){
2477                 int vecsize = d.ticketVecs[i].size();
2478                 memcpy(ticketList,&vecsize,sizeof(int));
2479                 ticketList = &ticketList[sizeof(int)];
2480                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2481                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2482         }
2483
2484         // adding the stored remote determinants to the message
2485         for(int i=0;i<d.numberObjects;i++){
2486                 int vecsize = detVec[i].size();
2487                 memcpy(ticketList,&vecsize,sizeof(int));
2488                 ticketList = &ticketList[sizeof(int)];
2489                 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
2490                 ticketList = &ticketList[sizeof(Determinant)*vecsize];
2491         }
2492
2493         CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
2494         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2495
2496         delete [] detVec;
2497         delete [] d.ticketVecs;
2498
2499         DEBUG_MEM(CmiMemoryCheck());
2500
2501         if(resendReq->PE != CkMyPe()){
2502                 CmiFree(msg);
2503         }       
2504 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2505         lastRestart = CmiWallTimer();
2506
2507 }
2508
2509 /**
2510  * Resends messages since last checkpoint to the list of objects included in the 
2511  * request. It also sends stored remote determinants to the particular failed PE.
2512  */
2513 void _resendMessagesHandler(char *msg){
2514         ResendData d;
2515         ResendRequest *resendReq = (ResendRequest *)msg;
2516
2517         //CkPrintf("[%d] Resending messages\n",CkMyPe());
2518
2519         // building the reply message
2520         char *listObjects = &msg[sizeof(ResendRequest)];
2521         d.numberObjects = resendReq->numberObjects;
2522         d.PE = resendReq->PE;
2523         d.listObjects = (TProcessedLog *)listObjects;
2524         
2525         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2526
2527         //TML: examines the origin processor to determine if it belongs to the same group.
2528         // In that case, it only returns the maximum ticket received for each object in the list.
2529         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2530                 forAllCharesDo(fillTicketForChare,&d);
2531         else
2532                 forAllCharesDo(resendMessageForChare,&d);
2533
2534         DEBUG_MEM(CmiMemoryCheck());
2535
2536         if(resendReq->PE != CkMyPe()){
2537                 CmiFree(msg);
2538         }       
2539 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2540         lastRestart = CmiWallTimer();
2541 }
2542
2543 MCount maxVec(CkVec<MCount> *TNvec);
2544 void sortVec(CkVec<MCount> *TNvec);
2545 int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2546
2547 /**
2548  * @brief Processes the messages in the delayed remote message queue
2549  */
2550 void processDelayedRemoteMsgQueue(){
2551         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2552         
2553         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2554                 void *qMsgPtr;
2555                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2556                 envelope *qEnv = (envelope *)qMsgPtr;
2557                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2558                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2559                 DEBUG_MEM(CmiMemoryCheck());
2560         }
2561
2562 }
2563
2564 /**
2565  * @brief Receives determinants stored on remote nodes.
2566  * Message format: |Header|ObjID list|TN list|Determinant list|
2567  * TN list = |number of TNs|list of TNs|...|
2568  */
2569 void _sendDetsReplyHandler(char *msg){
2570         ResendRequest *resendReply = (ResendRequest *)msg;
2571         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2572         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2573         
2574 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2575         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2576         
2577         for(int i =0; i< resendReply->numberObjects;i++){
2578                 Chare *obj = (Chare *)listObjects[i].getObject();
2579                 
2580                 int vecsize;
2581                 memcpy(&vecsize,listTickets,sizeof(int));
2582                 listTickets = &listTickets[sizeof(int)];
2583                 MCount *listTNs = (MCount *)listTickets;
2584                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2585         
2586                 if(obj != NULL){
2587                         //the object was restarted on the processor on which it existed
2588                         processReceivedTN(obj,vecsize,listTNs);
2589                 }else{
2590                         //pack up objID vecsize and listTNs and send it to the correct processor
2591                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2592                         char *TNMsg = (char *)CmiAlloc(totalSize);
2593                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2594                         receivedTNData->recver = listObjects[i];
2595                         receivedTNData->numTNs = vecsize;
2596                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2597                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2598                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2599                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2600                 }
2601
2602         }
2603
2604         // traversing all the retrieved determinants
2605         for(int i = 0; i < resendReply->numberObjects; i++){
2606                 Chare *obj = (Chare *)listObjects[i].getObject();
2607                 
2608                 int vecsize;
2609                 memcpy(&vecsize,listTickets,sizeof(int));
2610                 listTickets = &listTickets[sizeof(int)];
2611                 Determinant *listDets = (Determinant *)listTickets;     
2612                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2613         
2614                 if(obj != NULL){
2615                         //the object was restarted on the processor on which it existed
2616                         processReceivedDet(obj,vecsize,listDets);
2617                 } else {
2618                         // pack the determinants and ship them to the other processor
2619                         // pack up objID vecsize and listDets and send it to the correct processor
2620                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2621                         char *detMsg = (char *)CmiAlloc(totalSize);
2622                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2623                         receivedDetData->recver = listObjects[i];
2624                         receivedDetData->numDets = vecsize;
2625                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2626                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2627                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2628                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2629                 }
2630
2631         }
2632
2633         // checking if we have received all replies
2634         _numRestartResponses++;
2635         if(_numRestartResponses != CkNumPes())
2636                 return;
2637         else 
2638                 _numRestartResponses = 0;
2639
2640         // continuing with restart process; send out the request to resend logged messages to all other processors
2641         CkVec<TProcessedLog> objectVec;
2642         forAllCharesDo(createObjIDList, (void *)&objectVec);
2643         int numberObjects = objectVec.size();
2644         
2645         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2646         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2647         char *resendMsg = (char *)CmiAlloc(totalSize);  
2648
2649         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2650         resendReq->PE =CkMyPe(); 
2651         resendReq->numberObjects = numberObjects;
2652         char *objList = &resendMsg[sizeof(ResendRequest)];
2653         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2654
2655         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2656         CpvAccess(_currentObj) = lb;
2657         lb->ReceiveDummyMigration(restartDecisionNumber);
2658
2659 //HERE  sleep(10);
2660 //      CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
2661         
2662         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2663         for(int i=0;i<CkNumPes();i++){
2664                 if(i != CkMyPe()){
2665                         CmiSyncSend(i,totalSize,resendMsg);
2666                 }
2667         }
2668         _resendMessagesHandler(resendMsg);
2669         CmiFree(resendMsg);
2670
2671         /* test for parallel restart migrate away object**/
2672         if(fastRecovery){
2673                 distributeRestartedObjects();
2674                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2675         }
2676
2677 //      processDelayedRemoteMsgQueue();
2678
2679 };
2680
2681 /**
2682  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2683  */
2684 void _receivedDetDataHandler(ReceivedDetData *msg){
2685         DEBUG_NOW(char objName[100]);
2686         Chare *obj = (Chare *) msg->recver.getObject();
2687         if(obj){                
2688                 char *_msg = (char *)msg;
2689                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2690                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2691                 processReceivedDet(obj,msg->numDets,listDets);
2692                 CmiFree(msg);
2693         }else{
2694                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2695                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2696         }
2697 }
2698
2699 /**
2700  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2701  */
2702 void _receivedTNDataHandler(ReceivedTNData *msg){
2703         DEBUG_NOW(char objName[100]);
2704         Chare *obj = (Chare *) msg->recver.getObject();
2705         if(obj){                
2706                 char *_msg = (char *)msg;
2707                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2708                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2709                 processReceivedTN(obj,msg->numTNs,listTNs);
2710                 CmiFree(msg);
2711         }else{
2712                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2713                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2714         }
2715 };
2716
2717 /**
2718  * @brief Processes the received list of determinants from a particular PE.
2719  */
2720 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2721         Determinant *det;
2722
2723         // traversing the whole list of determinants
2724         for(int i=0; i<listSize; i++){
2725                 det = &listDets[i];
2726                 if(CkMyPe() == 4) printDet(det,"RECOVERY");
2727                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2728                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2729         }
2730         
2731         DEBUG_MEM(CmiMemoryCheck());
2732 }
2733         
2734 /**
2735  * @brief Processes the received list of tickets from a particular PE.
2736  */
2737 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2738         // increases the number of resendReply received
2739         obj->mlogData->resendReplyRecvd++;
2740
2741         DEBUG(char objName[100]);
2742         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2743         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2744         //if(obj->mlogData->receivedTNs == NULL)
2745         //      CkPrintf("NULL\n");     
2746         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2747
2748         // includes the tickets into the receivedTN structure
2749         for(int j=0;j<listSize;j++){
2750                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2751         }
2752         
2753         //if this object has received all the replies find the ticket numbers
2754         //that senders know about. Those less than the ticket number processed 
2755         //by the receiver can be thrown away. The rest need not be consecutive
2756         // ie there can be holes in the list of ticket numbers seen by senders
2757         if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
2758                 obj->mlogData->resendReplyRecvd = 0;
2759
2760 #if VERIFY_DETS
2761                 //sort the received TNS
2762                 sortVec(obj->mlogData->receivedTNs);
2763         
2764                 //after all the received tickets are in we need to sort them and then 
2765                 // calculate the holes  
2766                 if(obj->mlogData->receivedTNs->size() > 0){
2767                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2768                         int vecsize = obj->mlogData->receivedTNs->size();
2769                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2770                         
2771                         // updating tCount with the highest ticket handed out
2772                         if(teamSize > 1){
2773                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2774                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2775                         }else{
2776                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2777                         }
2778                         
2779                         if(numberHoles == 0){
2780                         }else{
2781                                 char objName[100];                                      
2782                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2783                                 obj->mlogData->numberHoles = numberHoles;
2784                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2785                                 int countHoles=0;
2786                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2787                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2788                                                 //the TNs are not consecutive at this point
2789                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2790                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2791                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2792                                                         countHoles++;
2793                                                 }       
2794                                         }
2795                                 }
2796                                 //Holes have been given new TN
2797                                 if(countHoles != numberHoles){
2798                                         char str[100];
2799                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2800                                 }
2801                                 CkAssert(countHoles == numberHoles);                                    
2802                                 obj->mlogData->currentHoles = numberHoles;
2803                         }
2804                 }
2805 #else
2806                 if(obj->mlogData->receivedTNs->size() > 0){
2807                         obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
2808                 }
2809 #endif
2810                 // cleaning up structures and getting ready to continue execution       
2811                 delete obj->mlogData->receivedTNs;
2812                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2813                 obj->mlogData->receivedTNs = NULL;
2814                 obj->mlogData->restartFlag = 0;
2815
2816                 // processDelayedRemoteMsgQueue();
2817
2818                 DEBUG_RESTART(char objString[100]);
2819                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2820         }
2821
2822 }
2823
2824 /** @brief Returns the maximum ticket from a vector */
2825 MCount maxVec(CkVec<MCount> *TNvec){
2826         MCount max = 0;
2827         for(int i=0; i<TNvec->size(); i++){
2828                 if((*TNvec)[i] > max)
2829                         max = (*TNvec)[i];
2830         }
2831         return max;
2832 }
2833
2834 void sortVec(CkVec<MCount> *TNvec){
2835         //sort it ->its bloddy bubble sort
2836         //TODO: use quicksort
2837         for(int i=0;i<TNvec->size();i++){
2838                 for(int j=i+1;j<TNvec->size();j++){
2839                         if((*TNvec)[j] < (*TNvec)[i]){
2840                                 MCount temp;
2841                                 temp = (*TNvec)[i];
2842                                 (*TNvec)[i] = (*TNvec)[j];
2843                                 (*TNvec)[j] = temp;
2844                         }
2845                 }
2846         }
2847         //make it unique .. since its sorted all equal units will be consecutive
2848         MCount *tempArray = new MCount[TNvec->size()];
2849         int     uniqueCount=-1;
2850         for(int i=0;i<TNvec->size();i++){
2851                 tempArray[i] = 0;
2852                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2853                         uniqueCount++;
2854                         tempArray[uniqueCount] = (*TNvec)[i];
2855                 }
2856         }
2857         uniqueCount++;
2858         TNvec->removeAll();
2859         for(int i=0;i<uniqueCount;i++){
2860                 TNvec->push_back(tempArray[i]);
2861         }
2862         delete [] tempArray;
2863 }       
2864
2865 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2866         if(TNVec->size() == 0){
2867                 return -1; //not found in an empty vec
2868         }
2869         //binary search to find 
2870         int left=0;
2871         int right = TNVec->size();
2872         int mid = (left +right)/2;
2873         while(searchTN != (*TNVec)[mid] && left < right){
2874                 if((*TNVec)[mid] > searchTN){
2875                         right = mid-1;
2876                 }else{
2877                         left = mid+1;
2878                 }
2879                 mid = (left + right)/2;
2880         }
2881         if(left < right){
2882                 //mid is the element to be returned
2883                 return mid;
2884         }else{
2885                 if(mid < TNVec->size() && mid >=0){
2886                         if((*TNVec)[mid] == searchTN){
2887                                 return mid;
2888                         }else{
2889                                 return -1;
2890                         }
2891                 }else{
2892                         return -1;
2893                 }
2894         }
2895 };
2896
2897
/*
	Method to do parallel restart. Distributes some of the array elements to other processors.
	The problem is that we can't use Charm entry methods to do the migration, as it would get
	stuck in the protocol that is going to restart.
	Note: in order to avoid interference between the objects being recovered, the current PE
	will NOT keep any object. It will be devoted to forwarding the messages to recovering objects.
	Otherwise, the current PE has to do both things, recover objects and forward messages, and
	objects end up stepping on each other's toes (interference).
*/
2906
2907 class ElementDistributor: public CkLocIterator{
2908         CkLocMgr *locMgr;
2909         int *targetPE;
2910
2911         void pupLocation(CkLocation &loc,PUP::er &p){
2912                 CkArrayIndexMax idx=loc.getIndex();
2913                 CkGroupID gID = locMgr->ckGetGroupID();
2914                 p|gID;      // store loc mgr's GID as well for easier restore
2915                 p|idx;
2916                 p|loc;
2917         };
2918 public:
2919         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2920
2921         void addLocation(CkLocation &loc){
2922
2923                 // leaving object on this PE
2924                 if(*targetPE == CkMyPe()){
2925                         *targetPE = (*targetPE +1)%CkNumPes();
2926                         return;
2927                 }
2928                         
2929                 CkArrayIndexMax idx = loc.getIndex();
2930                 CkLocRec_local *rec = loc.getLocalRecord();
2931                 CkLocMgr *locMgr = loc.getManager();
2932                 CkVec<CkMigratable *> eltList;
2933                         
2934                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2935                 idx.print();
2936
2937                 // incrementing number of emigrant objects
2938                 CpvAccess(_numEmigrantRecObjs)++;
2939         locMgr->migratableList((CkLocRec_local *)rec,eltList);
2940                 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
2941                 
2942                 // let everybody else know the object is leaving
2943                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
2944                 reductionMgr->incNumEmigrantRecObjs();
2945         
2946                 //pack up this location and send it across
2947                 PUP::sizer psizer;
2948                 pupLocation(loc,psizer);
2949                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
2950                 char *msg = (char *)CmiAlloc(totalSize);
2951                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
2952                 distributeMsg->PE = CkMyPe();
2953                 char *buf = &msg[sizeof(DistributeObjectMsg)];
2954                 PUP::toMem pmem(buf);
2955                 pmem.becomeDeleting();
2956                 pupLocation(loc,pmem);
2957                         
2958                 locMgr->setDuringMigration(CmiTrue);
2959                 delete rec;
2960                 locMgr->setDuringMigration(CmiFalse);
2961                 locMgr->inform(idx,*targetPE);
2962
2963                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2964                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2965
2966                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2967
2968                 //decide on the target processor for the next object
2969                 *targetPE = *targetPE + 1;
2970                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2971                         *targetPE = CkMyPe() + 1;
2972                 }
2973         }
2974
2975 };
2976
2977 /**
2978  * Distributes objects to accelerate recovery after a failure.
2979  */
2980 void distributeRestartedObjects(){
2981         int numGroups = CkpvAccess(_groupIDTable)->size();      
2982         int i;
2983         int targetPE=CkMyPe()+1;
2984         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2985 };
2986
2987 /**
2988  * Handler to receive back a location.
2989  */
2990 void _sendBackLocationHandler(char *receivedMsg){
2991         printf("Array element received at processor %d after recovery\n",CkMyPe());
2992         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
2993         int sourcePE = distributeMsg->PE;
2994         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
2995         PUP::fromMem pmem(buf);
2996         CkGroupID gID;
2997         CkArrayIndexMax idx;
2998         pmem |gID;
2999         pmem |idx;
3000         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3001         donotCountMigration=1;
3002         mgr->resume(idx,pmem,CmiTrue);
3003         donotCountMigration=0;
3004         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3005         printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
3006         idx.print();
3007
3008         // decrementing number of emigrant objects at reduction manager
3009         CkVec<CkMigratable *> eltList;
3010         CkLocRec *rec = mgr->elementRec(idx);
3011         mgr->migratableList((CkLocRec_local *)rec,eltList);
3012         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
3013         reductionMgr->decNumEmigrantRecObjs();
3014         reductionMgr->decGCount();
3015
3016         // checking if it has received all emigrant recovering objects
3017         CpvAccess(_numEmigrantRecObjs)--;
3018         if(CpvAccess(_numEmigrantRecObjs) == 0){
3019                 (*resumeLbFnPtr)(centralLb);
3020         }
3021
3022 }
3023
3024 /**
3025  * Handler to update information about an object just received.
3026  */
3027 void _distributedLocationHandler(char *receivedMsg){
3028         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
3029         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
3030         int sourcePE = distributeMsg->PE;
3031         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
3032         PUP::fromMem pmem(buf);
3033         CkGroupID gID;
3034         CkArrayIndexMax idx;
3035         pmem |gID;
3036         pmem |idx;
3037         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3038         donotCountMigration=1;
3039         mgr->resume(idx,pmem,CmiTrue);
3040         donotCountMigration=0;
3041         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3042         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
3043         idx.print();
3044
3045         CkLocRec *rec = mgr->elementRec(idx);
3046         CmiAssert(rec->type() == CkLocRec::local);
3047
3048         // adding object to the list of immigrant recovery objects
3049         CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
3050         CpvAccess(_numImmigrantRecObjs)++;
3051         
3052         CkVec<CkMigratable *> eltList;
3053         mgr->migratableList((CkLocRec_local *)rec,eltList);
3054         for(int i=0;i<eltList.size();i++){
3055                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
3056                         CpvAccess(_currentObj) = eltList[i];
3057                         eltList[i]->mlogData->immigrantRecFlag = 1;
3058                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
3059
3060                         // incrementing immigrant counter at reduction manager
3061                         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
3062                         reductionMgr->incNumImmigrantRecObjs();
3063                         reductionMgr->decGCount();
3064
3065                         eltList[i]->ResumeFromSync();
3066                 }
3067         }
3068 }
3069
3070
3071 /** this method is used to send messages to a restarted processor to tell
3072  * it that a particular expected object is not going to get to it */
3073 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
3074         DummyMigrationMsg buf;
3075         buf.flag = MLOG_OBJECT;
3076         buf.lbID = lbID;
3077         buf.mgrID = locMgrID;
3078         buf.idx = idx;
3079         buf.locationPE = locationPE;
3080         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3081         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3082 };
3083
3084
3085 /**this method is used by a restarted processor to tell other processors
3086  * that they are not going to receive these many objects.. just the count
3087  * not the objects themselves ***/
3088
3089 void sendDummyMigrationCounts(int *dummyCounts){
3090         DummyMigrationMsg buf;
3091         buf.flag = MLOG_COUNT;
3092         buf.lbID = globalLBID;
3093         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3094         for(int i=0;i<CmiNumPes();i++){
3095                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3096                         buf.count = dummyCounts[i];
3097                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3098                 }
3099         }
3100 }
3101
3102
3103 /** this handler is used to process a dummy migration msg.
3104  * it looks up the load balancer and calls migrated for it */
3105
3106 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3107         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3108         if(msg->flag == MLOG_OBJECT){
3109                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3110                 LDObjHandle h;
3111                 lb->Migrated(h,1);
3112         }
3113         if(msg->flag == MLOG_COUNT){
3114                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3115                 msg->count -= verifyAckedRequests;
3116                 for(int i=0;i<msg->count;i++){
3117                         LDObjHandle h;
3118                         lb->Migrated(h,1);
3119                 }
3120         }
3121         verifyAckedRequests=0;
3122         CmiFree(msg);
3123 };
3124
/*****************************************************
	Implementation of a mechanism that can be used to call
	a given function on the ChareMlogData of every chare
	currently living on a processor
******************************************************/
3130
3131
3132 class ElementCaller :  public CkLocIterator {
3133 private:
3134         CkLocMgr *locMgr;
3135         MlogFn fnPointer;
3136         void *data;
3137 public:
3138         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3139                 locMgr = _locMgr;
3140                 fnPointer = _fnPointer;
3141                 data = _data;
3142         };
3143         void addLocation(CkLocation &loc){
3144                 CkVec<CkMigratable *> list;
3145                 CkLocRec_local *local = loc.getLocalRecord();
3146                 locMgr->migratableList (local,list);
3147                 for(int i=0;i<list.size();i++){
3148                         CkMigratable *migratableElement = list[i];
3149                         fnPointer(data,migratableElement->mlogData);
3150                 }
3151         }
3152 };
3153
3154 /**
3155  * Map function pointed by fnPointer over all the chares living in this processor.
3156  */
3157 void forAllCharesDo(MlogFn fnPointer, void *data){
3158         int numGroups = CkpvAccess(_groupIDTable)->size();
3159         for(int i=0;i<numGroups;i++){
3160                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3161                 fnPointer(data,obj->mlogData);
3162         }
3163         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3164         for(int i=0;i<numNodeGroups;i++){
3165                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3166                 fnPointer(data,obj->mlogData);
3167         }
3168         int i;
3169         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3170 };
3171
3172
3173 /******************************************************************
3174  Load Balancing
3175 ******************************************************************/
3176
3177 /**
3178  * This is the first time Converse is called after AtSync method has been called by every local object.
3179  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3180  * message logging, we can take advantage of this situation and garbage collect at this point.
3181  */
3182 void initMlogLBStep(CkGroupID gid){
3183         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3184         countLBMigratedAway = 0;
3185         countLBToMigrate=0;
3186         onGoingLoadBalancing=1;
3187         migrationDoneCalled=0;
3188         checkpointBarrierCount=0;
3189         if(globalLBID.idx != 0){
3190                 CmiAssert(globalLBID.idx == gid.idx);
3191         }
3192         globalLBID = gid;
3193 #if SYNCHRONIZED_CHECKPOINT
3194         garbageCollectMlog();
3195 #endif
3196 }
3197
3198 /**
3199  * Pups a location
3200  */
3201 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
3202         CkArrayIndexMax idx = loc->getIndex();
3203         CkGroupID gID = locMgr->ckGetGroupID();
3204         p|gID;      // store loc mgr's GID as well for easier restore
3205         p|idx;
3206         p|*loc;
3207 };
3208
3209 /**
3210  * Sends back the immigrant recovering object to their origin PE.
3211  */
3212 void sendBackImmigrantRecObjs(){
3213         CkLocation *loc;
3214         CkLocMgr *locMgr;
3215         CkArrayIndexMax idx;
3216         CkLocRec_local *rec;
3217         PUP::sizer psizer;
3218         int targetPE;
3219         CkVec<CkMigratable *> eltList;
3220         CkReductionMgr *reductionMgr;
3221  
3222         // looping through all elements in immigrant recovery objects vector
3223         for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
3224
3225                 // getting the components of each location
3226                 loc = (*CpvAccess(_immigrantRecObjs))[i];
3227                 idx = loc->getIndex();
3228                 rec = loc->getLocalRecord();
3229