84a2db1e9cd9f0516f8d082308b608fcae3dde2a
[charm.git] / src / ck-core / ckcausalmlog.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
/*
 * <string.h> declares memset() (needed by the bzero fallback below) and
 * memcpy() (used by SNToTicket::put in this header).
 */
#include <string.h>

#if CMK_HAS_STRINGS_H
#include <strings.h>            /* declares bzero */
#else
/* No <strings.h> available: emulate bzero(s,n) by zero-filling n bytes at s. */
#define bzero(s,n)   memset((s),0,(n))
#endif
11
12 CpvExtern(Chare *,_currentObj);
13
14 //states of a ticket sent as a reply to a request
15 #define NEW_TICKET 1
16 #define OLD_TICKET 2
17 #define FORWARDED_TICKET 0x8000
18
19 //TML: global variable for the size of the team
20 #define MLOG_RESTARTED 0
21 #define MLOG_CRASHED 1
22 #define MEGABYTE 1048576
23
24 //array on which we print the formatted string representing an object id
25 extern char objString[100];
26
27 // defines the initial size of _bufferedDets
28 #define INITIAL_BUFFERED_DETERMINANTS 1024
29
30 // constant to define the type of checkpoint used (synchronized or not)
31 #define SYNCHRONIZED_CHECKPOINT 1
32
33 /**
34  * @brief Struct to store the determinant of a particular message.
35  * The determinant remembers all the necessary information for a 
36  * message to be replayed in the same order as in the execution prior
37  * the failure.
38  */
39 typedef struct {
40         // sender ID
41         CkObjID sender;
42         // receiver ID
43         CkObjID receiver;
44         // SSN: sender sequence number
45         MCount SN;
46         // TN: ticket number (RSN: receiver sequence number)
47         MCount TN;
48 } Determinant;
49
50 /**
51  * @brief Typedef for the hashtable type that maps object IDs to determinants.
52  */
53 typedef CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> CkDeterminantHashtableT;
54
55 /**
56  * @brief Struct for the header of the removeDeterminants handler
57  */
58 typedef struct {
59         char header[CmiMsgHeaderSizeBytes];
60         int phase;
61         int index;
62 } RemoveDeterminantsHeader;
63
64
65 /**
66  * @brief Struct for the header of the storeDeterminants handler
67  */
68 typedef struct {
69         char header[CmiMsgHeaderSizeBytes];
70         int number;
71         int index;
72         int phase;
73         int PE;
74 } StoreDeterminantsHeader;
75
76 /**
77  * @brief Structure for a ticket assigned to a particular message.
78  */
79 class Ticket {
80 public:
81         MCount TN;
82         int state;
83         Ticket(){
84                 TN = 0;
85                 state = 0;
86         }
87         Ticket(int x){
88                 TN = x;
89                 state = 0;
90         }
91 };
92 PUPbytes(Ticket)
93 class MlogEntry;
94
95 class RestoredLocalMap;
96
97 #define INITSIZE_SNTOTICKET 100
98
99 /**
100  * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
101  * for a particular object.
102  */
103 class SNToTicket{
104         private:
105                 Ticket initial[INITSIZE_SNTOTICKET];
106                 Ticket *ticketVec;
107                 MCount startSN;
108                 int currentSize;
109                 MCount finishSN;
110         public:
111                 SNToTicket(){
112                         currentSize = INITSIZE_SNTOTICKET;
113                         ticketVec = &initial[0];
114                         bzero(ticketVec,sizeof(Ticket)*currentSize);
115                         startSN = 0;
116                         finishSN = 0;
117                 }
118                 /**
119                  * Gets the finishSN value.
120                  */ 
121                 inline MCount getFinishSN(){
122                         return finishSN;
123                 }
124                 /**
125                  * Gets the startSN value.
126                  */      
127                 inline MCount getStartSN(){
128                         return startSN;
129                 }
130                 //assume indices start from 1.. true for MCounts
131                 inline Ticket &put(MCount SN){
132                         if(SN > finishSN) finishSN = SN;
133                         if(startSN == 0){
134                                 startSN = SN;                           
135                         }
136                         int index = SN-startSN;
137                         if(index >= currentSize){
138                                 int oldSize = currentSize;
139                                 Ticket *old = ticketVec;
140                                 
141                                 currentSize = index*2;
142                                 ticketVec = new Ticket[currentSize];
143                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
144                                 if(old != &initial[0]){                                 
145                                         delete [] old;
146                                 }
147                         }
148                         return ticketVec[index];
149                 }
150
151                 inline Ticket get(MCount SN){
152                         int index = SN-startSN;
153                         CmiAssert(index >= 0);
154                         if(index >= currentSize){
155                                 Ticket tn;
156                                 return tn;
157                         }else{
158                                 return ticketVec[index];
159                         }
160                 }
161
162                 inline void pup(PUP::er &p){
163                         p | startSN;
164                         p | currentSize;
165                         if(p.isUnpacking()){
166                                 if(currentSize > INITSIZE_SNTOTICKET){
167                                         ticketVec = new Ticket[currentSize];
168                                 }
169                         }
170                         for(int i=0;i<currentSize;i++){
171                                 p | ticketVec[i];
172                         }
173                 }       
174 };
175
176
/**
 * This file includes the definition of the class for storing the metadata
 * associated with the message logging protocol.
 */
