Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / ck-core / ckcausalmlog.C
1 /**
2   * Simple Causal Message Logging Fault Tolerance Protocol.
3   * Features:
4         * Reduces the latency overhead of the pessimistic approach.
5         * Supports a single failure (only supports multiple concurrent failures under certain circumstances).
6   */
7
8 #include "charm.h"
9 #include "ck.h"
10 #include "ckcausalmlog.h"
11 #include "queueing.h"
12 #include <sys/types.h>
13 #include <signal.h>
14 #include "CentralLB.h"
15
#ifdef _FAULT_CAUSAL_

// Compile-time switches that enable statistics about message logging (0 = off).
// Beware of the high cost of accounting for duplicated determinants: for every
// determinant received, it consists of a linear search through a potentially big list.
#define COLLECT_STATS_MSGS 0
// With COLLECT_STATS_MSGS on: 1 keeps only global totals, 0 keeps per-destination-PE counters.
#define COLLECT_STATS_MSGS_TOTAL 0
#define COLLECT_STATS_MSG_COUNT 0
#define COLLECT_STATS_DETS 0
#define COLLECT_STATS_DETS_DUP 0
#define COLLECT_STATS_MEMORY 0
#define COLLECT_STATS_LOGGING 0

#define RECOVERY_SEND "SEND"
#define RECOVERY_PROCESS "PROCESS"

// Debug-print switches. A macro body written as "//x" is a comment, which the
// preprocessor strips before the macro is defined, so such a macro expands to
// nothing; delete the "//" to turn it on. DEBUG_NOW and DEBUG_PE_NOW are always on.
#define DEBUG_MEM(x)  //x
#define DEBUG(x) // x
#define DEBUG_RESTART(x)  //x
#define DEBUGLB(x)   // x
#define DEBUG_TEAM(x)  // x
#define DEBUG_PERF(x) // x
#define DEBUG_CHECKPOINT 1
#define DEBUG_NOW(x) x
#define DEBUG_PE(x,y) // if(CkMyPe() == x) y
// Runs statement y only on PE x.
#define DEBUG_PE_NOW(x,y)  if(CkMyPe() == x) y
#define DEBUG_RECOVERY(x) //x
43
// String helpers for array indices / array elements (declared extern; presumably
// they format the index as text for debug output).
extern const char *idx2str(const CkArrayIndex &ind);
extern const char *idx2str(const ArrayElement *el);

void getGlobalStep(CkGroupID gID);

// Prototypes for functions referenced in this file. isLocal/isTeamLocal are
// used by sendCommonMsg before their definitions appear.
bool fault_aware(CkObjID &recver);
void sendCheckpointData(int mode);
void createObjIDList(void *data,ChareMlogData *mlogData);
inline bool isLocal(int destPE);
inline bool isTeamLocal(int destPE);
void printLog(TProcessedLog *log);

// Restart bookkeeping flags/counters.
int _restartFlag=0;
int _numRestartResponses=0;
58
//TODO: remove for perf runs
int countHashRefs=0; //count the number of gets
int countHashCollisions=0;

// Presumably the directory where checkpoints are written (defaults to ".").
// NOTE(review): a string literal is bound to a non-const char*; writing through
// this pointer would be undefined behavior.
char *checkpointDirectory=".";
int unAckedCheckpoint=0;

int countLocal=0,countBuffered=0;
int countPiggy=0;
int countClearBufferedLocalCalls=0;

int countUpdateHomeAcks=0;

// Configuration defined in another translation unit. teamSize is the number of
// consecutive PEs forming one team (see isTeamLocal).
extern int teamSize;
extern int chkptPeriod;
extern bool fastRecovery;
extern int parallelRecovery;

// Fault injection. killFile holds "<pe> <seconds>" pairs read by readKillFile();
// faultFile holds one "<pe> <seconds>" pair read by readFaultFile().
char *killFile;
char *faultFile;
int killFlag=0;
int faultFlag=0;
int restartingMlogFlag=0;
void readKillFile();
double killTime=0.0;   // wall-clock deadline checked by killLocal()
double faultMean;      // seconds value read from faultFile
int checkpointCount=0;

// Per-PE (Cpv) state for message logging; initialized in _messageLoggingInit().
CpvDeclare(Chare *,_currentObj);   // object used as sender by sendCommonMsg; NULL when none
CpvDeclare(StoredCheckpoint *,_storedCheckpointData);
CpvDeclare(CkQ<MlogEntry *> *,_delayedLocalMsgs);
CpvDeclare(Queue, _outOfOrderMessageQueue);
CpvDeclare(Queue, _delayedRemoteMessageQueue);
CpvDeclare(char **,_bufferedTicketRequests);   // one slot per PE
CpvDeclare(int *,_numBufferedTicketRequests);  // one counter per PE
94
/***** VARIABLES FOR CAUSAL MESSAGE LOGGING *****/
/** @note All the determinants generated by a PE are stored in _localDets.
 * Whenever a message is sent (see sendMsg), the currently buffered determinants are
 * shipped to the destination PE along with it (as a separate vector-send handled by
 * _storeDeterminantsHandler), but they are not deleted: we must wait until an ACK comes
 * from the receiver. In the meantime the same determinants may be sent along with other
 * messages, and more determinants can be added to _localDets.
 * _localDets is therefore a growable array to which determinants are only appended.
 * Instead of a pointer to the first 'valid' determinant, we keep the end of the array
 * (_indexBufferedDets) and how many valid determinants precede it (_numBufferedDets),
 * so the valid ones are the entries [_indexBufferedDets - _numBufferedDets, _indexBufferedDets).
 * We do not remove determinants until a checkpoint is made, since these determinants
 * may have to be re-sent with messages in case of a recovery.
 */
// temporary storage for the outgoing determinants: raw buffer of _maxBufferedDets Determinant slots
CpvDeclare(char *, _localDets);
// number of valid buffered determinants (the last ones before _indexBufferedDets)
int _numBufferedDets;
// index of the next free slot in _localDets, i.e. one past the last buffered determinant
int _indexBufferedDets;
// current phase of determinants (sent in the StoreDeterminantsHeader)
int _phaseBufferedDets;

// stores the determinants from other nodes, to send them in case of a crash
CpvDeclare(CkDeterminantHashtableT *, _remoteDets);
// capacity of _localDets in Determinant slots; doubled by addBufferedDeterminant when full
int _maxBufferedDets;

// stores the incarnation number of every processor (one char per PE)
CpvDeclare(char *, _incarnation);

// storage for the remove-determinants header
CpvDeclare(RemoveDeterminantsHeader *, _removeDetsHeader);
// storage for the store-determinants header
CpvDeclare(StoreDeterminantsHeader *, _storeDetsHeader);
// the two sizes (header, determinants) of the vector-send that stores determinants
CpvDeclare(int *, _storeDetsSizes);
// the two pointers (header, determinants) of the vector-send that stores determinants
CpvDeclare(char **, _storeDetsPtrs);

/***** *****/

/***** VARIABLES FOR PARALLEL RECOVERY *****/
CpvDeclare(int, _numEmigrantRecObjs);
CpvDeclare(int, _numImmigrantRecObjs);
CpvDeclare(CkVec<CkLocation *> *, _immigrantRecObjs);
/***** *****/
141
// Statistics counters; each group exists only when its COLLECT_STATS_* switch is non-zero.
#if COLLECT_STATS_MSGS
int *numMsgsTarget;    // messages sent per destination PE (when !COLLECT_STATS_MSGS_TOTAL)
int *sizeMsgsTarget;   // bytes sent per destination PE (when !COLLECT_STATS_MSGS_TOTAL)
int totalMsgsTarget;   // total messages sent (when COLLECT_STATS_MSGS_TOTAL)
float totalMsgsSize;   // total bytes sent (when COLLECT_STATS_MSGS_TOTAL)
#endif
#if COLLECT_STATS_DETS
int numPiggyDets;      // determinants shipped alongside outgoing messages
int numDets;           // determinants generated locally
int numDupDets;
#endif
#if COLLECT_STATS_MEMORY
int msgLogSize;
int bufferedDetsSize;
int storedDetsSize;
#endif
//TML: variables for measuring savings with teams in message logging
#if COLLECT_STATS_LOGGING
float MLOGFT_totalLogSize = 0.0;
float MLOGFT_totalMessages = 0.0;
float MLOGFT_totalMcastLogSize = 0.0;
float MLOGFT_totalReductionLogSize = 0.0;
#endif

static double adjustChkptPeriod=0.0; //in ms
static double nextCheckpointTime=0.0;//in seconds
// Two-level table: CkObjID -> (CkObjID -> SNToTicket*). Constructor arguments are
// presumably initial size and load factor (TODO confirm against CkHashtableT).
static CkHashtableT<CkHashtableAdaptorT<CkObjID>,CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *> detTable (1000,0.3);
169
// Converse handler indices, filled in by CkRegisterHandler() in _messageLoggingInit().
int _pingHandlerIdx;

char objString[100];
int _checkpointRequestHandlerIdx;
int _storeCheckpointHandlerIdx;
int _checkpointAckHandlerIdx;
int _getCheckpointHandlerIdx;
int _recvCheckpointHandlerIdx;
int _removeProcessedLogHandlerIdx;

int _verifyAckRequestHandlerIdx;
int _verifyAckHandlerIdx;
int _dummyMigrationHandlerIdx;


int     _getGlobalStepHandlerIdx;
int     _recvGlobalStepHandlerIdx;

int _updateHomeRequestHandlerIdx;
int _updateHomeAckHandlerIdx;
int _resendMessagesHandlerIdx;
int _sendDetsHandlerIdx;
int _sendDetsReplyHandlerIdx;
int _receivedTNDataHandlerIdx;
int _receivedDetDataHandlerIdx;
int _distributedLocationHandlerIdx;
int _sendBackLocationHandlerIdx;
int _storeDeterminantsHandlerIdx;
int _removeDeterminantsHandlerIdx;

//TML: integer constants for team-based message logging
int _restartHandlerIdx;
int _getRestartCheckpointHandlerIdx;
int _recvRestartCheckpointHandlerIdx;
void setTeamRecovery(void *data, ChareMlogData *mlogData);
void unsetTeamRecovery(void *data, ChareMlogData *mlogData);

int verifyAckTotal;
int verifyAckCount;

int verifyAckedRequests=0;

RestartRequest *storedRequest;

/**
 * For testing on clusters we might carry out restarts on a processor
 * without actually restarting it:
 * 1 -> false restart
 * 0 -> restart after an actual crash
 */
int _falseRestart =0;

//Load balancing globals
int onGoingLoadBalancing=0;
void *centralLb;
void (*resumeLbFnPtr)(void *);
int _receiveMlogLocationHandlerIdx;
int _receiveMigrationNoticeHandlerIdx;
int _receiveMigrationNoticeAckHandlerIdx;
int _checkpointBarrierHandlerIdx;
int _checkpointBarrierAckHandlerIdx;

CkVec<MigrationRecord> migratedNoticeList;
CkVec<RetainedMigratedObject *> retainedObjectList;
int donotCountMigration=0;
int countLBMigratedAway=0;
int countLBToMigrate=0;
int migrationDoneCalled=0;
int checkpointBarrierCount=0;
int globalResumeCount=0;
CkGroupID globalLBID;
int restartDecisionNumber=-1;

// Wall-clock timestamps; both are set to CmiWallTimer() in _messageLoggingInit().
double lastCompletedAlarm=0;
double lastRestart=0;

//update location globals
int _receiveLocationHandlerIdx;
247
248
249 /** 
250  * @brief Initialize message logging data structures and register handlers
251  */
252 void _messageLoggingInit(){
253         //current object
254         CpvInitialize(Chare *,_currentObj);
255         
256         //registering handlers for message logging
257         _pingHandlerIdx = CkRegisterHandler((CmiHandler)_pingHandler);
258                 
259         //handlers for checkpointing
260         _storeCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_storeCheckpointHandler);
261         _checkpointAckHandlerIdx = CkRegisterHandler((CmiHandler) _checkpointAckHandler);
262         _removeProcessedLogHandlerIdx  = CkRegisterHandler((CmiHandler)_removeProcessedLogHandler);
263         _checkpointRequestHandlerIdx =  CkRegisterHandler((CmiHandler)_checkpointRequestHandler);
264
265         //handlers for restart
266         _getCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getCheckpointHandler);
267         _recvCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvCheckpointHandler);
268         _updateHomeRequestHandlerIdx =CkRegisterHandler((CmiHandler)_updateHomeRequestHandler);
269         _updateHomeAckHandlerIdx =  CkRegisterHandler((CmiHandler) _updateHomeAckHandler);
270         _resendMessagesHandlerIdx = CkRegisterHandler((CmiHandler)_resendMessagesHandler);
271         _sendDetsHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsHandler);
272         _sendDetsReplyHandlerIdx = CkRegisterHandler((CmiHandler)_sendDetsReplyHandler);
273         _receivedTNDataHandlerIdx=CkRegisterHandler((CmiHandler)_receivedTNDataHandler);
274         _receivedDetDataHandlerIdx = CkRegisterHandler((CmiHandler)_receivedDetDataHandler);
275         _distributedLocationHandlerIdx=CkRegisterHandler((CmiHandler)_distributedLocationHandler);
276         _sendBackLocationHandlerIdx=CkRegisterHandler((CmiHandler)_sendBackLocationHandler);
277         _verifyAckRequestHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckRequestHandler);
278         _verifyAckHandlerIdx = CkRegisterHandler((CmiHandler)_verifyAckHandler);
279         _dummyMigrationHandlerIdx = CkRegisterHandler((CmiHandler)_dummyMigrationHandler);
280
281         //TML: handlers for team-based message logging
282         _restartHandlerIdx = CkRegisterHandler((CmiHandler)_restartHandler);
283         _getRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_getRestartCheckpointHandler);
284         _recvRestartCheckpointHandlerIdx = CkRegisterHandler((CmiHandler)_recvRestartCheckpointHandler);
285
286         // handlers for causal message logging
287         _storeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_storeDeterminantsHandler);
288         _removeDeterminantsHandlerIdx = CkRegisterHandler((CmiHandler)_removeDeterminantsHandler);
289         
290         //handlers for load balancing
291         _receiveMlogLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMlogLocationHandler);
292         _receiveMigrationNoticeHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeHandler);
293         _receiveMigrationNoticeAckHandlerIdx=CkRegisterHandler((CmiHandler)_receiveMigrationNoticeAckHandler);
294         _getGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_getGlobalStepHandler);
295         _recvGlobalStepHandlerIdx=CkRegisterHandler((CmiHandler)_recvGlobalStepHandler);
296         _checkpointBarrierHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierHandler);
297         _checkpointBarrierAckHandlerIdx=CkRegisterHandler((CmiHandler)_checkpointBarrierAckHandler);
298         
299         //handlers for updating locations
300         _receiveLocationHandlerIdx=CkRegisterHandler((CmiHandler)_receiveLocationHandler);
301         
302         //Cpv variables for message logging
303         CpvInitialize(CkQ<MlogEntry *>*,_delayedLocalMsgs);
304         CpvAccess(_delayedLocalMsgs) = new CkQ<MlogEntry *>;
305         CpvInitialize(Queue, _outOfOrderMessageQueue);
306         CpvInitialize(Queue, _delayedRemoteMessageQueue);
307         CpvAccess(_outOfOrderMessageQueue) = CqsCreate();
308         CpvAccess(_delayedRemoteMessageQueue) = CqsCreate();
309         
310         CpvInitialize(char **,_bufferedTicketRequests);
311         CpvAccess(_bufferedTicketRequests) = new char *[CkNumPes()];
312         CpvAccess(_numBufferedTicketRequests) = new int[CkNumPes()];
313         for(int i=0;i<CkNumPes();i++){
314                 CpvAccess(_bufferedTicketRequests)[i]=NULL;
315                 CpvAccess(_numBufferedTicketRequests)[i]=0;
316         }
317  
318         // Cpv variables for causal protocol
319         _numBufferedDets = 0;
320         _indexBufferedDets = 0;
321         _phaseBufferedDets = 0;
322         _maxBufferedDets = INITIAL_BUFFERED_DETERMINANTS;
323         CpvInitialize(char *, _localDets);
324         CpvInitialize((CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> *),_remoteDets);
325         CpvInitialize(char *, _incarnation);
326         CpvInitialize(RemoveDeterminantsHeader *, _removeDetsHeader);
327         CpvInitialize(StoreDeterminantsHeader *, _storeDetsHeader);
328         CpvInitialize(int *, _storeDetsSizes);
329         CpvInitialize(char **, _storeDetsPtrs);
330         CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
331         CpvAccess(_remoteDets) = new CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *>(100, 0.4);
332         CpvAccess(_incarnation) = (char *) CmiAlloc(CmiNumPes() * sizeof(int));
333         for(int i=0; i<CmiNumPes(); i++){
334                 CpvAccess(_incarnation)[i] = 0;
335         }
336         CpvAccess(_removeDetsHeader) = (RemoveDeterminantsHeader *) CmiAlloc(sizeof(RemoveDeterminantsHeader));
337         CpvAccess(_storeDetsHeader) = (StoreDeterminantsHeader *) CmiAlloc(sizeof(StoreDeterminantsHeader));
338         CpvAccess(_storeDetsSizes) = (int *) CmiAlloc(sizeof(int) * 2);
339         CpvAccess(_storeDetsPtrs) = (char **) CmiAlloc(sizeof(char *) * 2);
340
341         // Cpv variables for parallel recovery
342         CpvInitialize(int, _numEmigrantRecObjs);
343     CpvAccess(_numEmigrantRecObjs) = 0;
344     CpvInitialize(int, _numImmigrantRecObjs);
345     CpvAccess(_numImmigrantRecObjs) = 0;
346
347     CpvInitialize(CkVec<CkLocation *> *, _immigrantRecObjs);
348     CpvAccess(_immigrantRecObjs) = new CkVec<CkLocation *>;
349
350         //Cpv variables for checkpoint
351         CpvInitialize(StoredCheckpoint *,_storedCheckpointData);
352         CpvAccess(_storedCheckpointData) = new StoredCheckpoint;
353
354         // registering user events for projections      
355         traceRegisterUserEvent("Remove Logs", 20);
356         traceRegisterUserEvent("Ticket Request Handler", 21);
357         traceRegisterUserEvent("Ticket Handler", 22);
358         traceRegisterUserEvent("Local Message Copy Handler", 23);
359         traceRegisterUserEvent("Local Message Ack Handler", 24);        
360         traceRegisterUserEvent("Preprocess current message",25);
361         traceRegisterUserEvent("Preprocess past message",26);
362         traceRegisterUserEvent("Preprocess future message",27);
363         traceRegisterUserEvent("Checkpoint",28);
364         traceRegisterUserEvent("Checkpoint Store",29);
365         traceRegisterUserEvent("Checkpoint Ack",30);
366         traceRegisterUserEvent("Send Ticket Request",31);
367         traceRegisterUserEvent("Generalticketrequest1",32);
368         traceRegisterUserEvent("TicketLogLocal",33);
369         traceRegisterUserEvent("next_ticket and SN",34);
370         traceRegisterUserEvent("Timeout for buffered remote messages",35);
371         traceRegisterUserEvent("Timeout for buffered local messages",36);
372         traceRegisterUserEvent("Inform Location Home",37);
373         traceRegisterUserEvent("Receive Location Handler",38);
374         
375         lastCompletedAlarm=CmiWallTimer();
376         lastRestart = CmiWallTimer();
377
378 #if COLLECT_STATS_MSGS
379 #if COLLECT_STATS_MSGS_TOTAL
380         totalMsgsTarget = 0;
381         totalMsgsSize = 0.0;
382 #else
383         numMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
384         sizeMsgsTarget = (int *)CmiAlloc(sizeof(int) * CmiNumPes());
385         for(int i=0; i<CmiNumPes(); i++){
386                 numMsgsTarget[i] = 0;
387                 sizeMsgsTarget[i] = 0;
388         }
389 #endif
390 #endif
391 #if COLLECT_STATS_DETS
392         numPiggyDets = 0;
393         numDets = 0;
394         numDupDets = 0;
395 #endif
396 #if COLLECT_STATS_MEMORY
397         msgLogSize = 0;
398         bufferedDetsSize = 0;
399         storedDetsSize = 0;
400 #endif
401
402
403 }
404
405 void killLocal(void *_dummy,double curWallTime);        
406
407 void readKillFile(){
408         FILE *fp=fopen(killFile,"r");
409         if(!fp){
410                 return;
411         }
412         int proc;
413         double sec;
414         while(fscanf(fp,"%d %lf",&proc,&sec)==2){
415                 if(proc == CkMyPe()){
416                         killTime = CmiWallTimer()+sec;
417                         printf("[%d] To be killed after %.6lf s (MLOG) \n",CkMyPe(),sec);
418                         CcdCallFnAfter(killLocal,NULL,sec*1000);        
419                 }
420         }
421         fclose(fp);
422 }
423
424 /**
425  * @brief: reads the PE that will be failing throughout the execution and the mean time between failures.
426  * We assume an exponential distribution for the mean-time-between-failures.
427  */
428 void readFaultFile(){
429         FILE *fp=fopen(faultFile,"r");
430         if(!fp){
431                 return;
432         }
433         int proc;
434         double sec;
435         fscanf(fp,"%d %lf",&proc,&sec);
436         faultMean = sec;
437         if(proc == CkMyPe()){
438                 printf("[%d] PE %d to be killed every %.6lf s (MEMCKPT) \n",CkMyPe(),proc,sec);
439                 CcdCallFnAfter(killLocal,NULL,sec*1000);
440         }
441         fclose(fp);
442 }
443
444 void killLocal(void *_dummy,double curWallTime){
445         printf("[%d] KillLocal called at %.6lf \n",CkMyPe(),CmiWallTimer());
446         if(CmiWallTimer()<killTime-1){
447                 CcdCallFnAfter(killLocal,NULL,(killTime-CmiWallTimer())*1000);  
448         }else{  
449                 kill(getpid(),SIGKILL);
450         }
451 }
452
453 /*** Auxiliary Functions ***/
454
455 /**
456  * @brief Adds a determinants to the buffered determinants and checks whether the array of buffered
457  * determinants needs to be extended.
458  */
459 inline void addBufferedDeterminant(CkObjID sender, CkObjID receiver, MCount SN, MCount TN){
460         Determinant *det, *auxDet;
461         char *aux;
462
463         DEBUG(CkPrintf("[%d]Adding determinant\n",CkMyPe()));
464
465         // checking if we are overflowing the buffered determinants array
466         if(_indexBufferedDets >= _maxBufferedDets){
467                 aux = CpvAccess(_localDets);
468                 _maxBufferedDets *= 2;
469                 CpvAccess(_localDets) = (char *) CmiAlloc(_maxBufferedDets * sizeof(Determinant));
470                 memcpy(CpvAccess(_localDets), aux, _indexBufferedDets * sizeof(Determinant));
471                 CmiFree(aux);
472         }
473
474         // adding the new determinant at the end
475         det = (Determinant *) (CpvAccess(_localDets) + _indexBufferedDets * sizeof(Determinant));
476         det->sender = sender;
477         det->receiver = receiver;
478         det->SN = SN;
479         det->TN = TN;
480         _numBufferedDets++;
481         _indexBufferedDets++;
482
483 #if COLLECT_STATS_MEMORY
484         bufferedDetsSize++;
485 #endif
486 #if COLLECT_STATS_DETS
487         numDets++;
488 #endif
489 }
490
491 /************************ Message logging methods ****************/
492
493 /**
494  * Sends a group message that might be a broadcast.
495  */
496 void sendGroupMsg(envelope *env, int destPE, int _infoIdx){
497         if(destPE == CLD_BROADCAST || destPE == CLD_BROADCAST_ALL){
498                 DEBUG(printf("[%d] Group Broadcast \n",CkMyPe()));
499                 void *origMsg = EnvToUsr(env);
500                 for(int i=0;i<CmiNumPes();i++){
501                         if(!(destPE == CLD_BROADCAST && i == CmiMyPe())){
502                                 void *copyMsg = CkCopyMsg(&origMsg);
503                                 envelope *copyEnv = UsrToEnv(copyMsg);
504                                 copyEnv->SN=0;
505                                 copyEnv->TN=0;
506                                 copyEnv->sender.type = TypeInvalid;
507                                 DEBUG(printf("[%d] Sending group broadcast message to proc %d \n",CkMyPe(),i));
508                                 sendGroupMsg(copyEnv,i,_infoIdx);
509                         }
510                 }
511                 return;
512         }
513
514         // initializing values of envelope
515         env->SN=0;
516         env->TN=0;
517         env->sender.type = TypeInvalid;
518
519         CkObjID recver;
520         recver.type = TypeGroup;
521         recver.data.group.id = env->getGroupNum();
522         recver.data.group.onPE = destPE;
523         sendCommonMsg(recver,env,destPE,_infoIdx);
524 }
525
526 /**
527  * Sends a nodegroup message that might be a broadcast.
528  */
529 void sendNodeGroupMsg(envelope *env, int destNode, int _infoIdx){
530         if(destNode == CLD_BROADCAST || destNode == CLD_BROADCAST_ALL){
531                 DEBUG(printf("[%d] NodeGroup Broadcast \n",CkMyPe()));
532                 void *origMsg = EnvToUsr(env);
533                 for(int i=0;i<CmiNumNodes();i++){
534                         if(!(destNode == CLD_BROADCAST && i == CmiMyNode())){
535                                 void *copyMsg = CkCopyMsg(&origMsg);
536                                 envelope *copyEnv = UsrToEnv(copyMsg);
537                                 copyEnv->SN=0;
538                                 copyEnv->TN=0;
539                                 copyEnv->sender.type = TypeInvalid;
540                                 sendNodeGroupMsg(copyEnv,i,_infoIdx);
541                         }
542                 }
543                 return;
544         }
545
546         // initializing values of envelope
547         env->SN=0;
548         env->TN=0;
549         env->sender.type = TypeInvalid;
550
551         CkObjID recver;
552         recver.type = TypeNodeGroup;
553         recver.data.group.id = env->getGroupNum();
554         recver.data.group.onPE = destNode;
555         sendCommonMsg(recver,env,destNode,_infoIdx);
556 }
557
558 /**
559  * Sends a message to an array element.
560  */
561 void sendArrayMsg(envelope *env,int destPE,int _infoIdx){
562         CkObjID recver;
563         recver.type = TypeArray;
564         recver.data.array.id = env->getsetArrayMgr();
565         recver.data.array.idx.asChild() = *(&env->getsetArrayIndex());
566
567         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
568                 char recverString[100],senderString[100];
569                 
570                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
571         }
572
573         // initializing values of envelope
574         env->SN = 0;
575         env->TN = 0;
576
577         sendCommonMsg(recver,env,destPE,_infoIdx);
578 };
579
580 /**
581  * Sends a message to a singleton chare.
582  */
583 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid){
584         CkObjID recver;
585         recver.type = TypeChare;
586         recver.data.chare.id = *pCid;
587
588         if(CpvAccess(_currentObj)!=NULL &&  CpvAccess(_currentObj)->mlogData->objID.type != TypeArray){
589                 char recverString[100],senderString[100];
590                 
591                 DEBUG(printf("[%d] %s being sent message from non-array %s \n",CkMyPe(),recver.toString(recverString),CpvAccess(_currentObj)->mlogData->objID.toString(senderString)));
592         }
593
594         // initializing values of envelope
595         env->SN = 0;
596         env->TN = 0;
597
598         sendCommonMsg(recver,env,destPE,_infoIdx);
599 };
600
601 /**
602  * A method to generate the actual ticket requests for groups, nodegroups or arrays.
603  */
604 void sendCommonMsg(CkObjID &recver,envelope *_env,int destPE,int _infoIdx){
605         envelope *env = _env;
606         MCount ticketNumber = 0;
607         int resend=0; //is it a resend
608         char recverName[100];
609         char senderString[100];
610         double _startTime=CkWallTimer();
611         
612         DEBUG_MEM(CmiMemoryCheck());
613
614         if(CpvAccess(_currentObj) == NULL){
615 //              CkAssert(0);
616                 DEBUG(printf("[%d] !!!!WARNING: _currentObj is NULL while message is being sent\n",CkMyPe());)
617                 generalCldEnqueue(destPE,env,_infoIdx);
618                 return;
619         }
620
621         // checking if this message should bypass determinants in message-logging
622         if(env->flags & CK_BYPASS_DET_MLOG){
623                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
624                 env->recver = recver;
625                 DEBUG(CkPrintf("[%d] Bypassing determinants from %s to %s PE %d\n",CkMyPe(),CpvAccess(_currentObj)->mlogData->objID.toString(senderString),recver.toString(recverName),destPE));
626                 generalCldEnqueue(destPE,env,_infoIdx);
627                 return;
628         }
629         
630         // setting message logging data in the envelope
631         env->incarnation = CpvAccess(_incarnation)[CkMyPe()];
632         if(env->sender.type == TypeInvalid){
633                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
634         }else{
635 //              envelope *copyEnv = copyEnvelope(env);
636 //              env = copyEnv;
637                 env->sender = CpvAccess(_currentObj)->mlogData->objID;
638                 env->SN = 0;
639         }
640         
641         DEBUG_MEM(CmiMemoryCheck());
642
643         CkObjID &sender = env->sender;
644         env->recver = recver;
645
646         Chare *obj = (Chare *)env->sender.getObject();
647           
648         if(env->SN == 0){
649                 DEBUG_MEM(CmiMemoryCheck());
650                 env->SN = obj->mlogData->nextSN(recver);
651         }else{
652                 resend = 1;
653         }
654         
655 //      if(env->SN != 1){
656                 DEBUG(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
657         //      CmiPrintStackTrace(0);
658 /*      }else{
659                 DEBUG_RESTART(printf("[%d] Generate Ticket Request to %s from %s PE %d SN %d \n",CkMyPe(),env->recver.toString(recverName),env->sender.toString(senderString),destPE,env->SN));
660         }*/
661                 
662 //      CkPackMessage(&(mEntry->env));
663 //      traceUserBracketEvent(32,_startTime,CkWallTimer());
664         
665         _startTime = CkWallTimer();
666
667         // uses the proper ticketing mechanism for local, team and general messages
668         if(isLocal(destPE)){
669                 sendLocalMsg(env, _infoIdx);
670         }else{
671                 if((teamSize > 1) && isTeamLocal(destPE)){
672
673                         // look to see if this message already has a ticket in the team-table
674                         Chare *senderObj = (Chare *)sender.getObject();
675                         SNToTicket *ticketRow = senderObj->mlogData->teamTable.get(recver);
676                         if(ticketRow != NULL){
677                                 Ticket ticket = ticketRow->get(env->SN);
678                                 if(ticket.TN != 0){
679                                         ticketNumber = ticket.TN;
680                                         DEBUG(CkPrintf("[%d] Found a team preticketed message\n",CkMyPe()));
681                                 }
682                         }
683                 }
684                 
685                 // sending the message
686                 MlogEntry *mEntry = new MlogEntry(env,destPE,_infoIdx);
687                 sendMsg(sender,recver,destPE,mEntry,env->SN,ticketNumber,resend);
688                 
689         }
690 }
691
692 /**
693  * Determines if the message is local or not. A message is local if:
694  * 1) Both the destination and origin are the same PE.
695  */
696 inline bool isLocal(int destPE){
697         // both the destination and the origin are the same PE
698         if(destPE == CkMyPe())
699                 return true;
700
701         return false;
702 }
703
704 /**
705  * Determines if the message is group local or not. A message is group local if:
706  * 1) They belong to the same group in the group-based message logging.
707  */
708 inline bool isTeamLocal(int destPE){
709
710         // they belong to the same group
711         if(teamSize > 1 && destPE/teamSize == CkMyPe()/teamSize)
712                 return true;
713
714         return false;
715 }
716
717 /**
718  * Method that does the actual send by creating a ticket request filling it up and sending it.
719  */
720 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend){
721         DEBUG_NOW(char recverString[100]);
722         DEBUG_NOW(char senderString[100]);
723
724         int totalSize;
725
726         envelope *env = entry->env;
727         DEBUG(printf("[%d] Sending message to %s from %s PE %d SN %d time %.6lf \n",CkMyPe(),env->recver.toString(recverString),env->sender.toString(senderString),destPE,env->SN,CkWallTimer()));
728
729         // setting all the information
730         Chare *obj = (Chare *)entry->env->sender.getObject();
731         entry->env->recver = recver;
732         entry->env->SN = SN;
733         entry->env->TN = TN;
734         if(!resend){
735                 //TML: only stores message if either it goes to this processor or to a processor in a different group
736                 if(!isTeamLocal(entry->destPE)){
737                         obj->mlogData->addLogEntry(entry);
738 #if COLLECT_STATS_LOGGING
739                         MLOGFT_totalMessages += 1.0;
740                         MLOGFT_totalLogSize += entry->env->getTotalsize();
741                         if(entry->env->flags & CK_MULTICAST_MSG_MLOG){
742                                 MLOGFT_totalMcastLogSize += entry->env->getTotalsize(); 
743                         }
744                         if(entry->env->flags & CK_REDUCTION_MSG_MLOG){
745                                 MLOGFT_totalReductionLogSize += entry->env->getTotalsize();     
746                         }
747
748 #endif
749                 }else{
750                         // the message has to be deleted after it has been sent
751                         entry->env->flags = entry->env->flags | CK_FREE_MSG_MLOG;
752                 }
753         }
754
755         // sending the determinants that causally affect this message
756         if(_numBufferedDets > 0){
757
758                 // modifying the actual number of determinants sent in message
759                 CpvAccess(_storeDetsHeader)->number = _numBufferedDets;
760                 CpvAccess(_storeDetsHeader)->index = _indexBufferedDets;
761                 CpvAccess(_storeDetsHeader)->phase = _phaseBufferedDets;
762                 CpvAccess(_storeDetsHeader)->PE = CmiMyPe();
763         
764                 // sending the message
765                 CpvAccess(_storeDetsSizes)[0] = sizeof(StoreDeterminantsHeader);
766                 CpvAccess(_storeDetsSizes)[1] = _numBufferedDets * sizeof(Determinant);
767                 CpvAccess(_storeDetsPtrs)[0] = (char *) CpvAccess(_storeDetsHeader);
768                 CpvAccess(_storeDetsPtrs)[1] = CpvAccess(_localDets) + (_indexBufferedDets - _numBufferedDets) * sizeof(Determinant);
769                 DEBUG(CkPrintf("[%d] Sending %d determinants\n",CkMyPe(),_numBufferedDets));
770                 CmiSetHandler(CpvAccess(_storeDetsHeader), _storeDeterminantsHandlerIdx);
771                 CmiSyncVectorSend(destPE, 2, CpvAccess(_storeDetsSizes), CpvAccess(_storeDetsPtrs));
772         }
773
774         // updating its message log entry
775         entry->indexBufDets = _indexBufferedDets;
776         entry->numBufDets = _numBufferedDets;
777
778         // sending the message
779         generalCldEnqueue(destPE, entry->env, entry->_infoIdx);
780
781         DEBUG_MEM(CmiMemoryCheck());
782 #if COLLECT_STATS_MSGS
783 #if COLLECT_STATS_MSGS_TOTAL
784         totalMsgsTarget++;
785         totalMsgsSize += (float)env->getTotalsize();
786 #else
787         numMsgsTarget[destPE]++;
788         sizeMsgsTarget[destPE] += env->getTotalsize();
789 #endif
790 #endif
791 #if COLLECT_STATS_DETS
792         numPiggyDets += _numBufferedDets;
793 #endif
794 #if COLLECT_STATS_MEMORY
795         msgLogSize += env->getTotalsize();
796 #endif
797 };
798
799
800 /**
801  * @brief Function to send a local message. It first gets a ticket and
802  * then enqueues the message. If we are recovering, then the message 
803  * is enqueued in a delay queue.
804  */
805 void sendLocalMsg(envelope *env, int _infoIdx){
806         DEBUG_PERF(double _startTime=CkWallTimer());
807         DEBUG_MEM(CmiMemoryCheck());
808         DEBUG(Chare *senderObj = (Chare *)env->sender.getObject();)
809         DEBUG(char senderString[100]);
810         DEBUG(char recverString[100]);
811         Ticket ticket;
812
813         DEBUG(printf("[%d] Local Message being sent for SN %d sender %s recver %s \n",CmiMyPe(),env->SN,env->sender.toString(senderString),env->recver.toString(recverString)));
814
815         // getting the receiver local object
816         Chare *recverObj = (Chare *)env->recver.getObject();
817
818         // if receiver object is not NULL, we will ask it for a ticket
819         if(recverObj){
820
821 /*HERE          // if we are recovery, then we must put this off for later
822                 if(recverObj->mlogData->restartFlag){
823                         CpvAccess(_delayedLocalMsgs)->enq(entry);
824                         return;
825                 }
826
827                 // check if this combination of sender and SN has been already a ticket
828                 ticket = recverObj->mlogData->getTicket(entry->env->sender, entry->env->SN);
829                         
830                 // assigned a new ticket in case this message is completely new
831                 if(ticket.TN == 0){
832                         ticket = recverObj->mlogData->next_ticket(entry->env->sender,entry->env->SN);
833
834                         // if TN == 0 we enqueue this message since we are recovering from a crash
835                         if(ticket.TN == 0){
836                                 CpvAccess(_delayedLocalMsgs)->enq(entry);
837                                 DEBUG(printf("[%d] Local Message request enqueued for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
838                                 
839 //                              traceUserBracketEvent(33,_startTime,CkWallTimer());
840                                 return;
841                         }
842                 }
843
844                 //TODO: check for the case when an invalid ticket is returned
845                 //TODO: check for OLD or RECEIVED TICKETS
846                 entry->env->TN = ticket.TN;
847                 CkAssert(entry->env->TN > 0);
848                 DEBUG(printf("[%d] Local Message gets TN %d for SN %d sender %s recver %s \n",CmiMyPe(),entry->env->TN,entry->env->SN,entry->env->sender.toString(senderString),entry->env->recver.toString(recverString)));
849         
850                 DEBUG_MEM(CmiMemoryCheck());
851
852                 // adding the determinant to _localDets
853                 addBufferedDeterminant(entry->env->sender, entry->env->recver, entry->env->SN, entry->env->TN);
854 */
855                 // sends the local message
856                 _skipCldEnqueue(CmiMyPe(),env,_infoIdx);        
857
858                 DEBUG_MEM(CmiMemoryCheck());
859         }else{
860                 DEBUG(printf("[%d] Local recver object is NULL \n",CmiMyPe()););
861         }
862 //      traceUserBracketEvent(33,_startTime,CkWallTimer());
863 };
864
865 /****
866         The handler functions
867 *****/
868
869 /*** CAUSAL PROTOCOL HANDLERS ***/
870
871 /**
872  * @brief Removes the determinants after a particular index in the _localDets array.
873  */
874 void _removeDeterminantsHandler(char *buffer){
875         RemoveDeterminantsHeader *header;
876         int index, phase;
877
878         // getting the header from the message
879         header = (RemoveDeterminantsHeader *)buffer;
880         index = header->index;
881         phase = header->phase;
882
883         // fprintf(stderr,"ACK index=%d\n",index);
884
885         // updating the number of buffered determinants
886         if(phase == _phaseBufferedDets){
887                 if(index > _indexBufferedDets)
888                         CkPrintf("phase: %d %d, index:%d %d\n",phase, _phaseBufferedDets, index, _indexBufferedDets);
889                 CmiAssert(index <= _indexBufferedDets);
890                 _numBufferedDets = _indexBufferedDets - index;
891         }
892
893         // releasing memory
894         CmiFree(buffer);
895
896 }
897
898 /**
899  * @brief Stores the determinants coming from other processor.
900  */
901 void _storeDeterminantsHandler(char *buffer){
902         StoreDeterminantsHeader *header;
903         Determinant *detPtr, det;
904         int i, n, index, phase, destPE;
905         CkVec<Determinant> *vec;
906
907         // getting the header from the message and pointing to the first determinant
908         header = (StoreDeterminantsHeader *)buffer;
909         n = header->number;
910         index = header->index;
911         phase = header->phase;
912         destPE = header->PE;
913         detPtr = (Determinant *)(buffer + sizeof(StoreDeterminantsHeader));
914
915         DEBUG(CkPrintf("[%d] Storing %d determinants\n",CkMyPe(),header->number));
916         DEBUG_MEM(CmiMemoryCheck());
917
918         // going through all determinants and storing them into _remoteDets
919         for(i = 0; i < n; i++){
920                 det.sender = detPtr->sender;
921                 det.receiver = detPtr->receiver;
922                 det.SN = detPtr->SN;
923                 det.TN = detPtr->TN;
924
925                 // getting the determinant array
926                 vec = CpvAccess(_remoteDets)->get(det.receiver);
927                 if(vec == NULL){
928                         vec = new CkVec<Determinant>();
929                         CpvAccess(_remoteDets)->put(det.receiver) = vec;
930                 }
931 #if COLLECT_STATS_DETS
932 #if COLLECT_STATS_DETS_DUP
933                 for(int j=0; j<vec->size(); j++){
934                         if(isSameDet(&(*vec)[j],&det)){
935                                 numDupDets++;
936                                 break;
937                         }
938                 }
939 #endif
940 #endif
941 #if COLLECT_STATS_MEMORY
942                 storedDetsSize++;
943 #endif
944                 vec->push_back(det);
945                 detPtr = detPtr++;
946         }
947
948         DEBUG_MEM(CmiMemoryCheck());
949
950         // freeing buffer
951         CmiFree(buffer);
952
953         // sending the ACK back to the sender
954         CpvAccess(_removeDetsHeader)->index = index;
955         CpvAccess(_removeDetsHeader)->phase = phase;
956         CmiSetHandler(CpvAccess(_removeDetsHeader),_removeDeterminantsHandlerIdx);
957         CmiSyncSend(destPE, sizeof(RemoveDeterminantsHeader), (char *)CpvAccess(_removeDetsHeader));
958         
959 }
960
961 /**
962  *  If there are any delayed requests, process them first before 
963  *  processing this request
964  * */
965 inline void _ticketRequestHandler(TicketRequest *ticketRequest){
966         DEBUG(printf("[%d] Ticket Request handler started \n",CkMyPe()));
967         double  _startTime = CkWallTimer();
968         CmiFree(ticketRequest);
969 //      traceUserBracketEvent(21,_startTime,CkWallTimer());                     
970 }
971
972
973 /**
974  * @brief Gets a ticket for a recently received message.
975  * @pre env->recver has to be on this processor.
976  * @return Returns true if ticket assignment is successful, otherwise returns false. A false result is due to the fact that we are recovering.
977  */
978 inline bool _getTicket(envelope *env, int *flag){
979         DEBUG_MEM(CmiMemoryCheck());
980         DEBUG(char recverName[100]);
981         DEBUG(char senderString[100]);
982         Ticket ticket;
983         
984         // getting information from request
985         CkObjID sender = env->sender;
986         CkObjID recver = env->recver;
987         MCount SN = env->SN;
988         MCount TN = env->TN;
989         Chare *recverObj = (Chare *)recver.getObject();
990         
991         DEBUG(recver.toString(recverName);)
992
993         // verifying whether we are recovering from a failure or not
994         if(recverObj->mlogData->restartFlag)
995                 return false;
996
997         // checking ticket table to see if this message already received a ticket
998         ticket = recverObj->mlogData->getTicket(sender, SN);
999         TN = ticket.TN;
1000         
1001         // checking if the message is team local and if it has a ticket already assigned
1002         if(teamSize > 1 && TN != 0){
1003                 DEBUG(CkPrintf("[%d] Message has a ticket already assigned\n",CkMyPe()));
1004                 recverObj->mlogData->verifyTicket(sender,SN,TN);
1005         }
1006
1007         //check if a ticket for this has been already handed out to an object that used to be local but 
1008         // is no longer so.. need for parallel restart
1009         if(TN == 0){
1010                 ticket = recverObj->mlogData->next_ticket(sender,SN);
1011                 *flag = NEW_TICKET;
1012         } else {
1013                 *flag = OLD_TICKET;
1014         }
1015         if(ticket.TN > recverObj->mlogData->tProcessed){
1016                 ticket.state = NEW_TICKET;
1017         } else {
1018                 ticket.state = OLD_TICKET;
1019         }
1020         // @todo: check for the case when an invalid ticket is returned
1021         if(ticket.TN == 0){
1022                 DEBUG(printf("[%d] Ticket request to %s SN %d from %s delayed mesg %p\n",CkMyPe(),recverName, SN,sender.toString(senderString),ticketRequest));
1023                 return false;
1024         }
1025                 
1026         // setting the ticket number in the envelope
1027         env->TN = ticket.TN;
1028         DEBUG(printf("[%d] TN %d handed out to %s SN %d by %s sent to PE %d mesg %p at %.6lf\n",CkMyPe(),ticket.TN,sender.toString(senderString),SN,recverName,ticketRequest->senderPE,ticketRequest,CmiWallTimer()));
1029         return true;
1030
1031 };
1032
1033 bool fault_aware(CkObjID &recver){
1034         switch(recver.type){
1035                 case TypeChare:
1036                         return true;
1037                 case TypeMainChare:
1038                         return false;
1039                 case TypeGroup:
1040                 case TypeNodeGroup:
1041                 case TypeArray:
1042                         return true;
1043                 default:
1044                         return false;
1045         }
1046 };
1047
1048 /* Preprocesses a received message */
/**
 * Decides whether a message that arrived for a local object may be delivered now.
 *
 * @param env              Received envelope. When 0 is returned, the envelope has been
 *                         consumed here: freed (old incarnation / already processed), or
 *                         queued in _delayedRemoteMessageQueue or _outOfOrderMessageQueue.
 * @param objPointer       Output: receiver object looked up from env->recver (may be NULL).
 * @param logEntryPointer  Not read or written by this function.
 * @return 1 if the caller should process the message now: determinant bypass flag set,
 *         receiver not fault aware, receiver object NULL, or env->TN == tProcessed+1.
 *         Otherwise 0.
 */