181
182
183 /**
184  * @brief This class stores all the message logging related data for a chare.
185  */
186 class ChareMlogData{
187 public:
188         // Object unique ID.
189         CkObjID objID;
190         // Counts how many tickets have been handed out.
191         MCount tCount; 
192         // Stores the highest ticket that has been processed.
193         MCount tProcessed;
194         
195         //TODO: pup receivedTNs
196         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
197         MCount *ticketHoles;
198         int numberHoles;
199         int currentHoles;
200         // variable that keeps a count of the processors that have replied to a requests to resend messages. 
201         int resendReplyRecvd;
202         // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time 
203         int restartFlag;
204         // 0 -> normal state .. 1 -> recovery of a team member 
205     int teamRecoveryFlag;       
206         //TML: teamTable, stores the SN to TN mapping for messages intra team
207         CkHashtableT<CkHashtableAdaptorT<CkObjID>, SNToTicket *> teamTable;
208
209         int toResumeOrNot;
210         int resumeCount;
211
212 private:
213
214         // SNTable, stores the number of messages sent (sequence numbers) to other objects.
215         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
216         // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
217         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
218         // Log of messages sent.
219         CkQ<MlogEntry *> mlog;
220         
221                 
222         inline MCount newTN();
223
224 public:
225         /**
226          * Default constructor.
227          */ 
228         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
229                 tCount = 0;
230                 tProcessed = 0;
231                 numberHoles = 0;
232                 ticketHoles = NULL;
233                 currentHoles = 0;
234                 restartFlag=0;
235                 teamRecoveryFlag=0;
236                 receivedTNs = NULL;
237                 resendReplyRecvd=0;
238                 toResumeOrNot=0;
239                 resumeCount=0;
240         };
241         inline MCount nextSN(const CkObjID &recver);
242         inline Ticket next_ticket(CkObjID &sender,MCount SN);
243         inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
244         inline Ticket getTicket(CkObjID &sender, MCount SN);
245         void addLogEntry(MlogEntry *entry);
246         virtual void pup(PUP::er &p);
247         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
248         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
249 };
250
251 /**
252  * @brief Entry in a message log. It also includes the index of the buffered
253  * determinants array and the number of appended determinants.
254  * @note: this message appended numBufDets counting downwards from indexBufDets.
255  * In other words, if indexBufDets == 5 and numBufDets = 3, it means that
256  * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
257  */
258 class MlogEntry{
259 public:
260         envelope *env;
261         int destPE;
262         int _infoIdx;
263         int indexBufDets;
264         int numBufDets;
265         
266         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
267                 env = _env;
268                 destPE = _destPE;
269                 _infoIdx = __infoIdx;
270         }
271         MlogEntry(){
272                 env = 0;
273                 destPE = -1;
274                 _infoIdx = 0;
275         }
276         ~MlogEntry(){
277                 if(env){
278                         CmiFree(env);
279                 }
280         }
281         virtual void pup(PUP::er &p);
282 };
283
284 /**
285  * @brief 
286  */
287 class LocationID{
288 public:
289         CkArrayIndexMax idx;
290         CkGroupID gid;
291 };
292
/**
 * @brief Checkpoint buffer together with its size and a PE number.
 * A default-constructed object has no buffer, size 0 and PE -1.
 */
class StoredCheckpoint {
public:
	char *buf;    // checkpoint bytes, NULL when nothing is stored
	int bufSize;  // size recorded for buf
	int PE;       // PE number, -1 when unset

	StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
307
308 /**
309  *  @brief Class for storing metadata of local messages.
310  *  It maps sequence numbers to ticket numbers.
311  *  It is used after a restart to maintain the same ticket numbers.
312  */
313 class RestoredLocalMap {
314 public:
315         MCount minSN,maxSN,count;
316         MCount *TNArray;
317         RestoredLocalMap(){
318                 minSN=maxSN=count=0;
319                 TNArray=NULL;
320         };
321         RestoredLocalMap(int i){
322                 minSN=maxSN=count=0;
323                 TNArray=NULL;
324         };
325
326         virtual void pup(PUP::er &p);
327 };
328
329
330 typedef struct {
331         char header[CmiMsgHeaderSizeBytes];
332         CkObjID sender;
333         CkObjID recver;
334         MlogEntry *logEntry;
335         MCount SN;
336         MCount TN;
337         int senderPE;
338 } TicketRequest;
339 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
340 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
341
342 typedef struct{
343         TicketRequest request;
344         Ticket ticket;
345         int recverPE;
346 } TicketReply;
347
348 CpvExtern(char**,_bufferedTicketRequests);
349 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
350
351
352
353 typedef struct {
354         char header[CmiMsgHeaderSizeBytes];
355         int numberLogs;
356 } BufferedLocalLogHeader;
357
358 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
359
360 typedef struct{
361         char header[CmiMsgHeaderSizeBytes];
362         int PE;
363         int dataSize;
364 } CheckPointDataMsg;
365
366 /*typedef struct{
367         char header[CmiMsgHeaderSizeBytes];
368         int PE;
369         int dataSize;
370 } CheckPointAck;*/
371
372 typedef CheckPointDataMsg CheckPointAck;
373
374 typedef struct{
375         CkObjID recver;
376         MCount tProcessed;
377 } TProcessedLog;
378
379
380 /**
381  * Struct to request a particular action during restart.
382  */
383 typedef struct{
384         char header[CmiMsgHeaderSizeBytes];
385         int PE;
386 } RestartRequest;
387
388 typedef RestartRequest CkPingMsg;
389 typedef RestartRequest CheckpointRequest;
390
391 typedef struct{
392         char header[CmiMsgHeaderSizeBytes];
393         int PE;
394         double restartWallTime;
395         int checkPointSize;
396         int numMigratedAwayElements;
397         int numMigratedInElements;
398         int migratedElementSize;
399         int numLocalMessages;   
400         CkGroupID lbGroupID;
401 } RestartProcessorData;
402
403 typedef struct{
404         char header[CmiMsgHeaderSizeBytes];
405         int PE;
406         int numberObjects;
407 } ResendRequest;
408
409 typedef ResendRequest RemoveLogRequest;
410
411 typedef struct {
412         char header[CmiMsgHeaderSizeBytes];
413         CkObjID recver;
414         int numTNs;
415 } ReceivedTNData;
416
417 // Structure to forward determinants in parallel restart
418 typedef struct {
419         char header[CmiMsgHeaderSizeBytes];
420         CkObjID recver;
421         int numDets;
422 } ReceivedDetData;
423
424 typedef struct{
425         int PE;
426         int numberObjects;
427         TProcessedLog *listObjects;
428         CkVec<MCount> *ticketVecs;
429 } ResendData;
430
431 typedef struct {
432         CkGroupID gID;
433         CkArrayIndexMax idx;
434         int fromPE,toPE;
435         char ackFrom,ackTo;
436 } MigrationRecord;
437
438 typedef struct {
439         char header[CmiMsgHeaderSizeBytes];
440         MigrationRecord migRecord;
441         void *record;
442 } MigrationNotice;
443
444 typedef struct {
445         char header[CmiMsgHeaderSizeBytes];
446         void *record;
447 } MigrationNoticeAck;
448
449 typedef struct {
450         MigrationRecord migRecord;
451         void *msg;
452         int size;
453         char acked;
454 } RetainedMigratedObject;
455
456 typedef struct {
457         char header[CmiMsgHeaderSizeBytes];
458         MigrationRecord migRecord;
459         int index;
460         int fromPE;
461 } VerifyAckMsg;
462
463 typedef struct {
464         char header[CmiMsgHeaderSizeBytes];
465         int checkpointCount;
466         int fromPE;
467 } CheckpointBarrierMsg;
468
469
470 //message used to inform a locmgr of an object's current location
471 typedef struct {
472         char header[CmiMsgHeaderSizeBytes];
473         CkGroupID mgrID;
474         CkArrayIndexMax idx;
475         int locationPE;
476         int fromPE;
477 } CurrentLocationMsg;
478
479 typedef struct {
480         char header[CmiMsgHeaderSizeBytes];
481         CkGroupID lbID;
482         int fromPE;
483         int step;
484 } LBStepMsg;
485
486
487 #define MLOG_OBJECT 1
488 #define MLOG_COUNT 2
489
490 typedef struct {
491         char header[CmiMsgHeaderSizeBytes];
492         int flag;// specific object(1) or count(2)
493         CkGroupID lbID;
494         int count;// if just count
495         /**if object **/
496         CkGroupID mgrID;
497         CkArrayIndexMax idx;
498         int locationPE;
499 } DummyMigrationMsg;
500
501
502 //function pointer passed to the forAllCharesDo method.
503 //It takes a void *data and a ChareMlogData pointer 
504 //It gets called for each chare
505 typedef void (*MlogFn)(void *,ChareMlogData *);
506
507 void _messageLoggingInit();
508
509 //Methods for sending ticket requests
510 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
511 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
512 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
513 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
514 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
515 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
516 void sendLocalMsg(MlogEntry *entry);
517
518 //handler functions
519 void _ticketRequestHandler(TicketRequest *);
520 void _ticketHandler(TicketReply *);
521 void _pingHandler(CkPingMsg *msg);
522 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
523 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
524 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
525 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
526 void _storeDeterminantsHandler(char *buffer);
527 void _removeDeterminantsHandler(char *buffer);
528
529
530 //methods for sending messages
531 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
532 extern void _noCldNodeEnqueue(int node, envelope *env);
533 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
534 void retryTicketRequest(void *_ticketRequest,double curWallTime);
535
536 //methods to process received messages with respect to mlog
537 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
538 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
539
540
541 //Checkpoint
542 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
543
544 //methods for checkpointing
545 void checkpointAlarm(void *_dummy,double curWallTime);
546 void startMlogCheckpoint(void *_dummy,double curWallTime);
547 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listSize=0);
548
549 //handler functions for checkpoint
550 void _checkpointRequestHandler(CheckpointRequest *request);
551 void _storeCheckpointHandler(char *msg);
552 void _checkpointAckHandler(CheckPointAck *ackMsg);
553 void _removeProcessedLogHandler(char *requestMsg);
554 void garbageCollectMlog();
555
556 //handler idxs for checkpoint
557 extern int _checkpointRequestHandlerIdx;
558 extern int _storeCheckpointHandlerIdx;
559 extern int _checkpointAckHandlerIdx;
560 extern int _removeProcessedLogHandlerIdx;
561
562 //Restart 
563
564
565 //methods for restart
566 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
567 void CkMlogRestartDouble(void *,double);
568 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
569 void processReceivedDet(Chare *obj,int vecsize, Determinant *listDets);
570 void initializeRestart(void *data,ChareMlogData *mlogData);
571 void distributeRestartedObjects();
572 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
573
574 //TML: function for locally calling the restart
575 void CkMlogRestartLocal();
576
577 //handler functions for restart
578 void _getCheckpointHandler(RestartRequest *restartMsg);
579 void _recvCheckpointHandler(char *_restartData);
580 void _resendMessagesHandler(char *msg);
581 void _resendReplyHandler(char *msg);
582 void _receivedTNDataHandler(ReceivedTNData *msg);
583 void _receivedDetDataHandler(ReceivedDetData *msg);
584 void _distributedLocationHandler(char *receivedMsg);
585 void _updateHomeRequestHandler(RestartRequest *updateRequest);
586 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
587 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
588 void _verifyAckHandler(VerifyAckMsg *verifyReply);
589 void _dummyMigrationHandler(DummyMigrationMsg *msg);
590
591 //TML: new functions for group-based message logging
592 void _restartHandler(RestartRequest *restartMsg);
593 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
594 void _recvRestartCheckpointHandler(char *_restartData);
595
596 //handler idxs for restart
597 extern int _getCheckpointHandlerIdx;
598 extern int _recvCheckpointHandlerIdx;
599 extern int _resendMessagesHandlerIdx;
600 extern int _resendReplyHandlerIdx;
601 extern int _receivedTNDataHandlerIdx;
602 extern int _receivedDetDataHandlerIdx;
603 extern int _distributedLocationHandlerIdx;
604 extern int _updateHomeRequestHandlerIdx;
605 extern int _updateHomeAckHandlerIdx;
606 extern int _verifyAckRequestHandlerIdx;
607 extern int _verifyAckHandlerIdx;
608 extern int _dummyMigrationHandlerIdx;
609
610 /// Load Balancing
611
612 //methods for load balancing
613 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
614 void finishedCheckpointLoadBalancing();
615 void sendMlogLocation(int targetPE,envelope *env);
616 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
617
618 //handlers for Load Balancing
619 void _receiveMlogLocationHandler(void *buf);
620 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
621 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
622 void _getGlobalStepHandler(LBStepMsg *msg);
623 void _recvGlobalStepHandler(LBStepMsg *msg);
624 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
625 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
626
627 //globals used for loadBalancing
628 extern int onGoingLoadBalancing;
629 extern void *centralLb;
630 extern void (*resumeLbFnPtr)(void *) ;
631 extern int _receiveMlogLocationHandlerIdx;
632 extern int _receiveMigrationNoticeHandlerIdx;
633 extern int _receiveMigrationNoticeAckHandlerIdx;
634 extern int _getGlobalStepHandlerIdx;
635 extern int _recvGlobalStepHandlerIdx;
636 extern int _checkpointBarrierHandlerIdx;
637 extern int _checkpointBarrierAckHandlerIdx;
638
639 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
640 extern CkVec<MigrationRecord> migratedNoticeList;
641 extern CkVec<RetainedMigratedObject *> retainedObjectList;
642
643 int getCheckPointPE();
644 inline int isSameDet(Determinant *first, Determinant *second);
645 void forAllCharesDo(MlogFn fnPointer,void *data);
646 envelope *copyEnvelope(envelope *env);
647 extern void _initDone(void);
648
649 //TML: needed for group restart
650 extern void _resetNodeBocInitVec(void);
651
652 //methods for updating location
653 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
654
655 //handlers for updating locations
656 void _receiveLocationHandler(CurrentLocationMsg *data);
657
658 //globals for updating locations
659 extern int _receiveLocationHandlerIdx;
660
661
662 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
663
664 #endif