1049 int preProcessReceivedMessage(envelope *env, Chare **objPointer, MlogEntry **logEntryPointer){
1050         DEBUG_NOW(char recverString[100]);
1051         DEBUG_NOW(char senderString[100]);
1052         DEBUG_MEM(CmiMemoryCheck());
1053         int flag;
1054         bool ticketSuccess;
1055
1056         // getting the receiver object
1057         CkObjID recver = env->recver;
1058         Chare *obj = (Chare *)recver.getObject();
1059         *objPointer = obj;
1060
1061         // checking for determinants bypass in message logging
1062         if(env->flags & CK_BYPASS_DET_MLOG){
1063                 DEBUG(printf("[%d] Bypassing message sender %s recver %s \n",CkMyPe(),env->sender.toString(senderString), recver.toString(recverString)));
1064                 return 1;       
1065         }
1066
1067         // checking if receiver is fault aware
        // NOTE(review): this CkPrintf is not wrapped in a DEBUG macro, so it prints for every such message
1068         if(!fault_aware(recver)){
1069                 CkPrintf("[%d] Receiver NOT fault aware\n",CkMyPe());
1070                 return 1;
1071         }
1072
1073         // checking if receiver is NULL
        // (the forwarding / out-of-order handling below is commented out; NULL receivers are simply passed through)
1074         if(obj == NULL){
1075                 return 1;
1076
1077 /*              int possiblePE = recver.guessPE();
1078                 if(possiblePE != CkMyPe()){
1079                         int totalSize = env->getTotalsize();
1080                         CmiSyncSendAndFree(possiblePE,totalSize,(char *)env);
1081                         DEBUG_PE(0,printf("[%d] Forwarding message SN %d sender %s recver %s to %d\n",CkMyPe(),env->SN,env->sender.toString(senderString), recver.toString(recverString), possiblePE));
1082                 }else{
1083                         // this is the case where a message is received and the object has not been initialized
1084                         // we delayed the delivery of the message
1085                         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1086                         DEBUG_PE(0,printf("[%d] Message SN %d TN %d sender %s recver %s, receiver NOT found\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString)));
1087                 }
1088                 return 0;*/
1089
1090         }
1091
1092         // checking if message comes from an old incarnation (if so, message must be discarded)
1093         if(env->incarnation < CpvAccess(_incarnation)[env->getSrcPe()]){
1094                 CmiFree(env);
1095                 return 0;
1096         }
1097
1098         DEBUG_MEM(CmiMemoryCheck());
1099         DEBUG_PE(2,printf("[%d] Message received, sender = %s SN %d TN %d tProcessed %d for recver %s at %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1100
1101         // getting a ticket for this message
        // (on success _getTicket overwrites env->TN and sets flag to NEW_TICKET or OLD_TICKET)
1102         ticketSuccess = _getTicket(env,&flag);
1103
1104         // we might be during recovery when this message arrives
1105         if(!ticketSuccess){
1106                 
1107                 // adding the message to a delay queue
1108                 CqsEnqueue(CpvAccess(_delayedRemoteMessageQueue),env);
1109                 DEBUG(printf("[%d] Adding to delayed remote message queue\n",CkMyPe()));
1110
1111                 return 0;
1112         }
1113         
1114         //printf("[%d] ----------> SN = %d, TN = %d\n",CkMyPe(),env->SN,env->TN);
1115         //printf("[%d] ----------> numBufferedDeterminants = %d \n",CkMyPe(),_numBufferedDets);
1116         
        // only a freshly issued ticket produces a new determinant; an OLD_TICKET was recorded earlier
1117         if(flag == NEW_TICKET){
1118                 // storing determinant of message in data structure
1119                 addBufferedDeterminant(env->sender, env->recver, env->SN, env->TN);
1120         }
1121
1122         DEBUG_MEM(CmiMemoryCheck());
1123
        // NOTE(review): _startTime is only used by commented-out trace calls
1124         double _startTime = CkWallTimer();
1125 //env->sender.updatePosition(env->getSrcPe());
1126         if(env->TN == obj->mlogData->tProcessed+1){
1127                 //the message that needs to be processed now
1128                 DEBUG_PE(2,printf("[%d] Message SN %d TN %d sender %s recver %s being processed recvPointer %p\n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString), recver.toString(recverString),obj));
1129                 // once we find a message that we can process we put back all the messages in the out of order queue
1130                 // back into the main scheduler queue. 
                // (the out-of-order queue is a single per-PE queue shared by all objects; every queued
                //  message is re-enqueued FIFO and presumably re-examined here when dequeued -- TODO confirm)
1131         DEBUG_MEM(CmiMemoryCheck());
1132                 while(!CqsEmpty(CpvAccess(_outOfOrderMessageQueue))){
1133                         void *qMsgPtr;
1134                         CqsDequeue(CpvAccess(_outOfOrderMessageQueue),&qMsgPtr);
1135                         envelope *qEnv = (envelope *)qMsgPtr;
1136                         CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
1137         DEBUG_MEM(CmiMemoryCheck());
1138                 }
1139 //              traceUserBracketEvent(25,_startTime,CkWallTimer());
1140                 //TODO: this might be a problem.. change made for leanMD
1141 //              CpvAccess(_currentObj) = obj;
1142         DEBUG_MEM(CmiMemoryCheck());
1143                 return 1;
1144         }
1145
1146         // checking if message has already been processed
1147         // message must be discarded
1148         if(env->TN <= obj->mlogData->tProcessed){
1149                 DEBUG_PE(3,printf("[%d] Message SN %d TN %d sender %s for recver %s being ignored tProcessed %d \n",CkMyPe(),env->SN,env->TN,env->sender.toString(senderString),recver.toString(recverString),obj->mlogData->tProcessed));
1150                 
1151                 CmiFree(env);
1152                 return 0;
1153         }
1154         //message that needs to be processed in the future
1155
1156         DEBUG_PE(3,printf("[%d] Early Message sender = %s SN %d TN %d tProcessed %d for recver %s stored for future time %.6lf \n",CkMyPe(),env->sender.toString(senderString),env->SN,env->TN,obj->mlogData->tProcessed, recver.toString(recverString),CkWallTimer()));
1157         //the message cant be processed now put it back in the out of order message Q, 
1158         //It will be transferred to the main queue later
1159         CqsEnqueue(CpvAccess(_outOfOrderMessageQueue),env);
1160 //              traceUserBracketEvent(27,_startTime,CkWallTimer());
1161         DEBUG_MEM(CmiMemoryCheck());
1162         
1163         return 0;
1164 }
1165
1166 /**
1167  * @brief Updates a few variables once a message has been processed.
1168  */
1169 void postProcessReceivedMessage(Chare *obj, CkObjID &sender, MCount SN, MlogEntry *entry){
1170         DEBUG(char senderString[100]);
1171         if(obj){
1172                 if(sender.guessPE() == CkMyPe()){
1173                         if(entry != NULL){
1174                                 entry->env = NULL;
1175                         }
1176                 }
1177                 obj->mlogData->tProcessed++;
1178 /*              DEBUG(int qLength = CqsLength((Queue )CpvAccess(CsdSchedQueue)));               
1179                 DEBUG(printf("[%d] Message SN %d %s has been processed  tProcessed %d scheduler queue length %d\n",CkMyPe(),SN,obj->mlogData->objID.toString(senderString),obj->mlogData->tProcessed,qLength));         */
1180 //              CpvAccess(_currentObj)= NULL;
1181         }
1182         DEBUG_MEM(CmiMemoryCheck());
1183 }
1184
1185 /***
1186         Helpers for the handlers and message logging methods
1187 ***/
1188
1189 void generalCldEnqueue(int destPE, envelope *env, int _infoIdx){
1190 //      double _startTime = CkWallTimer();
1191         if(env->recver.type != TypeNodeGroup){
1192         //This repeats a step performed in skipCldEnq for messages sent to
1193         //other processors. I do this here so that messages to local processors
1194         //undergo the same transformation.. It lets the resend be uniform for 
1195         //all messages
1196 //              CmiSetXHandler(env,CmiGetHandler(env));
1197                 _skipCldEnqueue(destPE,env,_infoIdx);
1198         }else{
1199                 _noCldNodeEnqueue(destPE,env);
1200         }
1201 //      traceUserBracketEvent(22,_startTime,CkWallTimer());
1202 }
1203 //extern "C" int CmiGetNonLocalLength();
1204 /** This method is used to retry the ticket requests
1205  * that had been queued up earlier
1206  * */
1207
// NOTE(review): the comment above describes a retry method that is not defined here.
// This counter is not referenced anywhere in the visible code; it may be a leftover.
// Confirm before removing.
1208 int calledRetryTicketRequest=0;
1209
1210 void _pingHandler(CkPingMsg *msg){
1211         printf("[%d] Received Ping from %d\n",CkMyPe(),msg->PE);
1212         CmiFree(msg);
1213 }
1214
1215
1216 /*****************************************************************************
1217         Checkpointing methods..
1218                 Pack all the data on a processor and send it to the buddy periodically
1219                 Also used to throw away message logs
1220 *****************************************************************************/
// (object, tProcessed) pairs for every chare on this PE. startMlogCheckpoint() rebuilds the
// list through buildProcessedTicketLog(); sendRemoveLogRequests() reads it.
1221 CkVec<TProcessedLog> processedTicketLog;
1222 void buildProcessedTicketLog(void *data,ChareMlogData *mlogData);
1223 void clearUpMigratedRetainedLists(int PE);
1224
/**
 * Periodic checkpoint timer callback (re-armed through CcdCallFnAfter).
 *
 * If less than (chkptPeriod - 2) has elapsed since lastCompletedAlarm, the callback
 * re-arms itself for the remaining time. Otherwise it broadcasts a CheckpointRequest to
 * all PEs (including this one); _checkpointRequestHandler then starts a checkpoint on each.
 *
 * NOTE(review): the units look inconsistent. diff is a wall-clock difference, compared
 * against chkptPeriod and re-armed with (chkptPeriod-diff)*1000, which treats chkptPeriod
 * as seconds. startMlogCheckpoint re-arms with plain chkptPeriod (a millisecond argument
 * to CcdCallFnAfter). Confirm the unit of chkptPeriod.
 *
 * @param _dummy       Unused (timer callback signature).
 * @param curWallTime  Current wall time supplied by the timer.
 */
1225 void checkpointAlarm(void *_dummy,double curWallTime){
1226         double diff = curWallTime-lastCompletedAlarm;
1227         DEBUG(printf("[%d] calling for checkpoint %.6lf after last one\n",CkMyPe(),diff));
1228 /*      if(CkWallTimer()-lastRestart < 50){
1229                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1230                 return;
1231         }*/
        // too early: come back when the period has (almost) elapsed
1232         if(diff < ((chkptPeriod) - 2)){
1233                 CcdCallFnAfter(checkpointAlarm,NULL,(chkptPeriod-diff)*1000);
1234                 return;
1235         }
        // the request lives on the stack; the broadcast sends copies of it
1236         CheckpointRequest request;
1237         request.PE = CkMyPe();
1238         CmiSetHandler(&request,_checkpointRequestHandlerIdx);
1239         CmiSyncBroadcastAll(sizeof(CheckpointRequest),(char *)&request);
1240 };
1241
1242 void _checkpointRequestHandler(CheckpointRequest *request){
1243         startMlogCheckpoint(NULL,CmiWallTimer());
1244 }
1245
1246 /**
1247  * @brief Starts the checkpoint phase after migration.
 *
 * Also called from _checkpointRequestHandler for periodic checkpoints.
 *
 * Sizes, then packs, this PE's checkpoint into one CmiAlloc'd CheckPointDataMsg.
 * The checkpoint holds checkpointCount, the _incarnation table, readonly data, group
 * and nodegroup data, and array elements. The message is sent with CmiSyncSendAndFree
 * to getCheckPointPE(). The function then rebuilds processedTicketLog with the
 * tProcessed of every local chare.
 * On PE 0, when no load balancing is in progress, it records curWallTime in
 * lastCompletedAlarm and re-arms checkpointAlarm.
 *
 * @param _dummy       Unused (callback signature).
 * @param curWallTime  Wall time stored in lastCompletedAlarm on PE 0.
1248  */
1249 void startMlogCheckpoint(void *_dummy, double curWallTime){
1250         double _startTime = CkWallTimer();
1251
1252         // increasing the checkpoint counter
1253         checkpointCount++;
1254         
1255 #if DEBUG_CHECKPOINT
1256         if(CmiMyPe() == 0){
1257                 printf("[%d] starting checkpoint at %.6lf CmiTimer %.6lf \n",CkMyPe(),CmiWallTimer(),CmiTimer());
1258         }
1259 #endif
1260
1261         DEBUG_MEM(CmiMemoryCheck());
1262
        // sizing pass: must pup exactly the same fields, in the same order, as the packing
        // pass below, otherwise dataSize would not match the packed data
1263         PUP::sizer psizer;
1264         psizer | checkpointCount;
1265         for(int i=0; i<CmiNumPes(); i++){
1266                 psizer | CpvAccess(_incarnation)[i];
1267         }
1268         CkPupROData(psizer);
1269         DEBUG_MEM(CmiMemoryCheck());
1270         CkPupGroupData(psizer,CmiTrue);
1271         DEBUG_MEM(CmiMemoryCheck());
1272         CkPupNodeGroupData(psizer,CmiTrue);
1273         DEBUG_MEM(CmiMemoryCheck());
1274         pupArrayElementsSkip(psizer,CmiTrue,NULL);
1275         DEBUG_MEM(CmiMemoryCheck());
1276
1277         int dataSize = psizer.size();
1278         int totalSize = sizeof(CheckPointDataMsg)+dataSize;
1279         char *msg = (char *)CmiAlloc(totalSize);
1280         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1281         chkMsg->PE = CkMyPe();
1282         chkMsg->dataSize = dataSize;
1283         
        // the checkpoint payload follows the CheckPointDataMsg header in the same buffer
1284         char *buf = &msg[sizeof(CheckPointDataMsg)];
1285         PUP::toMem pBuf(buf);
1286
1287         pBuf | checkpointCount;
1288         for(int i=0; i<CmiNumPes(); i++){
1289                 pBuf | CpvAccess(_incarnation)[i];
1290         }
1291         CkPupROData(pBuf);
1292         CkPupGroupData(pBuf,CmiTrue);
1293         CkPupNodeGroupData(pBuf,CmiTrue);
1294         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
1295
        // presumably cleared when the buddy's CheckPointAck arrives -- TODO confirm
1296         unAckedCheckpoint=1;
1297         CmiSetHandler(msg,_storeCheckpointHandlerIdx);
1298         CmiSyncSendAndFree(getCheckPointPE(),totalSize,msg);
1299         
1300         /*
1301                 Store the highest Ticket number processed for each chare on this processor
1302         */
1303         processedTicketLog.removeAll();
1304         forAllCharesDo(buildProcessedTicketLog,(void *)&processedTicketLog);
1305
1306 #if DEBUG_CHECKPOINT
1307         if(CmiMyPe() == 0){
1308                 printf("[%d] finishing checkpoint at %.6lf CmiTimer %.6lf with dataSize %d\n",CkMyPe(),CmiWallTimer(),CmiTimer(),dataSize);
1309         }
1310 #endif
1311
1312 #if COLLECT_STATS_MEMORY
1313         CkPrintf("[%d] CKP=%d BUF_DET=%d STO_DET=%d MSG_LOG=%d\n",CkMyPe(),totalSize,bufferedDetsSize*sizeof(Determinant),storedDetsSize*sizeof(Determinant),msgLogSize);
1314         msgLogSize = 0;
1315         bufferedDetsSize = 0;
1316         storedDetsSize = 0;
1317 #endif
1318
        // only PE 0 drives the periodic checkpoint timer
1319         if(CkMyPe() ==  0 && onGoingLoadBalancing==0 ){
1320                 lastCompletedAlarm = curWallTime;
1321                 CcdCallFnAfter(checkpointAlarm,NULL,chkptPeriod);
1322         }
1323         traceUserBracketEvent(28,_startTime,CkWallTimer());
1324 };
1325
1326 /**
1327  * @brief A chare adds the latest ticket number processed.
1328  */
1329 void buildProcessedTicketLog(void *data, ChareMlogData *mlogData){
1330         DEBUG(char objString[100]);
1331
1332         CkVec<TProcessedLog> *log = (CkVec<TProcessedLog> *)data;
1333         TProcessedLog logEntry;
1334         logEntry.recver = mlogData->objID;
1335         logEntry.tProcessed = mlogData->tProcessed;
1336         log->push_back(logEntry);
1337
1338         DEBUG(printf("[%d] Tickets lower than %d to be thrown away for %s \n",CkMyPe(),logEntry.tProcessed,logEntry.recver.toString(objString)));
1339 }
1340
1341 class ElementPacker : public CkLocIterator {
1342 private:
1343         CkLocMgr *locMgr;
1344         PUP::er &p;
1345 public:
1346         ElementPacker(CkLocMgr* mgr_, PUP::er &p_):locMgr(mgr_),p(p_){};
1347         void addLocation(CkLocation &loc) {
1348                 CkArrayIndexMax idx=loc.getIndex();
1349                 CkGroupID gID = locMgr->ckGetGroupID();
1350                 p|gID;      // store loc mgr's GID as well for easier restore
1351                 p|idx;
1352                 p|loc;
1353         }
1354 };
1355
1356 /**
1357  * Pups all the array elements in this processor.
 *
 * Sizing/packing: writes the element count, then one ElementPacker record
 * (locMgr GID, index, location) per local array element.
 * Unpacking: reads the same records and calls CkLocMgr::resume() for each one. An
 * element is resumed with flag 1 when it matches a listToSkip entry with
 * ackFrom == 0 && ackTo == 1. For such an element, informLocationHome() is then
 * called with that entry's toPE.
 *
 * @param p           PUP::er (sizer, packer or unpacker).
 * @param create      Forwarded to CkLocMgr::resume().
 * @param listToSkip  Migration records, consulted only when unpacking. Callers in this
 *                    file pass NULL with listsize defaulted (presumably 0 -- confirm in the header).
 * @param listsize    Number of entries in listToSkip.
1358  */
1359 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listsize){
        // NOTE(review): numGroups and the outer i are unused (the loop below declares its own i)
1360         int numElements,i;
1361         int numGroups = CkpvAccess(_groupIDTable)->size();      
1362         if(!p.isUnpacking()){
1363                 numElements = CkCountArrayElements();
1364         }       
1365         p | numElements;
1366         DEBUG(printf("[%d] Number of arrayElements %d \n",CkMyPe(),numElements));
1367         if(!p.isUnpacking()){
1368                 CKLOCMGR_LOOP(ElementPacker packer(mgr, p); mgr->iterate(packer););
1369         }else{
1370                 //Flush all recs of all LocMgrs before putting in new elements
1371 //              CKLOCMGR_LOOP(mgr->flushAllRecs(););
                // NOTE(review): the printf/CkPrintf calls in this branch are unconditional (not DEBUG-wrapped)
1372                 for(int j=0;j<listsize;j++){
1373                         if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1374                                 printf("[%d] Array element to be skipped gid %d idx",CmiMyPe(),listToSkip[j].gID.idx);
1375                                 listToSkip[j].idx.print();
1376                         }
1377                 }
1378                 
1379                 printf("numElements = %d\n",numElements);
1380         
1381                 for (int i=0; i<numElements; i++) {
1382                         CkGroupID gID;
1383                         CkArrayIndexMax idx;
1384                         p|gID;
1385                 p|idx;
                        // linear search of listToSkip for this (gID, idx)
1386                         int flag=0;
1387                         int matchedIdx=0;
1388                         for(int j=0;j<listsize;j++){
1389                                 if(listToSkip[j].ackFrom == 0 && listToSkip[j].ackTo == 1){
1390                                         if(listToSkip[j].gID == gID && listToSkip[j].idx == idx){
1391                                                 matchedIdx = j;
1392                                                 flag = 1;
1393                                                 break;
1394                                         }
1395                                 }
1396                         }
1397                         if(flag == 1){
1398                                 printf("[%d] Array element being skipped gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1399                         }else{
1400                                 printf("[%d] Array element being recovered gid %d idx %s\n",CmiMyPe(),gID.idx,idx2str(idx));
1401                         }
1402                                 
                        // resume() is called for skipped elements too (with flag 1); presumably it
                        // still consumes the element's data from p -- TODO confirm
1403                         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
1404                         CkPrintf("numLocalElements = %d\n",mgr->numLocalElements());
1405                         mgr->resume(idx,p,create,flag);
1406                         if(flag == 1){
1407                                 int homePE = mgr->homePe(idx);
1408                                 informLocationHome(gID,idx,homePE,listToSkip[matchedIdx].toPE);
1409                         }
1410                 }
1411         }
1412 };
1413
1414
1415 void writeCheckpointToDisk(int size,char *chkpt){
1416         char fNameTemp[100];
1417         sprintf(fNameTemp,"%s/mlogCheckpoint%d_tmp",checkpointDirectory,CkMyPe());
1418         int fd = creat(fNameTemp,S_IRWXU);
1419         int ret = write(fd,chkpt,size);
1420         CkAssert(ret == size);
1421         close(fd);
1422         
1423         char fName[100];
1424         sprintf(fName,"%s/mlogCheckpoint%d",checkpointDirectory,CkMyPe());
1425         unlink(fName);
1426
1427         rename(fNameTemp,fName);
1428 }
1429
1430 //handler that receives the checkpoint from a processor
1431 //it stores it and acks it
// The incoming message is kept alive: _storedCheckpointData->buf points into it, just past
// the CheckPointDataMsg header. The previously stored message is freed by stepping back
// from the old buf pointer. migratedNoticeList entries from the sending PE get
// ackFrom = 1; entries with both ackFrom and ackTo set are removed. Finally a
// CheckPointAck (this PE, stored size) is sent back to the sender.
1432 void _storeCheckpointHandler(char *msg){
1433         double _startTime=CkWallTimer();
1434                 
1435         CheckPointDataMsg *chkMsg = (CheckPointDataMsg *)msg;
1436         DEBUG(printf("[%d] Checkpoint Data from %d stored with datasize %d\n",CkMyPe(),chkMsg->PE,chkMsg->dataSize);)
1437         
1438         char *chkpt = &msg[sizeof(CheckPointDataMsg)];  
1439         
1440         char *oldChkpt =        CpvAccess(_storedCheckpointData)->buf;
1441         if(oldChkpt != NULL){
1442                 char *oldmsg = oldChkpt - sizeof(CheckPointDataMsg);
1443                 CmiFree(oldmsg);
1444         }
        // NOTE(review): the comment below does not match the code, which does store the checkpoint
1445         //turning off storing checkpoints
1446         
1447         int sendingPE = chkMsg->PE;
1448         
1449         CpvAccess(_storedCheckpointData)->buf = chkpt;
1450         CpvAccess(_storedCheckpointData)->bufSize = chkMsg->dataSize;
1451         CpvAccess(_storedCheckpointData)->PE = sendingPE;
1452
        // iterate backwards so remove(j) does not shift entries not yet visited
1453         int count=0;
1454         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1455                 if(migratedNoticeList[j].fromPE == sendingPE){
1456                         migratedNoticeList[j].ackFrom = 1;
1457                 }else{
                        // NOTE(review): asserting a string literal is always true, so this never fires;
                        // presumably CmiAssert(0 && "...") was intended -- confirm before changing
1458                         CmiAssert("migratedNoticeList entry for processor other than buddy");
1459                 }
1460                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1461                         migratedNoticeList.remove(j);
1462                         count++;
1463                 }
1464                 
1465         }
1466         DEBUG(printf("[%d] For proc %d from number of migratedNoticeList cleared %d checkpointAckHandler %d\n",CmiMyPe(),sendingPE,count,_checkpointAckHandlerIdx));
1467         
1468         CheckPointAck ackMsg;
1469         ackMsg.PE = CkMyPe();
1470         ackMsg.dataSize = CpvAccess(_storedCheckpointData)->bufSize;
1471         CmiSetHandler(&ackMsg,_checkpointAckHandlerIdx);
1472         CmiSyncSend(sendingPE,sizeof(CheckPointAck),(char *)&ackMsg);
1473         
1474         traceUserBracketEvent(29,_startTime,CkWallTimer());
1475 };
1476
1477
1478 /**
1479  * @brief Sends out the messages asking senders to throw away message logs below a certain ticket number.       
1480  * @note The remove log request message looks like
1481                 |RemoveLogRequest||List of TProcessedLog||Number of Determinants||List of Determinants|
1482  */
1483 void sendRemoveLogRequests(){
1484 #if SYNCHRONIZED_CHECKPOINT
1485         CmiAbort("Remove log requests should not be sent in a synchronized checkpoint");
1486 #endif
1487         double _startTime = CkWallTimer();      
1488
1489         // computing total message size
1490         int totalSize = sizeof(RemoveLogRequest) + processedTicketLog.size()*sizeof(TProcessedLog) + sizeof(int) + sizeof(Determinant);
1491         char *requestMsg = (char *)CmiAlloc(totalSize);
1492
1493         // filling up the message
1494         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1495         request->PE = CkMyPe();
1496         request->numberObjects = processedTicketLog.size();
1497         char *listProcessedLogs = &requestMsg[sizeof(RemoveLogRequest)];
1498         memcpy(listProcessedLogs,(char *)processedTicketLog.getVec(),processedTicketLog.size()*sizeof(TProcessedLog));
1499         char *listDeterminants = &listProcessedLogs[processedTicketLog.size()*sizeof(TProcessedLog)];
1500         int *numDeterminants = (int *)listDeterminants;
1501         numDeterminants[0] = 0; 
1502         listDeterminants = (char *)&numDeterminants[1];
1503
1504         // setting the handler in the message
1505         CmiSetHandler(requestMsg,_removeProcessedLogHandlerIdx);
1506         
1507         DEBUG_MEM(CmiMemoryCheck());
1508         for(int i=0;i<CkNumPes();i++){
1509                 CmiSyncSend(i,totalSize,requestMsg);
1510         }
1511         CmiFree(requestMsg);
1512
1513         clearUpMigratedRetainedLists(CmiMyPe());
1514         //TODO: clear ticketTable
1515         
1516         traceUserBracketEvent(30,_startTime,CkWallTimer());
1517
1518         DEBUG_MEM(CmiMemoryCheck());
1519 }
1520
1521
1522 void _checkpointAckHandler(CheckPointAck *ackMsg){
1523         DEBUG_MEM(CmiMemoryCheck());
1524         unAckedCheckpoint=0;
1525         DEBUGLB(printf("[%d] CheckPoint Acked from PE %d with size %d onGoingLoadBalancing %d \n",CkMyPe(),ackMsg->PE,ackMsg->dataSize,onGoingLoadBalancing));
1526         DEBUGLB(CkPrintf("[%d] ACK HANDLER with %d\n",CkMyPe(),onGoingLoadBalancing));  
1527         if(onGoingLoadBalancing){
1528                 onGoingLoadBalancing = 0;
1529                 finishedCheckpointLoadBalancing();
1530         }else{
1531                 sendRemoveLogRequests();
1532         }
1533         CmiFree(ackMsg);
1534         
1535 };
1536
1537
1538 /**
1539  * @brief Inserts all the determinants into a hash table.
1540  */
1541 inline void populateDeterminantTable(char *data){
1542         DEBUG(char recverString[100]);
1543         DEBUG(char senderString[100]);
1544         int numDets, *numDetsPtr;
1545         Determinant *detList;
1546         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1547         SNToTicket *tickets;
1548         Ticket ticket;
1549
1550         RemoveLogRequest *request = (RemoveLogRequest *)data;
1551         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1552         
1553         numDetsPtr = (int *)&list[request->numberObjects];
1554         numDets = numDetsPtr[0];
1555         detList = (Determinant *)&numDetsPtr[1];
1556
1557         // inserting determinants into hashtable
1558         for(int i=0; i<numDets; i++){
1559                 table = detTable.get(detList[i].sender);
1560                 if(table == NULL){
1561                         table = new CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *>();
1562                         detTable.put(detList[i].sender) = table;
1563                 }
1564                 tickets = table->get(detList[i].receiver);
1565                 if(tickets == NULL){
1566                         tickets = new SNToTicket();
1567                         table->put(detList[i].receiver) = tickets; 
1568                 }
1569                 ticket.TN = detList[i].TN;
1570                 tickets->put(detList[i].SN) = ticket;
1571         }
1572
1573         DEBUG_MEM(CmiMemoryCheck());    
1574
1575 }
1576
1577 void removeProcessedLogs(void *_data,ChareMlogData *mlogData){
1578         int total;
1579         DEBUG(char nameString[100]);
1580         DEBUG_MEM(CmiMemoryCheck());
1581         CmiMemoryCheck();
1582         char *data = (char *)_data;
1583         RemoveLogRequest *request = (RemoveLogRequest *)data;
1584         TProcessedLog *list = (TProcessedLog *)(&data[sizeof(RemoveLogRequest)]);
1585         CkQ<MlogEntry *> *mlog = mlogData->getMlog();
1586         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> *table;
1587         SNToTicket *tickets;
1588         MCount TN;
1589
1590         int count=0;
1591         total = mlog->length();
1592         for(int i=0; i<total; i++){
1593                 MlogEntry *logEntry = mlog->deq();
1594
1595                 // looking message in determinant table
1596                 table = detTable.get(logEntry->env->sender);
1597                 if(table != NULL){
1598                         tickets = table->get(logEntry->env->recver);
1599                         if(tickets != NULL){
1600                                 TN = tickets->get(logEntry->env->SN).TN;
1601                                 if(TN != 0){
1602                                         logEntry->env->TN = TN;
1603                                 }
1604                         }
1605                 }
1606         
1607                 int match=0;
1608                 for(int j=0;j<request->numberObjects;j++){
1609                         if(logEntry->env == NULL || (logEntry->env->recver == list[j].recver && logEntry->env->TN > 0 && logEntry->env->TN < list[j].tProcessed)){
1610                                 //this log Entry should be removed
1611                                 match = 1;
1612                                 break;
1613                         }
1614                 }
1615                 char senderString[100],recverString[100];
1616 //              DEBUG(CkPrintf("[%d] Message sender %s recver %s TN %d removed %d PE %d\n",CkMyPe(),logEntry->env->sender.toString(senderString),logEntry->env->recver.toString(recverString),logEntry->env->TN,match,request->PE));
1617                 if(match){
1618                         count++;
1619                         delete logEntry;
1620                 }else{
1621                         mlog->enq(logEntry);
1622                 }
1623         }
1624         if(count > 0){
1625                 DEBUG(printf("[%d] Removed %d processed Logs for %s\n",CkMyPe(),count,mlogData->objID.toString(nameString)));
1626         }
1627         DEBUG_MEM(CmiMemoryCheck());
1628         CmiMemoryCheck();
1629 }
1630
1631 /**
1632  * @brief Removes messages in the log according to the received ticket numbers
1633  */
1634 void _removeProcessedLogHandler(char *requestMsg){
1635         double start = CkWallTimer();
1636
1637         // building map for determinants
1638         populateDeterminantTable(requestMsg);
1639
1640         // removing processed messages from the message log
1641         forAllCharesDo(removeProcessedLogs,requestMsg);
1642
1643         // @todo: clean determinant table 
1644         
1645         // printf("[%d] Removing Processed logs took %.6lf \n",CkMyPe(),CkWallTimer()-start);
1646         RemoveLogRequest *request = (RemoveLogRequest *)requestMsg;
1647         DEBUG(printf("[%d] Removing Processed logs for proc %d took %.6lf \n",CkMyPe(),request->PE,CkWallTimer()-start));
1648         //this assumes the buddy relationship between processors is symmetric. TODO:remove this assumption later
1649         /*
1650                 Clear up the retainedObjectList and the migratedNoticeList that were created during load balancing
1651         */
1652         DEBUG_MEM(CmiMemoryCheck());
1653         clearUpMigratedRetainedLists(request->PE);
1654         
1655         traceUserBracketEvent(20,start,CkWallTimer());
1656         CmiFree(requestMsg);    
1657 };
1658
1659
1660 void clearUpMigratedRetainedLists(int PE){
1661         int count=0;
1662         CmiMemoryCheck();
1663         
1664         for(int j=migratedNoticeList.size()-1;j>=0;j--){
1665                 if(migratedNoticeList[j].toPE == PE){
1666                         migratedNoticeList[j].ackTo = 1;
1667                 }
1668                 if(migratedNoticeList[j].ackFrom == 1 && migratedNoticeList[j].ackTo == 1){
1669                         migratedNoticeList.remove(j);
1670                         count++;
1671                 }
1672         }
1673         DEBUG(printf("[%d] For proc %d to number of migratedNoticeList cleared %d \n",CmiMyPe(),PE,count));
1674         
1675         for(int j=retainedObjectList.size()-1;j>=0;j--){
1676                 if(retainedObjectList[j]->migRecord.toPE == PE){
1677                         RetainedMigratedObject *obj = retainedObjectList[j];
1678                         DEBUG(printf("[%d] Clearing retainedObjectList %d to PE %d obj %p msg %p\n",CmiMyPe(),j,PE,obj,obj->msg));
1679                         retainedObjectList.remove(j);
1680                         if(obj->msg != NULL){
1681                                 CmiMemoryCheck();
1682                                 CmiFree(obj->msg);
1683                         }
1684                         delete obj;
1685                 }
1686         }
1687 }
1688
1689 /***************************************************************
1690         Restart Methods and handlers
1691 ***************************************************************/        
1692
1693 /**
1694  * Function for restarting the crashed processor.
1695  * It sets the restart flag and contacts the buddy
1696  * processor to get the latest checkpoint.
1697  */
1698 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg){
1699         RestartRequest msg;
1700
1701         fprintf(stderr,"[%d] Restart started at %.6lf \n",CkMyPe(),CmiWallTimer());
1702
1703         // setting the restart flag
1704         _restartFlag = 1;
1705         _numRestartResponses = 0;
1706
1707         // if we are using team-based message logging, all members of the group have to be restarted
1708         if(teamSize > 1){
1709                 for(int i=(CkMyPe()/teamSize)*teamSize; i<((CkMyPe()/teamSize)+1)*teamSize; i++){
1710                         if(i != CkMyPe() && i < CkNumPes()){
1711                                 // sending a message to the team member
1712                                 msg.PE = CkMyPe();
1713                             CmiSetHandler(&msg,_restartHandlerIdx);
1714                             CmiSyncSend(i,sizeof(RestartRequest),(char *)&msg);
1715                         }
1716                 }
1717         }
1718
1719         // requesting the latest checkpoint from its buddy
1720         msg.PE = CkMyPe();
1721         CmiSetHandler(&msg,_getCheckpointHandlerIdx);
1722         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1723 };
1724
1725 /**
1726  * Function to restart this processor.
1727  * The handler is invoked by a member of its same team in message logging.
1728  */
1729 void _restartHandler(RestartRequest *restartMsg){
1730         int i;
1731         int numGroups = CkpvAccess(_groupIDTable)->size();
1732         RestartRequest msg;
1733         
1734         fprintf(stderr,"[%d] Restart-team started at %.6lf \n",CkMyPe(),CmiWallTimer());
1735
1736     // setting the restart flag
1737         _restartFlag = 1;
1738         _numRestartResponses = 0;
1739
1740         // flushing all buffers
1741         //TEST END
1742 /*      CkPrintf("[%d] HERE numGroups = %d\n",CkMyPe(),numGroups);
1743         CKLOCMGR_LOOP(mgr->flushAllRecs(););    
1744         for(int i=0;i<numGroups;i++){
1745         CkGroupID gID = (*CkpvAccess(_groupIDTable))[i];
1746                 IrrGroup *obj = CkpvAccess(_groupTable)->find(gID).getObj();
1747                 obj->flushStates();
1748                 obj->ckJustMigrated();
1749         }*/
1750
1751     // requesting the latest checkpoint from its buddy
1752         msg.PE = CkMyPe();
1753         CmiSetHandler(&msg,_getRestartCheckpointHandlerIdx);
1754         CmiSyncSend(getCheckPointPE(),sizeof(RestartRequest),(char *)&msg);
1755 }
1756
1757
1758 /**
1759  * Gets the stored checkpoint but calls another function in the sender.
1760  */
1761 void _getRestartCheckpointHandler(RestartRequest *restartMsg){
1762
1763         // retrieving the stored checkpoint
1764         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1765
1766         // making sure it is its buddy who is requesting the checkpoint
1767         CkAssert(restartMsg->PE == storedChkpt->PE);
1768
1769         storedRequest = restartMsg;
1770         verifyAckTotal = 0;
1771
1772         for(int i=0;i<migratedNoticeList.size();i++){
1773                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1774 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1775                         if(migratedNoticeList[i].ackFrom == 0){
1776                                 //need to verify if the object actually exists .. it might not
1777                                 //have been acked but it might exist on it
1778                                 VerifyAckMsg msg;
1779                                 msg.migRecord = migratedNoticeList[i];
1780                                 msg.index = i;
1781                                 msg.fromPE = CmiMyPe();
1782                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1783                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1784                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1785                                 verifyAckTotal++;
1786                         }
1787                 }
1788         }
1789
1790         // sending the checkpoint back to its buddy     
1791         if(verifyAckTotal == 0){
1792                 sendCheckpointData(MLOG_RESTARTED);
1793         }
1794         verifyAckCount = 0;
1795 }
1796
1797 /**
1798  * Receives the checkpoint coming from its buddy. This is the case of restart for one team member that did not crash.
1799  */
1800 void _recvRestartCheckpointHandler(char *_restartData){
1801         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
1802         MigrationRecord *migratedAwayElements;
1803
1804         globalLBID = restartData->lbGroupID;
1805         
1806         restartData->restartWallTime *= 1000;
1807         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
1808         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
1809         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
1810
1811         
1812         printf("[%d] Team Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
1813         char *buf = &_restartData[sizeof(RestartProcessorData)];
1814         
1815         if(restartData->numMigratedAwayElements != 0){
1816                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
1817                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
1818                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
1819                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
1820         }
1821
1822         // turning on the team recovery flag
1823         forAllCharesDo(setTeamRecovery,NULL);
1824         
1825         PUP::fromMem pBuf(buf);
1826         pBuf | checkpointCount;
1827         for(int i=0; i<CmiNumPes(); i++){
1828                 pBuf | CpvAccess(_incarnation)[i];
1829         }
1830         CkPupROData(pBuf);
1831         CkPupGroupData(pBuf,CmiFalse);
1832         CkPupNodeGroupData(pBuf,CmiFalse);
1833         pupArrayElementsSkip(pBuf,CmiFalse,NULL);
1834         CkAssert(pBuf.size() == restartData->checkPointSize);
1835         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
1836
1837         // increasing the incarnation number of all the team members
1838         for(int i=(CmiMyPe()/teamSize)*teamSize; i<(CmiMyPe()/teamSize+1)*(teamSize); i++){
1839                 CpvAccess(_incarnation)[i]++;
1840         }
1841         
1842         // turning off the team recovery flag
1843         forAllCharesDo(unsetTeamRecovery,NULL);
1844
1845         // initializing a few variables for handling local messages
1846         forAllCharesDo(initializeRestart,NULL);
1847         
1848         CmiFree(_restartData);  
1849
1850         /*HERE _initDone();
1851
1852         getGlobalStep(globalLBID);
1853         
1854         countUpdateHomeAcks = 0;
1855         RestartRequest updateHomeRequest;
1856         updateHomeRequest.PE = CmiMyPe();
1857         CmiSetHandler (&updateHomeRequest,_updateHomeRequestHandlerIdx);
1858         for(int i=0;i<CmiNumPes();i++){
1859                 if(i != CmiMyPe()){
1860                         CmiSyncSend(i,sizeof(RestartRequest),(char *)&updateHomeRequest);
1861                 }
1862         }
1863 */
1864
1865
1866         // Send out the request to resend logged messages to all other processors
1867         CkVec<TProcessedLog> objectVec;
1868         forAllCharesDo(createObjIDList, (void *)&objectVec);
1869         int numberObjects = objectVec.size();
1870         
1871         /*
1872                 resendMsg layout |ResendRequest|Array of TProcessedLog|
1873         */
1874         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
1875         char *resendMsg = (char *)CmiAlloc(totalSize);  
1876
1877         ResendRequest *resendReq = (ResendRequest *)resendMsg;
1878         resendReq->PE =CkMyPe(); 
1879         resendReq->numberObjects = numberObjects;
1880         char *objList = &resendMsg[sizeof(ResendRequest)];
1881         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog));
1882         
1883
1884         /* test for parallel restart migrate away object**/
1885 //      if(fastRecovery){
1886 //              distributeRestartedObjects();
1887 //              printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
1888 //      }
1889         
1890         /*      To make restart work for load balancing.. should only
1891         be used when checkpoint happens along with load balancing
1892         **/
1893 //      forAllCharesDo(resumeFromSyncRestart,NULL);
1894
1895         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
1896         CpvAccess(_currentObj) = lb;
1897         lb->ReceiveDummyMigration(restartDecisionNumber);
1898
1899         sleep(10);
1900         
1901         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
1902         for(int i=0;i<CkNumPes();i++){
1903                 if(i != CkMyPe()){
1904                         CmiSyncSend(i,totalSize,resendMsg);
1905                 }       
1906         }
1907         _resendMessagesHandler(resendMsg);
1908
1909 }
1910
1911
1912 void CkMlogRestartDouble(void *,double){
1913         CkMlogRestart(NULL,NULL);
1914 };
1915
1916 //TML: restarting from local (group) failure
1917 void CkMlogRestartLocal(){
1918     CkMlogRestart(NULL,NULL);
1919 };
1920
1921 /**
1922  * Gets the stored checkpoint for its buddy processor.
1923  */
1924 void _getCheckpointHandler(RestartRequest *restartMsg){
1925
1926         // retrieving the stored checkpoint
1927         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
1928
1929         // making sure it is its buddy who is requesting the checkpoint
1930         CkAssert(restartMsg->PE == storedChkpt->PE);
1931
1932         storedRequest = restartMsg;
1933         verifyAckTotal = 0;
1934
1935         for(int i=0;i<migratedNoticeList.size();i++){
1936                 if(migratedNoticeList[i].fromPE == restartMsg->PE){
1937 //                      if(migratedNoticeList[i].ackFrom == 0 && migratedNoticeList[i].ackTo == 0){
1938                         if(migratedNoticeList[i].ackFrom == 0){
1939                                 //need to verify if the object actually exists .. it might not
1940                                 //have been acked but it might exist on it
1941                                 VerifyAckMsg msg;
1942                                 msg.migRecord = migratedNoticeList[i];
1943                                 msg.index = i;
1944                                 msg.fromPE = CmiMyPe();
1945                                 CmiPrintf("[%d] Verify  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[i].gID.idx,idx2str(migratedNoticeList[i].idx),migratedNoticeList[i].toPE);
1946                                 CmiSetHandler(&msg,_verifyAckRequestHandlerIdx);
1947                                 CmiSyncSend(migratedNoticeList[i].toPE,sizeof(VerifyAckMsg),(char *)&msg);
1948                                 verifyAckTotal++;
1949                         }
1950                 }
1951         }
1952
1953         // sending the checkpoint back to its buddy     
1954         if(verifyAckTotal == 0){
1955                 sendCheckpointData(MLOG_CRASHED);
1956         }
1957         verifyAckCount = 0;
1958 }
1959
1960
1961 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest){
1962         CkLocMgr *locMgr =  (CkLocMgr*)CkpvAccess(_groupTable)->find(verifyRequest->migRecord.gID).getObj();
1963         CkLocRec *rec = locMgr->elementNrec(verifyRequest->migRecord.idx);
1964         if(rec != NULL && rec->type() == CkLocRec::local){
1965                         //this location exists on this processor
1966                         //and needs to be removed       
1967                         CkLocRec_local *localRec = (CkLocRec_local *) rec;
1968                         CmiPrintf("[%d] Found element gid %d idx %s that needs to be removed\n",CmiMyPe(),verifyRequest->migRecord.gID.idx,idx2str(verifyRequest->migRecord.idx));
1969                         
1970                         int localIdx = localRec->getLocalIndex();
1971                         LBDatabase *lbdb = localRec->getLBDB();
1972                         LDObjHandle ldHandle = localRec->getLdHandle();
1973                                 
1974                         locMgr->setDuringMigration(true);
1975                         
1976                         locMgr->reclaim(verifyRequest->migRecord.idx,localIdx);
1977                         lbdb->UnregisterObj(ldHandle);
1978                         
1979                         locMgr->setDuringMigration(false);
1980                         
1981                         verifyAckedRequests++;
1982
1983         }
1984         CmiSetHandler(verifyRequest, _verifyAckHandlerIdx);
1985         CmiSyncSendAndFree(verifyRequest->fromPE,sizeof(VerifyAckMsg),(char *)verifyRequest);
1986 };
1987
1988
1989 void _verifyAckHandler(VerifyAckMsg *verifyReply){
1990         int index =     verifyReply->index;
1991         migratedNoticeList[index] = verifyReply->migRecord;
1992         verifyAckCount++;
1993         CmiPrintf("[%d] VerifyReply received %d for  gid %d idx %s from proc %d\n",CmiMyPe(),migratedNoticeList[index].ackTo, migratedNoticeList[index].gID,idx2str(migratedNoticeList[index].idx),migratedNoticeList[index].toPE);
1994         if(verifyAckCount == verifyAckTotal){
1995                 sendCheckpointData(MLOG_CRASHED);
1996         }
1997 }
1998
1999
2000 /**
2001  * Sends the checkpoint to its buddy. The mode distinguishes between the two cases:
2002  * MLOG_RESTARTED: sending the checkpoint to a team member that did not crash but is restarting.
2003  * MLOG_CRASHED: sending the checkpoint to the processor that crashed.
2004  */
2005 void sendCheckpointData(int mode){      
2006         RestartRequest *restartMsg = storedRequest;
2007         StoredCheckpoint *storedChkpt = CpvAccess(_storedCheckpointData);
2008         int numMigratedAwayElements = migratedNoticeList.size();
2009         if(migratedNoticeList.size() != 0){
2010                         printf("[%d] size of migratedNoticeList %d\n",CmiMyPe(),migratedNoticeList.size());
2011 //                      CkAssert(migratedNoticeList.size() == 0);
2012         }
2013         
2014         
2015         int totalSize = sizeof(RestartProcessorData)+storedChkpt->bufSize;
2016         
2017         DEBUG_RESTART(CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);)
2018         CkPrintf("[%d] Sending out checkpoint for processor %d size %d \n",CkMyPe(),restartMsg->PE,totalSize);
2019         
2020         totalSize += numMigratedAwayElements*sizeof(MigrationRecord);
2021         
2022         char *msg = (char *)CmiAlloc(totalSize);
2023         
2024         RestartProcessorData *dataMsg = (RestartProcessorData *)msg;
2025         dataMsg->PE = CkMyPe();
2026         dataMsg->restartWallTime = CmiTimer();
2027         dataMsg->checkPointSize = storedChkpt->bufSize;
2028         
2029         dataMsg->numMigratedAwayElements = numMigratedAwayElements;
2030 //      dataMsg->numMigratedAwayElements = 0;
2031         
2032         dataMsg->numMigratedInElements = 0;
2033         dataMsg->migratedElementSize = 0;
2034         dataMsg->lbGroupID = globalLBID;
2035         /*msg layout 
2036                 |RestartProcessorData|List of Migrated Away ObjIDs|CheckpointData|CheckPointData for objects migrated in|
2037                 Local MessageLog|
2038         */
2039         //store checkpoint data
2040         char *buf = &msg[sizeof(RestartProcessorData)];
2041
2042         if(dataMsg->numMigratedAwayElements != 0){
2043                 memcpy(buf,migratedNoticeList.getVec(),migratedNoticeList.size()*sizeof(MigrationRecord));
2044                 buf = &buf[migratedNoticeList.size()*sizeof(MigrationRecord)];
2045         }
2046         
2047
2048         memcpy(buf,storedChkpt->buf,storedChkpt->bufSize);
2049         buf = &buf[storedChkpt->bufSize];
2050
2051         if(mode == MLOG_RESTARTED){
2052                 CmiSetHandler(msg,_recvRestartCheckpointHandlerIdx);
2053                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2054                 CmiFree(restartMsg);
2055         }else{
2056                 CmiSetHandler(msg,_recvCheckpointHandlerIdx);
2057                 CmiSyncSendAndFree(restartMsg->PE,totalSize,msg);
2058                 CmiFree(restartMsg);
2059         }
2060 };
2061
2062
2063 // this list is used to create a vector of the object ids of all
2064 //the chares on this processor currently and the highest TN processed by them 
2065 //the first argument is actually a CkVec<TProcessedLog> *
2066 void createObjIDList(void *data,ChareMlogData *mlogData){
2067         CkVec<TProcessedLog> *list = (CkVec<TProcessedLog> *)data;
2068         TProcessedLog entry;
2069         entry.recver = mlogData->objID;
2070         entry.tProcessed = mlogData->tProcessed;
2071         list->push_back(entry);
2072         DEBUG_TEAM(char objString[100]);
2073         DEBUG_TEAM(CkPrintf("[%d] %s restored with tProcessed set to %d \n",CkMyPe(),mlogData->objID.toString(objString),mlogData->tProcessed));
2074         DEBUG_RECOVERY(printLog(&entry));
2075 }
2076
2077
2078 /**
2079  * Receives the checkpoint data from its buddy, restores the state of all the objects
2080  * and asks everyone else to update its home.
2081  */
2082 void _recvCheckpointHandler(char *_restartData){
2083         RestartProcessorData *restartData = (RestartProcessorData *)_restartData;
2084         MigrationRecord *migratedAwayElements;
2085
2086         globalLBID = restartData->lbGroupID;
2087         
2088         restartData->restartWallTime *= 1000;
2089         adjustChkptPeriod = restartData->restartWallTime/(double) chkptPeriod - floor(restartData->restartWallTime/(double) chkptPeriod);
2090         adjustChkptPeriod = (double )chkptPeriod*(adjustChkptPeriod);
2091         if(adjustChkptPeriod < 0) adjustChkptPeriod = 0;
2092
2093         
2094         printf("[%d] Restart Checkpointdata received from PE %d at %.6lf with checkpointSize %d\n",CkMyPe(),restartData->PE,CmiWallTimer(),restartData->checkPointSize);
2095         char *buf = &_restartData[sizeof(RestartProcessorData)];
2096         
2097         if(restartData->numMigratedAwayElements != 0){
2098                 migratedAwayElements = new MigrationRecord[restartData->numMigratedAwayElements];
2099                 memcpy(migratedAwayElements,buf,restartData->numMigratedAwayElements*sizeof(MigrationRecord));
2100                 printf("[%d] Number of migratedaway elements %d\n",CmiMyPe(),restartData->numMigratedAwayElements);
2101                 buf = &buf[restartData->numMigratedAwayElements*sizeof(MigrationRecord)];
2102         }
2103         
2104         PUP::fromMem pBuf(buf);
2105
2106         pBuf | checkpointCount;
2107         for(int i=0; i<CmiNumPes(); i++){
2108                 pBuf | CpvAccess(_incarnation)[i];
2109         }
2110         CkPupROData(pBuf);
2111         CkPupGroupData(pBuf,CmiTrue);
2112         CkPupNodeGroupData(pBuf,CmiTrue);
2113         pupArrayElementsSkip(pBuf,CmiTrue,NULL);
2114         CkAssert(pBuf.size() == restartData->checkPointSize);
2115         printf("[%d] Restart Objects created from CheckPointData at %.6lf \n",CkMyPe(),CmiWallTimer());
2116
2117         // increases the incarnation number
2118         CpvAccess(_incarnation)[CmiMyPe()]++;
2119         
2120         forAllCharesDo(initializeRestart,NULL);
2121         
2122         CmiFree(_restartData);
2123         
2124         _initDone();
2125
2126         getGlobalStep(globalLBID);
2127
2128         // sending request to send determinants
2129         _numRestartResponses = 0;
2130         
2131         // Send out the request to resend logged determinants to all other processors
2132         CkVec<TProcessedLog> objectVec;
2133         forAllCharesDo(createObjIDList, (void *)&objectVec);
2134         int numberObjects = objectVec.size();
2135         
2136         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2137         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2138         char *resendMsg = (char *)CmiAlloc(totalSize);  
2139
2140         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2141         resendReq->PE =CkMyPe(); 
2142         resendReq->numberObjects = numberObjects;
2143         char *objList = &resendMsg[sizeof(ResendRequest)];
2144         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2145
2146         CmiSetHandler(resendMsg,_sendDetsHandlerIdx);
2147         for(int i=0;i<CkNumPes();i++){
2148                 if(i != CkMyPe()){
2149                         CmiSyncSend(i,totalSize,resendMsg);
2150                 }
2151         }
2152         CmiFree(resendMsg);
2153         
2154 }
2155
2156 /**
2157  * Receives the updateHome ACKs from all other processors. Once everybody
2158  * has replied, it sends a request to resend the logged messages.
2159  */
2160 void _updateHomeAckHandler(RestartRequest *updateHomeAck){
2161         countUpdateHomeAcks++;
2162         CmiFree(updateHomeAck);
2163         // one is from the recvglobal step handler .. it is a dummy updatehomeackhandler
2164         if(countUpdateHomeAcks != CmiNumPes()){
2165                 return;
2166         }
2167
2168         // Send out the request to resend logged messages to all other processors
2169         CkVec<TProcessedLog> objectVec;
2170         forAllCharesDo(createObjIDList, (void *)&objectVec);
2171         int numberObjects = objectVec.size();
2172         
2173         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2174         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2175         char *resendMsg = (char *)CmiAlloc(totalSize);  
2176
2177         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2178         resendReq->PE =CkMyPe(); 
2179         resendReq->numberObjects = numberObjects;
2180         char *objList = &resendMsg[sizeof(ResendRequest)];
2181         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2182
2183         /* test for parallel restart migrate away object**/
2184         if(fastRecovery){
2185                 distributeRestartedObjects();
2186                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2187         }
2188         
2189         /*      To make restart work for load balancing.. should only
2190         be used when checkpoint happens along with load balancing
2191         **/
2192 //      forAllCharesDo(resumeFromSyncRestart,NULL);
2193
2194         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2195         CpvAccess(_currentObj) = lb;
2196         lb->ReceiveDummyMigration(restartDecisionNumber);
2197
2198 //HERE  sleep(10);
2199         
2200         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2201         for(int i=0;i<CkNumPes();i++){
2202                 if(i != CkMyPe()){
2203                         CmiSyncSend(i,totalSize,resendMsg);
2204                 }
2205         }
2206         _resendMessagesHandler(resendMsg);
2207         CmiFree(resendMsg);
2208
2209 };
2210
2211 /**
2212  * @brief Initializes variables and flags for restarting procedure.
2213  */
2214 void initializeRestart(void *data, ChareMlogData *mlogData){
2215         mlogData->resendReplyRecvd = 0;
2216         mlogData->receivedTNs = new CkVec<MCount>;
2217         mlogData->restartFlag = 1;
2218 };
2219
2220 /**
2221  * Updates the homePe of chare array elements.
2222  */
2223 void updateHomePE(void *data,ChareMlogData *mlogData){
2224         RestartRequest *updateRequest = (RestartRequest *)data;
2225         int PE = updateRequest->PE; //restarted PE
2226         //if this object is an array Element and its home is the restarted processor
2227         // the home processor needs to know its current location
2228         if(mlogData->objID.type == TypeArray){
2229                 //it is an array element
2230                 CkGroupID myGID = mlogData->objID.data.array.id;
2231                 CkArrayIndexMax myIdx =  mlogData->objID.data.array.idx.asChild();
2232                 CkArrayID aid(mlogData->objID.data.array.id);           
2233                 //check if the restarted processor is the home processor for this object
2234                 CkLocMgr *locMgr = aid.ckLocalBranch()->getLocMgr();
2235                 if(locMgr->homePe(myIdx) == PE){
2236                         DEBUG_RESTART(printf("[%d] Tell %d of current location of array element",CkMyPe(),PE));
2237                         DEBUG_RESTART(myIdx.print());
2238                         informLocationHome(locMgr->getGroupID(),myIdx,PE,CkMyPe());
2239                 }
2240         }
2241 };
2242
2243
2244 /**
2245  * Updates the homePe for all chares in this processor.
2246  */
2247 void _updateHomeRequestHandler(RestartRequest *updateRequest){
2248         int sender = updateRequest->PE;
2249         
2250         forAllCharesDo(updateHomePE,updateRequest);
2251         
2252         updateRequest->PE = CmiMyPe();
2253         CmiSetHandler(updateRequest,_updateHomeAckHandlerIdx);
2254         CmiSyncSendAndFree(sender,sizeof(RestartRequest),(char *)updateRequest);
2255         if(sender == getCheckPointPE() && unAckedCheckpoint==1){
2256                 CmiPrintf("[%d] Crashed processor did not ack so need to checkpoint again\n",CmiMyPe());
2257                 checkpointCount--;
2258                 startMlogCheckpoint(NULL,0);
2259         }
2260         if(sender == getCheckPointPE()){
2261                 for(int i=0;i<retainedObjectList.size();i++){
2262                         if(retainedObjectList[i]->acked == 0){
2263                                 MigrationNotice migMsg;
2264                                 migMsg.migRecord = retainedObjectList[i]->migRecord;
2265                                 migMsg.record = retainedObjectList[i];
2266                                 CmiSetHandler((void *)&migMsg,_receiveMigrationNoticeHandlerIdx);
2267                                 CmiSyncSend(getCheckPointPE(),sizeof(migMsg),(char *)&migMsg);
2268                         }
2269                 }
2270         }
2271 }
2272
2273 /**
2274  * @brief Fills up the ticket vector for each chare.
2275  */
2276 void fillTicketForChare(void *data, ChareMlogData *mlogData){
2277         DEBUG(char name[100]);
2278         ResendData *resendData = (ResendData *)data;
2279         int PE = resendData->PE; //restarted PE
2280         int count=0;
2281         CkHashtableIterator *iterator;
2282         void *objp;
2283         void *objkey;
2284         CkObjID *objID;
2285         SNToTicket *snToTicket;
2286         Ticket ticket;
2287         
2288         // traversing the team table looking up for the maximum TN received     
2289         iterator = mlogData->teamTable.iterator();
2290         while( (objp = iterator->next(&objkey)) != NULL ){
2291                 objID = (CkObjID *)objkey;
2292         
2293                 // traversing the resendData structure to add ticket numbers
2294                 for(int j=0;j<resendData->numberObjects;j++){
2295                         if((*objID) == (resendData->listObjects)[j].recver){
2296                                 snToTicket = *(SNToTicket **)objp;
2297 //CkPrintf("[%d] ---> Traversing the resendData for %s start=%u finish=%u \n",CkMyPe(),objID->toString(name),snToTicket->getStartSN(),snToTicket->getFinishSN());
2298                                 for(MCount snIndex=snToTicket->getStartSN(); snIndex<=snToTicket->getFinishSN(); snIndex++){
2299                                         ticket = snToTicket->get(snIndex);      
2300                                 if(ticket.TN >= (resendData->listObjects)[j].tProcessed){
2301                                                 //store the TNs that have been since the recver last checkpointed
2302                                                 resendData->ticketVecs[j].push_back(ticket.TN);
2303                                         }
2304                                 }
2305                         }
2306                 }
2307         }
2308
2309         //releasing the memory for the iterator
2310         delete iterator;
2311 }
2312
2313
2314 /**
2315  * @brief Turns on the flag for team recovery that selectively restores
2316  * particular metadata information.
2317  */
2318 void setTeamRecovery(void *data, ChareMlogData *mlogData){
2319         char name[100];
2320         mlogData->teamRecoveryFlag = 1; 
2321 }
2322
2323 /**
2324  * @brief Turns off the flag for team recovery.
2325  */
2326 void unsetTeamRecovery(void *data, ChareMlogData *mlogData){
2327         mlogData->teamRecoveryFlag = 0;
2328 }
2329
2330 /**
2331  * Prints a processed log.
2332  */
2333 void printLog(TProcessedLog *log){
2334         char recverString[100];
2335         CkPrintf("[RECOVERY] [%d] OBJECT=\"%s\" TN=%d\n",CkMyPe(),log->recver.toString(recverString),log->tProcessed);
2336 }
2337
2338 /**
2339  * Prints information about a message.
2340  */
2341 void printMsg(envelope *env, const char* par){
2342         char senderString[100];
2343         char recverString[100];
2344         CkPrintf("[RECOVERY] [%d] MSG-%s FROM=\"%s\" TO=\"%s\" SN=%d\n",CkMyPe(),par,env->sender.toString(senderString),env->recver.toString(recverString),env->SN);
2345 }
2346
2347 /**
2348  * Prints information about a determinant.
2349  */
2350 void printDet(Determinant *det, const char* par){
2351         char senderString[100];
2352         char recverString[100];
2353         CkPrintf("[RECOVERY] [%d] DET-%s FROM=\"%s\" TO=\"%s\" SN=%d TN=%d\n",CkMyPe(),par,det->sender.toString(senderString),det->receiver.toString(recverString),det->SN,det->TN);
2354 }
2355
2356 /**
2357  * @brief Resends all the logged messages to a particular chare list.
2358  * @param data is of type ResendData which contains the array of objects on  the restartedProcessor.
2359  * @param mlogData a particular chare living in this processor.
2360  */
2361 void resendMessageForChare(void *data, ChareMlogData *mlogData){
2362         DEBUG_RESTART(char nameString[100]);
2363         DEBUG_RESTART(char recverString[100]);
2364         DEBUG_RESTART(char senderString[100]);
2365
2366         ResendData *resendData = (ResendData *)data;
2367         int PE = resendData->PE; //restarted PE
2368         int count=0;
2369         int ticketRequests=0;
2370         CkQ<MlogEntry *> *log = mlogData->getMlog();
2371
2372         DEBUG_RESTART(printf("[%d] Resend message from %s to processor %d \n",CkMyPe(),mlogData->objID.toString(nameString),PE);)
2373
2374         // traversing the message log to see if we must resend a message        
2375         for(int i=0;i<log->length();i++){
2376                 MlogEntry *logEntry = (*log)[i];
2377                 
2378                 // if we sent out the logs of a local message to buddy and it crashed
2379                 //before acknowledging 
2380                 envelope *env = logEntry->env;
2381                 if(env == NULL){
2382                         continue;
2383                 }
2384         
2385                 // resend if type is not invalid        
2386                 if(env->recver.type != TypeInvalid){
2387                         for(int j=0;j<resendData->numberObjects;j++){
2388                                 if(env->recver == (resendData->listObjects)[j].recver){
2389                                         if(PE != CkMyPe()){
2390                                                 DEBUG_RECOVERY(printMsg(env,RECOVERY_SEND));
2391                                                 if(env->recver.type == TypeNodeGroup){
2392                                                         CmiSyncNodeSend(PE,env->getTotalsize(),(char *)env);
2393                                                 }else{
2394                                                         CmiSetHandler(env,CmiGetXHandler(env));
2395                                                         CmiSyncSend(PE,env->getTotalsize(),(char *)env);
2396                                                 }
2397                                         }else{
2398                                                 envelope *copyEnv = copyEnvelope(env);
2399                                                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),copyEnv, copyEnv->getQueueing(),copyEnv->getPriobits(),(unsigned int *)copyEnv->getPrioPtr());
2400                                         }
2401                                         DEBUG_RESTART(printf("[%d] Resent message sender %s recver %s SN %d TN %d \n",CkMyPe(),env->sender.toString(senderString),env->recver.toString(nameString),env->SN,env->TN));
2402                                         count++;
2403                                 }
2404                         }//end of for loop of objects
2405                         
2406                 }       
2407         }
2408         DEBUG_RESTART(printf("[%d] Resent  %d/%d (%d) messages  from %s to processor %d \n",CkMyPe(),count,log->length(),ticketRequests,mlogData->objID.toString(nameString),PE);)      
2409 }
2410
2411 /**
2412  * Send all remote determinants to a particular failed PE.
2413  * It only sends determinants to those objects on the list.
2414  */
2415 void _sendDetsHandler(char *msg){
2416         ResendData d;
2417         CkVec<Determinant> *detVec;
2418         ResendRequest *resendReq = (ResendRequest *)msg;
2419
2420         // CkPrintf("[%d] Sending determinants\n",CkMyPe());
2421
2422         // building the reply message
2423         char *listObjects = &msg[sizeof(ResendRequest)];
2424         d.numberObjects = resendReq->numberObjects;
2425         d.PE = resendReq->PE;
2426         d.listObjects = (TProcessedLog *)listObjects;
2427         d.ticketVecs = new CkVec<MCount>[d.numberObjects];
2428         detVec = new CkVec<Determinant>[d.numberObjects];
2429
2430         // adding the remote determinants to the resendReplyMsg
2431         // traversing all the remote determinants
2432         CkVec<Determinant> *vec;
2433         for(int i=0; i<d.numberObjects; i++){
2434                 vec = CpvAccess(_remoteDets)->get(d.listObjects[i].recver);
2435                 if(vec != NULL){
2436                         for(int j=0; j<vec->size(); j++){
2437
2438                                 // only relevant determinants are to be sent
2439                                 if((*vec)[j].TN > d.listObjects[i].tProcessed){
2440
2441                                         // adding the ticket in the ticket vector
2442                                         d.ticketVecs[i].push_back((*vec)[j].TN);
2443
2444                                         // adding the determinant in the determinant vector
2445                                         detVec[i].push_back((*vec)[j]);
2446
2447                                         DEBUG_RECOVERY(printDet(&(*vec)[j],RECOVERY_SEND));
2448                                 }
2449                         }
2450                 }
2451         }
2452
2453         int totalDetStored = 0;
2454         for(int i=0;i<d.numberObjects;i++){
2455                 totalDetStored += detVec[i].size();
2456         }
2457
2458         //send back the maximum ticket number for a message sent to each object on the 
2459         //restarted processor
2460         //Message: |ResendRequest|List of CkObjIDs|List<#number of objects in vec,TN of tickets seen>|
2461         
2462         int totalTNStored=0;
2463         for(int i=0;i<d.numberObjects;i++){
2464                 totalTNStored += d.ticketVecs[i].size();
2465         }
2466         
2467         int totalSize = sizeof(ResendRequest) + d.numberObjects*(sizeof(CkObjID)+sizeof(int)+sizeof(int)) + totalTNStored*sizeof(MCount) + totalDetStored * sizeof(Determinant);
2468         char *resendReplyMsg = (char *)CmiAlloc(totalSize);
2469         
2470         ResendRequest *resendReply = (ResendRequest *)resendReplyMsg;
2471         resendReply->PE = CkMyPe();
2472         resendReply->numberObjects = d.numberObjects;
2473         
2474         char *replyListObjects = &resendReplyMsg[sizeof(ResendRequest)];
2475         CkObjID *replyObjects = (CkObjID *)replyListObjects;
2476         for(int i=0;i<d.numberObjects;i++){
2477                 replyObjects[i] = d.listObjects[i].recver;
2478         }
2479         
2480         char *ticketList = &replyListObjects[sizeof(CkObjID)*d.numberObjects];
2481         for(int i=0;i<d.numberObjects;i++){
2482                 int vecsize = d.ticketVecs[i].size();
2483                 memcpy(ticketList,&vecsize,sizeof(int));
2484                 ticketList = &ticketList[sizeof(int)];
2485                 memcpy(ticketList,d.ticketVecs[i].getVec(),sizeof(MCount)*vecsize);
2486                 ticketList = &ticketList[sizeof(MCount)*vecsize];
2487         }
2488
2489         // adding the stored remote determinants to the message
2490         for(int i=0;i<d.numberObjects;i++){
2491                 int vecsize = detVec[i].size();
2492                 memcpy(ticketList,&vecsize,sizeof(int));
2493                 ticketList = &ticketList[sizeof(int)];
2494                 memcpy(ticketList,detVec[i].getVec(),sizeof(Determinant)*vecsize);
2495                 ticketList = &ticketList[sizeof(Determinant)*vecsize];
2496         }
2497
2498         CmiSetHandler(resendReplyMsg,_sendDetsReplyHandlerIdx);
2499         CmiSyncSendAndFree(d.PE,totalSize,(char *)resendReplyMsg);
2500
2501         delete [] detVec;
2502         delete [] d.ticketVecs;
2503
2504         DEBUG_MEM(CmiMemoryCheck());
2505
2506         if(resendReq->PE != CkMyPe()){
2507                 CmiFree(msg);
2508         }       
2509 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2510         lastRestart = CmiWallTimer();
2511
2512 }
2513
2514 /**
2515  * Resends messages since last checkpoint to the list of objects included in the 
2516  * request. It also sends stored remote determinants to the particular failed PE.
2517  */
2518 void _resendMessagesHandler(char *msg){
2519         ResendData d;
2520         ResendRequest *resendReq = (ResendRequest *)msg;
2521
2522         //CkPrintf("[%d] Resending messages\n",CkMyPe());
2523
2524         // building the reply message
2525         char *listObjects = &msg[sizeof(ResendRequest)];
2526         d.numberObjects = resendReq->numberObjects;
2527         d.PE = resendReq->PE;
2528         d.listObjects = (TProcessedLog *)listObjects;
2529         
2530         DEBUG(printf("[%d] Received request to Resend Messages to processor %d numberObjects %d at %.6lf\n",CkMyPe(),resendReq->PE,resendReq->numberObjects,CmiWallTimer()));
2531
2532         //TML: examines the origin processor to determine if it belongs to the same group.
2533         // In that case, it only returns the maximum ticket received for each object in the list.
2534         if(isTeamLocal(resendReq->PE) && CkMyPe() != resendReq->PE)
2535                 forAllCharesDo(fillTicketForChare,&d);
2536         else
2537                 forAllCharesDo(resendMessageForChare,&d);
2538
2539         DEBUG_MEM(CmiMemoryCheck());
2540
2541         if(resendReq->PE != CkMyPe()){
2542                 CmiFree(msg);
2543         }       
2544 //      CmiPrintf("[%d] End of resend Request \n",CmiMyPe());
2545         lastRestart = CmiWallTimer();
2546 }
2547
// Helpers over vectors of ticket numbers, declared ahead of their use in
// processReceivedTN(); they are defined elsewhere in this file.
MCount maxVec(CkVec<MCount> *TNvec);
void sortVec(CkVec<MCount> *TNvec);
int searchVec(CkVec<MCount> *TNVec,MCount searchTN);
2551
2552 /**
2553  * @brief Processes the messages in the delayed remote message queue
2554  */
2555 void processDelayedRemoteMsgQueue(){
2556         DEBUG(printf("[%d] Processing delayed remote messages\n",CkMyPe()));
2557         
2558         while(!CqsEmpty(CpvAccess(_delayedRemoteMessageQueue))){
2559                 void *qMsgPtr;
2560                 CqsDequeue(CpvAccess(_delayedRemoteMessageQueue),&qMsgPtr);
2561                 envelope *qEnv = (envelope *)qMsgPtr;
2562                 DEBUG_RECOVERY(printMsg(qEnv,RECOVERY_PROCESS));
2563                 CqsEnqueueGeneral((Queue)CpvAccess(CsdSchedQueue),qEnv,CQS_QUEUEING_FIFO,qEnv->getPriobits(),(unsigned int *)qEnv->getPrioPtr());
2564                 DEBUG_MEM(CmiMemoryCheck());
2565         }
2566
2567 }
2568
2569 /**
2570  * @brief Receives determinants stored on remote nodes.
2571  * Message format: |Header|ObjID list|TN list|Determinant list|
2572  * TN list = |number of TNs|list of TNs|...|
2573  */
2574 void _sendDetsReplyHandler(char *msg){
2575         ResendRequest *resendReply = (ResendRequest *)msg;
2576         CkObjID *listObjects = (CkObjID *)(&msg[sizeof(ResendRequest)]);
2577         char *listTickets = (char *)(&listObjects[resendReply->numberObjects]);
2578         
2579 //      DEBUG_RESTART(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2580         DEBUG_TEAM(printf("[%d] _resendReply from %d \n",CmiMyPe(),resendReply->PE));
2581         
2582         for(int i =0; i< resendReply->numberObjects;i++){
2583                 Chare *obj = (Chare *)listObjects[i].getObject();
2584                 
2585                 int vecsize;
2586                 memcpy(&vecsize,listTickets,sizeof(int));
2587                 listTickets = &listTickets[sizeof(int)];
2588                 MCount *listTNs = (MCount *)listTickets;
2589                 listTickets = &listTickets[vecsize*sizeof(MCount)];
2590         
2591                 if(obj != NULL){
2592                         //the object was restarted on the processor on which it existed
2593                         processReceivedTN(obj,vecsize,listTNs);
2594                 }else{
2595                         //pack up objID vecsize and listTNs and send it to the correct processor
2596                         int totalSize = sizeof(ReceivedTNData)+vecsize*sizeof(MCount);
2597                         char *TNMsg = (char *)CmiAlloc(totalSize);
2598                         ReceivedTNData *receivedTNData = (ReceivedTNData *)TNMsg;
2599                         receivedTNData->recver = listObjects[i];
2600                         receivedTNData->numTNs = vecsize;
2601                         char *tnList = &TNMsg[sizeof(ReceivedTNData)];
2602                         memcpy(tnList,listTNs,sizeof(MCount)*vecsize);
2603                         CmiSetHandler(TNMsg,_receivedTNDataHandlerIdx);
2604                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,TNMsg);
2605                 }
2606
2607         }
2608
2609         // traversing all the retrieved determinants
2610         for(int i = 0; i < resendReply->numberObjects; i++){
2611                 Chare *obj = (Chare *)listObjects[i].getObject();
2612                 
2613                 int vecsize;
2614                 memcpy(&vecsize,listTickets,sizeof(int));
2615                 listTickets = &listTickets[sizeof(int)];
2616                 Determinant *listDets = (Determinant *)listTickets;     
2617                 listTickets = &listTickets[vecsize*sizeof(Determinant)];
2618         
2619                 if(obj != NULL){
2620                         //the object was restarted on the processor on which it existed
2621                         processReceivedDet(obj,vecsize,listDets);
2622                 } else {
2623                         // pack the determinants and ship them to the other processor
2624                         // pack up objID vecsize and listDets and send it to the correct processor
2625                         int totalSize = sizeof(ReceivedDetData) + vecsize*sizeof(Determinant);
2626                         char *detMsg = (char *)CmiAlloc(totalSize);
2627                         ReceivedDetData *receivedDetData = (ReceivedDetData *)detMsg;
2628                         receivedDetData->recver = listObjects[i];
2629                         receivedDetData->numDets = vecsize;
2630                         char *detList = &detMsg[sizeof(ReceivedDetData)];
2631                         memcpy(detList,listDets,sizeof(Determinant)*vecsize);
2632                         CmiSetHandler(detMsg,_receivedDetDataHandlerIdx);
2633                         CmiSyncSendAndFree(listObjects[i].guessPE(),totalSize,detMsg);
2634                 }
2635
2636         }
2637
2638         // checking if we have received all replies
2639         _numRestartResponses++;
2640         if(_numRestartResponses != CkNumPes())
2641                 return;
2642         else 
2643                 _numRestartResponses = 0;
2644
2645         // continuing with restart process; send out the request to resend logged messages to all other processors
2646         CkVec<TProcessedLog> objectVec;
2647         forAllCharesDo(createObjIDList, (void *)&objectVec);
2648         int numberObjects = objectVec.size();
2649         
2650         //      resendMsg layout: |ResendRequest|Array of TProcessedLog|
2651         int totalSize = sizeof(ResendRequest)+numberObjects*sizeof(TProcessedLog);
2652         char *resendMsg = (char *)CmiAlloc(totalSize);  
2653
2654         ResendRequest *resendReq = (ResendRequest *)resendMsg;
2655         resendReq->PE =CkMyPe(); 
2656         resendReq->numberObjects = numberObjects;
2657         char *objList = &resendMsg[sizeof(ResendRequest)];
2658         memcpy(objList,objectVec.getVec(),numberObjects*sizeof(TProcessedLog)); 
2659
2660         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(globalLBID).getObj();
2661         CpvAccess(_currentObj) = lb;
2662         lb->ReceiveDummyMigration(restartDecisionNumber);
2663
2664 //HERE  sleep(10);
2665 //      CkPrintf("[%d] RESUMING RECOVERY with %d \n",CkMyPe(),restartDecisionNumber);
2666         
2667         CmiSetHandler(resendMsg,_resendMessagesHandlerIdx);
2668         for(int i=0;i<CkNumPes();i++){
2669                 if(i != CkMyPe()){
2670                         CmiSyncSend(i,totalSize,resendMsg);
2671                 }
2672         }
2673         _resendMessagesHandler(resendMsg);
2674         CmiFree(resendMsg);
2675
2676         /* test for parallel restart migrate away object**/
2677         if(fastRecovery){
2678                 distributeRestartedObjects();
2679                 printf("[%d] Redistribution of objects done at %.6lf \n",CkMyPe(),CmiWallTimer());
2680         }
2681
2682 //      processDelayedRemoteMsgQueue();
2683
2684 };
2685
2686 /**
2687  * @brief Receives a list of determinants coming from the home PE of a migrated object (parallel restart).
2688  */
2689 void _receivedDetDataHandler(ReceivedDetData *msg){
2690         DEBUG_NOW(char objName[100]);
2691         Chare *obj = (Chare *) msg->recver.getObject();
2692         if(obj){                
2693                 char *_msg = (char *)msg;
2694                 DEBUG(printf("[%d] receivedDetDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2695                 Determinant *listDets = (Determinant *)(&_msg[sizeof(ReceivedDetData)]);
2696                 processReceivedDet(obj,msg->numDets,listDets);
2697                 CmiFree(msg);
2698         }else{
2699                 int totalSize = sizeof(ReceivedDetData)+sizeof(Determinant)*msg->numDets;
2700                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2701         }
2702 }
2703
2704 /**
2705  * @brief Receives a list of TNs coming from the home PE of a migrated object (parallel restart).
2706  */
2707 void _receivedTNDataHandler(ReceivedTNData *msg){
2708         DEBUG_NOW(char objName[100]);
2709         Chare *obj = (Chare *) msg->recver.getObject();
2710         if(obj){                
2711                 char *_msg = (char *)msg;
2712                 DEBUG(printf("[%d] receivedTNDataHandler for %s\n",CmiMyPe(),obj->mlogData->objID.toString(objName)));
2713                 MCount *listTNs = (MCount *)(&_msg[sizeof(ReceivedTNData)]);
2714                 processReceivedTN(obj,msg->numTNs,listTNs);
2715                 CmiFree(msg);
2716         }else{
2717                 int totalSize = sizeof(ReceivedTNData)+sizeof(MCount)*msg->numTNs;
2718                 CmiSyncSendAndFree(msg->recver.guessPE(),totalSize,(char *)msg);
2719         }
2720 };
2721
2722 /**
2723  * @brief Processes the received list of determinants from a particular PE.
2724  */
2725 void processReceivedDet(Chare *obj, int listSize, Determinant *listDets){
2726         Determinant *det;
2727
2728         // traversing the whole list of determinants
2729         for(int i=0; i<listSize; i++){
2730                 det = &listDets[i];
2731                 if(CkMyPe() == 4) printDet(det,"RECOVERY");
2732                 obj->mlogData->verifyTicket(det->sender, det->SN, det->TN);
2733                 DEBUG_RECOVERY(printDet(det,RECOVERY_PROCESS));
2734         }
2735         
2736         DEBUG_MEM(CmiMemoryCheck());
2737 }
2738         
2739 /**
2740  * @brief Processes the received list of tickets from a particular PE.
2741  */
2742 void processReceivedTN(Chare *obj, int listSize, MCount *listTNs){
2743         // increases the number of resendReply received
2744         obj->mlogData->resendReplyRecvd++;
2745
2746         DEBUG(char objName[100]);
2747         DEBUG(CkPrintf("[%d] processReceivedTN obj->mlogData->resendReplyRecvd=%d CkNumPes()=%d\n",CkMyPe(),obj->mlogData->resendReplyRecvd,CkNumPes()));
2748         //CkPrintf("[%d] processReceivedTN with %d listSize by %s\n",CkMyPe(),listSize,obj->mlogData->objID.toString(objName));
2749         //if(obj->mlogData->receivedTNs == NULL)
2750         //      CkPrintf("NULL\n");     
2751         //CkPrintf("using %d entries\n",obj->mlogData->receivedTNs->length());  
2752
2753         // includes the tickets into the receivedTN structure
2754         for(int j=0;j<listSize;j++){
2755                 obj->mlogData->receivedTNs->push_back(listTNs[j]);
2756         }
2757         
2758         //if this object has received all the replies find the ticket numbers
2759         //that senders know about. Those less than the ticket number processed 
2760         //by the receiver can be thrown away. The rest need not be consecutive
2761         // ie there can be holes in the list of ticket numbers seen by senders
2762         if(obj->mlogData->resendReplyRecvd == (CkNumPes() -1)){
2763                 obj->mlogData->resendReplyRecvd = 0;
2764
2765 #if VERIFY_DETS
2766                 //sort the received TNS
2767                 sortVec(obj->mlogData->receivedTNs);
2768         
2769                 //after all the received tickets are in we need to sort them and then 
2770                 // calculate the holes  
2771                 if(obj->mlogData->receivedTNs->size() > 0){
2772                         int tProcessedIndex = searchVec(obj->mlogData->receivedTNs,obj->mlogData->tProcessed);
2773                         int vecsize = obj->mlogData->receivedTNs->size();
2774                         int numberHoles = ((*obj->mlogData->receivedTNs)[vecsize-1] - obj->mlogData->tProcessed)-(vecsize -1 - tProcessedIndex);
2775                         
2776                         // updating tCount with the highest ticket handed out
2777                         if(teamSize > 1){
2778                                 if(obj->mlogData->tCount < (*obj->mlogData->receivedTNs)[vecsize-1])
2779                                         obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2780                         }else{
2781                                 obj->mlogData->tCount = (*obj->mlogData->receivedTNs)[vecsize-1];
2782                         }
2783                         
2784                         if(numberHoles == 0){
2785                         }else{
2786                                 char objName[100];                                      
2787                                 printf("[%d] Holes detected in the TNs for %s number %d \n",CkMyPe(),obj->mlogData->objID.toString(objName),numberHoles);
2788                                 obj->mlogData->numberHoles = numberHoles;
2789                                 obj->mlogData->ticketHoles = new MCount[numberHoles];
2790                                 int countHoles=0;
2791                                 for(int k=tProcessedIndex+1;k<vecsize;k++){
2792                                         if((*obj->mlogData->receivedTNs)[k] != (*obj->mlogData->receivedTNs)[k-1]+1){
2793                                                 //the TNs are not consecutive at this point
2794                                                 for(MCount newTN=(*obj->mlogData->receivedTNs)[k-1]+1;newTN<(*obj->mlogData->receivedTNs)[k];newTN++){
2795                                                         DEBUG(CKPrintf("hole no %d at %d next available ticket %d \n",countHoles,newTN,(*obj->mlogData->receivedTNs)[k]));
2796                                                         obj->mlogData->ticketHoles[countHoles] = newTN;
2797                                                         countHoles++;
2798                                                 }       
2799                                         }
2800                                 }
2801                                 //Holes have been given new TN
2802                                 if(countHoles != numberHoles){
2803                                         char str[100];
2804                                         printf("[%d] Obj %s countHoles %d numberHoles %d\n",CmiMyPe(),obj->mlogData->objID.toString(str),countHoles,numberHoles);
2805                                 }
2806                                 CkAssert(countHoles == numberHoles);                                    
2807                                 obj->mlogData->currentHoles = numberHoles;
2808                         }
2809                 }
2810 #else
2811                 if(obj->mlogData->receivedTNs->size() > 0){
2812                         obj->mlogData->tCount = maxVec(obj->mlogData->receivedTNs);
2813                 }
2814 #endif
2815                 // cleaning up structures and getting ready to continue execution       
2816                 delete obj->mlogData->receivedTNs;
2817                 DEBUG(CkPrintf("[%d] Resetting receivedTNs\n",CkMyPe()));
2818                 obj->mlogData->receivedTNs = NULL;
2819                 obj->mlogData->restartFlag = 0;
2820
2821                 // processDelayedRemoteMsgQueue();
2822
2823                 DEBUG_RESTART(char objString[100]);
2824                 DEBUG_RESTART(CkPrintf("[%d] Can restart handing out tickets again at %.6lf for %s\n",CkMyPe(),CmiWallTimer(),obj->mlogData->objID.toString(objString)));
2825         }
2826
2827 }
2828
2829 /** @brief Returns the maximum ticket from a vector */
2830 MCount maxVec(CkVec<MCount> *TNvec){
2831         MCount max = 0;
2832         for(int i=0; i<TNvec->size(); i++){
2833                 if((*TNvec)[i] > max)
2834                         max = (*TNvec)[i];
2835         }
2836         return max;
2837 }
2838
2839 void sortVec(CkVec<MCount> *TNvec){
2840         //sort it ->its bloddy bubble sort
2841         //TODO: use quicksort
2842         for(int i=0;i<TNvec->size();i++){
2843                 for(int j=i+1;j<TNvec->size();j++){
2844                         if((*TNvec)[j] < (*TNvec)[i]){
2845                                 MCount temp;
2846                                 temp = (*TNvec)[i];
2847                                 (*TNvec)[i] = (*TNvec)[j];
2848                                 (*TNvec)[j] = temp;
2849                         }
2850                 }
2851         }
2852         //make it unique .. since its sorted all equal units will be consecutive
2853         MCount *tempArray = new MCount[TNvec->size()];
2854         int     uniqueCount=-1;
2855         for(int i=0;i<TNvec->size();i++){
2856                 tempArray[i] = 0;
2857                 if(uniqueCount == -1 || tempArray[uniqueCount] != (*TNvec)[i]){
2858                         uniqueCount++;
2859                         tempArray[uniqueCount] = (*TNvec)[i];
2860                 }
2861         }
2862         uniqueCount++;
2863         TNvec->removeAll();
2864         for(int i=0;i<uniqueCount;i++){
2865                 TNvec->push_back(tempArray[i]);
2866         }
2867         delete [] tempArray;
2868 }       
2869
2870 int searchVec(CkVec<MCount> *TNVec,MCount searchTN){
2871         if(TNVec->size() == 0){
2872                 return -1; //not found in an empty vec
2873         }
2874         //binary search to find 
2875         int left=0;
2876         int right = TNVec->size();
2877         int mid = (left +right)/2;
2878         while(searchTN != (*TNVec)[mid] && left < right){
2879                 if((*TNVec)[mid] > searchTN){
2880                         right = mid-1;
2881                 }else{
2882                         left = mid+1;
2883                 }
2884                 mid = (left + right)/2;
2885         }
2886         if(left < right){
2887                 //mid is the element to be returned
2888                 return mid;
2889         }else{
2890                 if(mid < TNVec->size() && mid >=0){
2891                         if((*TNVec)[mid] == searchTN){
2892                                 return mid;
2893                         }else{
2894                                 return -1;
2895                         }
2896                 }else{
2897                         return -1;
2898                 }
2899         }
2900 };
2901
2902
2903 /*
2904         Method to do parallel restart. Distribute some of the array elements to other processors.
2905         The problem is that we cant use to charm entry methods to do migration as it will get
2906         stuck in the protocol that is going to restart
2907         Note: in order to avoid interference between the objects being recovered, the current PE
2908     will NOT keep any object. It will be devoted to forward the messages to recovering objects.    Otherwise, the current PE has to do both things, recover objects and forward messages and 
2909     objects end up stepping into each other's shoes (interference).
2910 */
2911
2912 class ElementDistributor: public CkLocIterator{
2913         CkLocMgr *locMgr;
2914         int *targetPE;
2915
2916         void pupLocation(CkLocation &loc,PUP::er &p){
2917                 CkArrayIndexMax idx=loc.getIndex();
2918                 CkGroupID gID = locMgr->ckGetGroupID();
2919                 p|gID;      // store loc mgr's GID as well for easier restore
2920                 p|idx;
2921                 p|loc;
2922         };
2923 public:
2924         ElementDistributor(CkLocMgr *mgr_,int *toPE_):locMgr(mgr_),targetPE(toPE_){};
2925
2926         void addLocation(CkLocation &loc){
2927
2928                 // leaving object on this PE
2929                 if(*targetPE == CkMyPe()){
2930                         *targetPE = (*targetPE +1)%CkNumPes();
2931                         return;
2932                 }
2933                         
2934                 CkArrayIndexMax idx = loc.getIndex();
2935                 CkLocRec_local *rec = loc.getLocalRecord();
2936                 CkLocMgr *locMgr = loc.getManager();
2937                 CkVec<CkMigratable *> eltList;
2938                         
2939                 CkPrintf("[%d] Distributing objects to Processor %d: ",CkMyPe(),*targetPE);
2940                 idx.print();
2941
2942                 // incrementing number of emigrant objects
2943                 CpvAccess(_numEmigrantRecObjs)++;
2944         locMgr->migratableList((CkLocRec_local *)rec,eltList);
2945                 CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
2946                 
2947                 // let everybody else know the object is leaving
2948                 locMgr->callMethod(rec,&CkMigratable::ckAboutToMigrate);
2949                 reductionMgr->incNumEmigrantRecObjs();
2950         
2951                 //pack up this location and send it across
2952                 PUP::sizer psizer;
2953                 pupLocation(loc,psizer);
2954                 int totalSize = psizer.size() + sizeof(DistributeObjectMsg);
2955                 char *msg = (char *)CmiAlloc(totalSize);
2956                 DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)msg;
2957                 distributeMsg->PE = CkMyPe();
2958                 char *buf = &msg[sizeof(DistributeObjectMsg)];
2959                 PUP::toMem pmem(buf);
2960                 pmem.becomeDeleting();
2961                 pupLocation(loc,pmem);
2962                         
2963                 locMgr->setDuringMigration(CmiTrue);
2964                 delete rec;
2965                 locMgr->setDuringMigration(CmiFalse);
2966                 locMgr->inform(idx,*targetPE);
2967
2968                 CmiSetHandler(msg,_distributedLocationHandlerIdx);
2969                 CmiSyncSendAndFree(*targetPE,totalSize,msg);
2970
2971                 CmiAssert(locMgr->lastKnown(idx) == *targetPE);
2972
2973                 //decide on the target processor for the next object
2974                 *targetPE = *targetPE + 1;
2975                 if(*targetPE > (CkMyPe() + parallelRecovery)){
2976                         *targetPE = CkMyPe() + 1;
2977                 }
2978         }
2979
2980 };
2981
2982 /**
2983  * Distributes objects to accelerate recovery after a failure.
2984  */
2985 void distributeRestartedObjects(){
2986         int numGroups = CkpvAccess(_groupIDTable)->size();      
2987         int i;
2988         int targetPE=CkMyPe()+1;
2989         CKLOCMGR_LOOP(ElementDistributor distributor(mgr,&targetPE);mgr->iterate(distributor););
2990 };
2991
2992 /**
2993  * Handler to receive back a location.
2994  */
2995 void _sendBackLocationHandler(char *receivedMsg){
2996         printf("Array element received at processor %d after recovery\n",CkMyPe());
2997         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
2998         int sourcePE = distributeMsg->PE;
2999         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
3000         PUP::fromMem pmem(buf);
3001         CkGroupID gID;
3002         CkArrayIndexMax idx;
3003         pmem |gID;
3004         pmem |idx;
3005         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3006         donotCountMigration=1;
3007         mgr->resume(idx,pmem,CmiTrue);
3008         donotCountMigration=0;
3009         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3010         printf("Array element inserted at processor %d after parallel recovery\n",CkMyPe());
3011         idx.print();
3012
3013         // decrementing number of emigrant objects at reduction manager
3014         CkVec<CkMigratable *> eltList;
3015         CkLocRec *rec = mgr->elementRec(idx);
3016         mgr->migratableList((CkLocRec_local *)rec,eltList);
3017         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[0]->mlogData->objID.data.array.id).getObj();
3018         reductionMgr->decNumEmigrantRecObjs();
3019         reductionMgr->decGCount();
3020
3021         // checking if it has received all emigrant recovering objects
3022         CpvAccess(_numEmigrantRecObjs)--;
3023         if(CpvAccess(_numEmigrantRecObjs) == 0){
3024                 (*resumeLbFnPtr)(centralLb);
3025         }
3026
3027 }
3028
3029 /**
3030  * Handler to update information about an object just received.
3031  */
3032 void _distributedLocationHandler(char *receivedMsg){
3033         printf("Array element received at processor %d after distribution at restart\n",CkMyPe());
3034         DistributeObjectMsg *distributeMsg = (DistributeObjectMsg *)receivedMsg;
3035         int sourcePE = distributeMsg->PE;
3036         char *buf = &receivedMsg[sizeof(DistributeObjectMsg)];
3037         PUP::fromMem pmem(buf);
3038         CkGroupID gID;
3039         CkArrayIndexMax idx;
3040         pmem |gID;
3041         pmem |idx;
3042         CkLocMgr *mgr = (CkLocMgr*)CkpvAccess(_groupTable)->find(gID).getObj();
3043         donotCountMigration=1;
3044         mgr->resume(idx,pmem,CmiTrue);
3045         donotCountMigration=0;
3046         informLocationHome(gID,idx,mgr->homePe(idx),CkMyPe());
3047         printf("Array element inserted at processor %d after distribution at restart ",CkMyPe());
3048         idx.print();
3049
3050         CkLocRec *rec = mgr->elementRec(idx);
3051         CmiAssert(rec->type() == CkLocRec::local);
3052
3053         // adding object to the list of immigrant recovery objects
3054         CpvAccess(_immigrantRecObjs)->push_back(new CkLocation(mgr,(CkLocRec_local *)rec));
3055         CpvAccess(_numImmigrantRecObjs)++;
3056         
3057         CkVec<CkMigratable *> eltList;
3058         mgr->migratableList((CkLocRec_local *)rec,eltList);
3059         for(int i=0;i<eltList.size();i++){
3060                 if(eltList[i]->mlogData->toResumeOrNot == 1 && eltList[i]->mlogData->resumeCount < globalResumeCount){
3061                         CpvAccess(_currentObj) = eltList[i];
3062                         eltList[i]->mlogData->immigrantRecFlag = 1;
3063                         eltList[i]->mlogData->immigrantSourcePE = sourcePE;
3064
3065                         // incrementing immigrant counter at reduction manager
3066                         CkReductionMgr *reductionMgr = (CkReductionMgr*)CkpvAccess(_groupTable)->find(eltList[i]->mlogData->objID.data.array.id).getObj();
3067                         reductionMgr->incNumImmigrantRecObjs();
3068                         reductionMgr->decGCount();
3069
3070                         eltList[i]->ResumeFromSync();
3071                 }
3072         }
3073 }
3074
3075
3076 /** this method is used to send messages to a restarted processor to tell
3077  * it that a particular expected object is not going to get to it */
3078 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE){
3079         DummyMigrationMsg buf;
3080         buf.flag = MLOG_OBJECT;
3081         buf.lbID = lbID;
3082         buf.mgrID = locMgrID;
3083         buf.idx = idx;
3084         buf.locationPE = locationPE;
3085         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3086         CmiSyncSend(restartPE,sizeof(DummyMigrationMsg),(char *)&buf);
3087 };
3088
3089
3090 /**this method is used by a restarted processor to tell other processors
3091  * that they are not going to receive these many objects.. just the count
3092  * not the objects themselves ***/
3093
3094 void sendDummyMigrationCounts(int *dummyCounts){
3095         DummyMigrationMsg buf;
3096         buf.flag = MLOG_COUNT;
3097         buf.lbID = globalLBID;
3098         CmiSetHandler(&buf,_dummyMigrationHandlerIdx);
3099         for(int i=0;i<CmiNumPes();i++){
3100                 if(i != CmiMyPe() && dummyCounts[i] != 0){
3101                         buf.count = dummyCounts[i];
3102                         CmiSyncSend(i,sizeof(DummyMigrationMsg),(char *)&buf);
3103                 }
3104         }
3105 }
3106
3107
3108 /** this handler is used to process a dummy migration msg.
3109  * it looks up the load balancer and calls migrated for it */
3110
3111 void _dummyMigrationHandler(DummyMigrationMsg *msg){
3112         CentralLB *lb = (CentralLB *)CkpvAccess(_groupTable)->find(msg->lbID).getObj();
3113         if(msg->flag == MLOG_OBJECT){
3114                 DEBUG_RESTART(CmiPrintf("[%d] dummy Migration received from pe %d for %d:%s \n",CmiMyPe(),msg->locationPE,msg->mgrID.idx,idx2str(msg->idx)));
3115                 LDObjHandle h;
3116                 lb->Migrated(h,1);
3117         }
3118         if(msg->flag == MLOG_COUNT){
3119                 DEBUG_RESTART(CmiPrintf("[%d] dummyMigration count %d received from restarted processor\n",CmiMyPe(),msg->count));
3120                 msg->count -= verifyAckedRequests;
3121                 for(int i=0;i<msg->count;i++){
3122                         LDObjHandle h;
3123                         lb->Migrated(h,1);
3124                 }
3125         }
3126         verifyAckedRequests=0;
3127         CmiFree(msg);
3128 };
3129
3130 /*****************************************************
3131         Implementation of a method that can be used to call
3132         any method on the ChareMlogData of all the chares on
3133         a processor currently
3134 ******************************************************/
3135
3136
3137 class ElementCaller :  public CkLocIterator {
3138 private:
3139         CkLocMgr *locMgr;
3140         MlogFn fnPointer;
3141         void *data;
3142 public:
3143         ElementCaller(CkLocMgr * _locMgr, MlogFn _fnPointer,void *_data){
3144                 locMgr = _locMgr;
3145                 fnPointer = _fnPointer;
3146                 data = _data;
3147         };
3148         void addLocation(CkLocation &loc){
3149                 CkVec<CkMigratable *> list;
3150                 CkLocRec_local *local = loc.getLocalRecord();
3151                 locMgr->migratableList (local,list);
3152                 for(int i=0;i<list.size();i++){
3153                         CkMigratable *migratableElement = list[i];
3154                         fnPointer(data,migratableElement->mlogData);
3155                 }
3156         }
3157 };
3158
3159 /**
3160  * Map function pointed by fnPointer over all the chares living in this processor.
3161  */
3162 void forAllCharesDo(MlogFn fnPointer, void *data){
3163         int numGroups = CkpvAccess(_groupIDTable)->size();
3164         for(int i=0;i<numGroups;i++){
3165                 Chare *obj = (Chare *)CkpvAccess(_groupTable)->find((*CkpvAccess(_groupIDTable))[i]).getObj();
3166                 fnPointer(data,obj->mlogData);
3167         }
3168         int numNodeGroups = CksvAccess(_nodeGroupIDTable).size();
3169         for(int i=0;i<numNodeGroups;i++){
3170                 Chare *obj = (Chare *)CksvAccess(_nodeGroupTable)->find(CksvAccess(_nodeGroupIDTable)[i]).getObj();
3171                 fnPointer(data,obj->mlogData);
3172         }
3173         int i;
3174         CKLOCMGR_LOOP(ElementCaller caller(mgr, fnPointer,data); mgr->iterate(caller););
3175 };
3176
3177
3178 /******************************************************************
3179  Load Balancing
3180 ******************************************************************/
3181
3182 /**
3183  * This is the first time Converse is called after AtSync method has been called by every local object.
3184  * It is a good place to insert some optimizations for synchronized checkpoint. In the case of causal
3185  * message logging, we can take advantage of this situation and garbage collect at this point.
3186  */
3187 void initMlogLBStep(CkGroupID gid){
3188         DEBUGLB(CkPrintf("[%d] INIT MLOG STEP\n",CkMyPe()));
3189         countLBMigratedAway = 0;
3190         countLBToMigrate=0;
3191         onGoingLoadBalancing=1;
3192         migrationDoneCalled=0;
3193         checkpointBarrierCount=0;
3194         if(globalLBID.idx != 0){
3195                 CmiAssert(globalLBID.idx == gid.idx);
3196         }
3197         globalLBID = gid;
3198 #if SYNCHRONIZED_CHECKPOINT
3199         garbageCollectMlog();
3200 #endif
3201 }
3202
3203 /**
3204  * Pups a location
3205  */
3206 void pupLocation(CkLocation *loc, CkLocMgr *locMgr, PUP::er &p){
3207         CkArrayIndexMax idx = loc->getIndex();
3208         CkGroupID gID = locMgr->ckGetGroupID();
3209         p|gID;      // store loc mgr's GID as well for easier restore
3210         p|idx;
3211         p|*loc;
3212 };
3213
3214 /**
3215  * Sends back the immigrant recovering object to their origin PE.
3216  */
3217 void sendBackImmigrantRecObjs(){
3218         CkLocation *loc;
3219         CkLocMgr *locMgr;
3220         CkArrayIndexMax idx;
3221         CkLocRec_local *rec;
3222         PUP::sizer psizer;
3223         int targetPE;
3224         CkVec<CkMigratable *> eltList;
3225         CkReductionMgr *reductionMgr;
3226  
3227         // looping through all elements in immigrant recovery objects vector
3228         for(int i=0; i<CpvAccess(_numImmigrantRecObjs); i++){
3229
3230                 // getting the components of each&nbs