4c44bbe6cccb4a6a9ba3a31065fecf8d804d07aa
[charm.git] / src / ck-core / ckcausalmlog.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
6 #if CMK_HAS_STRINGS_H
7 #include <strings.h>            /* defines bzero */
8 #else
9 #define bzero(s,n)   memset(s,0,n)
10 #endif
11
12 CpvExtern(Chare *,_currentObj);
13
14 //states of a ticket sent as a reply to a request
15 #define NEW_TICKET 1
16 #define OLD_TICKET 2
17 #define FORWARDED_TICKET 0x8000
18
19 //TML: global variable for the size of the team
20 #define MLOG_RESTARTED 0
21 #define MLOG_CRASHED 1
22 #define MEGABYTE 1048576
23
24 //array on which we print the formatted string representing an object id
25 extern char objString[100];
26
27 /**
28  * @brief
29  */
30 class Ticket {
31 public:
32         MCount TN;
33         int state;
34         Ticket(){
35                 TN = 0;
36                 state = 0;
37         }
38         Ticket(int x){
39                 TN = x;
40                 state = 0;
41         }
42 };
43 PUPbytes(Ticket)
44 class MlogEntry;
45
46 /**
47  * Log entry for local messages, can also be sent as a message.
48  * A message is local if:
49  * 1) It is sent between objects in the same processor.
50  * 2) It is sent between objects residing in processors of the same group
51  * (whenever group-based message logging is used).
52  */
53 typedef struct{
54         char header[CmiMsgHeaderSizeBytes];
55         CkObjID sender;
56         CkObjID recver;
57         MCount SN;
58         MCount TN;
59         MlogEntry *entry;
60         int senderPE;
61         int recverPE;
62 } LocalMessageLog;
63 PUPbytes(LocalMessageLog)
64
65 class MlogEntry;
66 class RestoredLocalMap;
67
68 #define INITSIZE_SNTOTICKET 100
69
70 /**
71  * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
72  * for a particular object.
73  */
74 class SNToTicket{
75         private:
76                 Ticket initial[INITSIZE_SNTOTICKET];
77                 Ticket *ticketVec;
78                 MCount startSN;
79                 int currentSize;
80                 MCount finishSN;
81         public:
82                 SNToTicket(){
83                         currentSize = INITSIZE_SNTOTICKET;
84                         ticketVec = &initial[0];
85                         bzero(ticketVec,sizeof(Ticket)*currentSize);
86                         startSN = 0;
87                         finishSN = 0;
88                 }
89                 /**
90                  * Gets the finishSN value.
91                  */ 
92                 inline MCount getFinishSN(){
93                         return finishSN;
94                 }
95                 /**
96                  * Gets the startSN value.
97                  */      
98                 inline MCount getStartSN(){
99                         return startSN;
100                 }
101                 //assume indices start from 1.. true for MCounts
102                 inline Ticket &put(MCount SN){
103                         if(SN > finishSN) finishSN = SN;
104                         if(startSN == 0){
105                                 startSN = SN;                           
106                         }
107                         int index = SN-startSN;
108                         if(index >= currentSize){
109                                 int oldSize = currentSize;
110                                 Ticket *old = ticketVec;
111                                 
112                                 currentSize = index*2;
113                                 ticketVec = new Ticket[currentSize];
114                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
115                                 if(old != &initial[0]){                                 
116                                         delete [] old;
117                                 }
118                         }
119                         return ticketVec[index];
120                 }
121
122                 inline Ticket get(MCount SN){
123                         int index = SN-startSN;
124                         CmiAssert(index >= 0);
125                         if(index >= currentSize){
126                                 Ticket tn;
127                                 return tn;
128                         }else{
129                                 return ticketVec[index];
130                         }
131                 }
132
133                 inline void pup(PUP::er &p){
134                         p | startSN;
135                         p | currentSize;
136                         if(p.isUnpacking()){
137                                 if(currentSize > INITSIZE_SNTOTICKET){
138                                         ticketVec = new Ticket[currentSize];
139                                 }
140                         }
141                         for(int i=0;i<currentSize;i++){
142                                 p | ticketVec[i];
143                         }
144                 }       
145 };
146
147
/**
 * This file includes the definition of the class that stores the metadata
 * associated with the message-logging protocol.
 */
152
153
154 /**
155  * @brief This class stores all the message logging related data for a chare.
156  */
157 class ChareMlogData{
158 public:
159         // Object unique ID.
160         CkObjID objID;
161         // Counts how many tickets have been handed out.
162         MCount tCount; 
163         // Stores the highest ticket that has been processed.
164         MCount tProcessed;
165         
166         //TODO: pup receivedTNs
167         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
168         MCount *ticketHoles;
169         int numberHoles;
170         int currentHoles;
171         CkVec<LocalMessageLog> restoredLocalMsgLog;
172         int maxRestoredLocalTN;
173         int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
174         int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
175     int teamRecoveryFlag; // 0 -> normal state .. 1 -> recovery of a team member        
176         CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable;
177         //TML: teamTable, stores the SN to TN mapping for messages intra team
178         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> teamTable;
179
180         int toResumeOrNot;
181         int resumeCount;
182
183 private:
184
185         // SNTable, stores the number of messages sent (sequence numbers) to other objects.
186         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
187         // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
188         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
189         // Log of messages sent.
190         CkQ<MlogEntry *> mlog;
191         
192                 
193         inline MCount newTN();
194
195 public:
196         /**
197          * Default constructor.
198          */ 
199         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
200                 tCount = 0;
201                 tProcessed = 0;
202                 numberHoles = 0;
203                 ticketHoles = NULL;
204                 currentHoles = 0;
205                 restartFlag=0;
206                 teamRecoveryFlag=0;
207                 receivedTNs = NULL;
208                 resendReplyRecvd=0;
209                 maxRestoredLocalTN=0;
210                 toResumeOrNot=0;
211                 resumeCount=0;
212         };
213         inline MCount nextSN(const CkObjID &recver);
214         inline Ticket next_ticket(CkObjID &sender,MCount SN);
215         inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
216         void addLogEntry(MlogEntry *entry);
217         virtual void pup(PUP::er &p);
218         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
219         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
220         void addToRestoredLocalQ(LocalMessageLog *logEntry);
221         void sortRestoredLocalMsgLog();
222 };
223
224 /**
225  * @brief Entry in a message log
226  */
227 class MlogEntry{
228 public:
229         envelope *env;
230         int destPE;
231         int _infoIdx;
232         char unackedLocal;
233         
234         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
235                 env = _env;
236                 destPE = _destPE;
237                 _infoIdx = __infoIdx;
238                 unackedLocal = 0;
239         }
240         MlogEntry(){
241                 env = 0;
242                 destPE = -1;
243                 _infoIdx = 0;
244                 unackedLocal = 0;
245         }
246         ~MlogEntry(){
247                 if(env){
248                         CmiFree(env);
249                 }
250         }
251         virtual void pup(PUP::er &p);
252 };
253
254 /**
255  * @brief 
256  */
257 class LocationID{
258 public:
259         CkArrayIndex idx;
260         CkGroupID gid;
261 };
262
263 /**
264  * @brief
265  */
266 class StoredCheckpoint{
267 public:
268         char *buf;
269         int bufSize;
270         int PE;
271         StoredCheckpoint(){
272                 buf = NULL;
273                 bufSize = 0;
274                 PE = -1;
275         };
276 };
277
278 /**
279  *  @brief Class for storing metadata of local messages.
280  *  It maps sequence numbers to ticket numbers.
281  *  It is used after a restart to maintain the same ticket numbers.
282  */
283 class RestoredLocalMap {
284 public:
285         MCount minSN,maxSN,count;
286         MCount *TNArray;
287         RestoredLocalMap(){
288                 minSN=maxSN=count=0;
289                 TNArray=NULL;
290         };
291         RestoredLocalMap(int i){
292                 minSN=maxSN=count=0;
293                 TNArray=NULL;
294         };
295
296         virtual void pup(PUP::er &p);
297 };
298
299
300 typedef struct {
301         char header[CmiMsgHeaderSizeBytes];
302         CkObjID sender;
303         CkObjID recver;
304         MlogEntry *logEntry;
305         MCount SN;
306         MCount TN;
307         int senderPE;
308 } TicketRequest;
309 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
310 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
311
312 typedef struct{
313         TicketRequest request;
314         Ticket ticket;
315         int recverPE;
316 } TicketReply;
317
318
319 CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog); // used on buddy to store local message logs
320
321 CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
322 extern int _maxBufferedMessages; //Number of local message logs  to be buffered
323
324 CpvExtern(char**,_bufferedTicketRequests);
325 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
326
327
328
329 typedef struct {
330         char header[CmiMsgHeaderSizeBytes];
331         int numberLogs;
332 } BufferedLocalLogHeader;
333
334 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
335
336 typedef struct{
337         char header[CmiMsgHeaderSizeBytes];
338         MlogEntry *entry;               
339 } LocalMessageLogAck;
340
341 typedef struct{
342         char header[CmiMsgHeaderSizeBytes];
343         int PE;
344         int dataSize;
345 } CheckPointDataMsg;
346
347 /*typedef struct{
348         char header[CmiMsgHeaderSizeBytes];
349         int PE;
350         int dataSize;
351 } CheckPointAck;*/
352
353 typedef CheckPointDataMsg CheckPointAck;
354
355 typedef struct{
356         CkObjID recver;
357         MCount tProcessed;
358 } TProcessedLog;
359
360 /**
361  * Struct to request a particular action during restart.
362  */
363 typedef struct{
364         char header[CmiMsgHeaderSizeBytes];
365         int PE;
366 } RestartRequest;
367
368 typedef RestartRequest CkPingMsg;
369 typedef RestartRequest CheckpointRequest;
370
371 typedef struct{
372         char header[CmiMsgHeaderSizeBytes];
373         int PE;
374         double restartWallTime;
375         int checkPointSize;
376         int numMigratedAwayElements;
377         int numMigratedInElements;
378         int migratedElementSize;
379         int numLocalMessages;   
380         CkGroupID lbGroupID;
381 } RestartProcessorData;
382
383 typedef struct{
384         char header[CmiMsgHeaderSizeBytes];
385         int PE;
386         int numberObjects;
387 } ResendRequest;
388
389 typedef ResendRequest RemoveLogRequest;
390
391 typedef struct {
392         char header[CmiMsgHeaderSizeBytes];
393         CkObjID recver;
394         int numTNs;
395 } ReceivedTNData;
396
397 typedef struct{
398         int PE;
399         int numberObjects;
400         TProcessedLog *listObjects;
401         MCount *maxTickets;
402         CkVec<MCount> *ticketVecs;
403 } ResendData;
404
405 typedef struct {
406         CkGroupID gID;
407         CkArrayIndex idx;
408         int fromPE,toPE;
409         char ackFrom,ackTo;
410 } MigrationRecord;
411
412 typedef struct {
413         char header[CmiMsgHeaderSizeBytes];
414         MigrationRecord migRecord;
415         void *record;
416 } MigrationNotice;
417
418 typedef struct {
419         char header[CmiMsgHeaderSizeBytes];
420         void *record;
421 } MigrationNoticeAck;
422
423 typedef struct {
424         MigrationRecord migRecord;
425         void *msg;
426         int size;
427         char acked;
428 } RetainedMigratedObject;
429
430 typedef struct {
431         char header[CmiMsgHeaderSizeBytes];
432         MigrationRecord migRecord;
433         int index;
434         int fromPE;
435 } VerifyAckMsg;
436
437 typedef struct {
438         char header[CmiMsgHeaderSizeBytes];
439         int checkpointCount;
440         int fromPE;
441 } CheckpointBarrierMsg;
442
443
444 //message used to inform a locmgr of an object's current location
445 typedef struct {
446         char header[CmiMsgHeaderSizeBytes];
447         CkGroupID mgrID;
448         CkArrayIndex idx;
449         int locationPE;
450         int fromPE;
451 } CurrentLocationMsg;
452
453 typedef struct {
454         char header[CmiMsgHeaderSizeBytes];
455         CkGroupID lbID;
456         int fromPE;
457         int step;
458 } LBStepMsg;
459
460
461 #define MLOG_OBJECT 1
462 #define MLOG_COUNT 2
463
464 typedef struct {
465         char header[CmiMsgHeaderSizeBytes];
466         int flag;// specific object(1) or count(2)
467         CkGroupID lbID;
468         int count;// if just count
469         /**if object **/
470         CkGroupID mgrID;
471         CkArrayIndex idx;
472         int locationPE;
473 } DummyMigrationMsg;
474
475
476 //function pointer passed to the forAllCharesDo method.
477 //It takes a void *data and a ChareMlogData pointer 
478 //It gets called for each chare
479 typedef void (*MlogFn)(void *,ChareMlogData *);
480
481 void _messageLoggingInit();
482
483 //Methods for sending ticket requests
484 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
485 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
486 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
487 void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
488 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
489 void ticketLogLocalMessage(MlogEntry *entry);
490 void sendLocalMessageCopy(MlogEntry *entry);
491 void sendBufferedLocalMessageCopy();
492 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
493 void sendBufferedTicketRequests(int destPE);
494 void checkBufferedTicketRequests(void *_destPE,double curWallTime);
495
496
497
498
499 //handler idxs
500 extern int _ticketRequestHandlerIdx;
501 extern int _ticketHandlerIdx;
502 extern int _localMessageCopyHandlerIdx;
503 extern int _localMessageAckHandlerIdx;
504 extern int _bufferedLocalMessageCopyHandlerIdx;
505 extern int _bufferedLocalMessageAckHandlerIdx;
506 extern int _bufferedTicketRequestHandlerIdx;
507 extern int _bufferedTicketHandlerIdx;
508
509 //handler functions
510 void _ticketRequestHandler(TicketRequest *);
511 void _ticketHandler(TicketReply *);
512 void _localMessageCopyHandler(LocalMessageLog *);
513 void _localMessageAckHandler(LocalMessageLogAck *);
514 void _pingHandler(CkPingMsg *msg);
515 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
516 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
517 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
518 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
519
520
521 //methods for sending messages
522 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
523 extern void _noCldNodeEnqueue(int node, envelope *env);
524 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
525 void retryTicketRequest(void *_ticketRequest,double curWallTime);
526
527 //methods to process received messages with respect to mlog
528 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
529 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
530
531
532 //Checkpoint
533 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
534
535 //methods for checkpointing
536 void checkpointAlarm(void *_dummy,double curWallTime);
537 void startMlogCheckpoint(void *_dummy,double curWallTime);
538 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listSize=0);
539
540 //handler functions for checkpoint
541 void _checkpointRequestHandler(CheckpointRequest *request);
542 void _storeCheckpointHandler(char *msg);
543 void _checkpointAckHandler(CheckPointAck *ackMsg);
544 void _removeProcessedLogHandler(char *requestMsg);
545
546 //handler idxs for checkpoint
547 extern int _checkpointRequestHandlerIdx;
548 extern int _storeCheckpointHandlerIdx;
549 extern int _checkpointAckHandlerIdx;
550 extern int _removeProcessedLogHandlerIdx;
551
552 //Restart 
553
554
555 //methods for restart
556 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
557 void CkMlogRestartDouble(void *,double);
558 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
559 void initializeRestart(void *data,ChareMlogData *mlogData);
560 void distributeRestartedObjects();
561 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
562 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndex &idx,int locationPE);
563
564 //TML: function for locally calling the restart
565 void CkMlogRestartLocal();
566
567 //handler functions for restart
568 void _getCheckpointHandler(RestartRequest *restartMsg);
569 void _recvCheckpointHandler(char *_restartData);
570 void _resendMessagesHandler(char *msg);
571 void _resendReplyHandler(char *msg);
572 void _receivedTNDataHandler(ReceivedTNData *msg);
573 void _distributedLocationHandler(char *receivedMsg);
574 void _updateHomeRequestHandler(RestartRequest *updateRequest);
575 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
576 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
577 void _verifyAckHandler(VerifyAckMsg *verifyReply);
578 void _dummyMigrationHandler(DummyMigrationMsg *msg);
579
580 //TML: new functions for group-based message logging
581 void _restartHandler(RestartRequest *restartMsg);
582 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
583 void _recvRestartCheckpointHandler(char *_restartData);
584
585
586 //handler idxs for restart
587 extern int _getCheckpointHandlerIdx;
588 extern int _recvCheckpointHandlerIdx;
589 extern int _resendMessagesHandlerIdx;
590 extern int _resendReplyHandlerIdx;
591 extern int _receivedTNDataHandlerIdx;
592 extern int _distributedLocationHandlerIdx;
593 extern int _updateHomeRequestHandlerIdx;
594 extern int _updateHomeAckHandlerIdx;
595 extern int _verifyAckRequestHandlerIdx;
596 extern int _verifyAckHandlerIdx;
597 extern int _dummyMigrationHandlerIdx;
598
599
600 /// Load Balancing
601
602 //methods for load balancing
603 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
604 void finishedCheckpointLoadBalancing();
605 void sendMlogLocation(int targetPE,envelope *env);
606 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
607
608 //handlers for Load Balancing
609 void _receiveMlogLocationHandler(void *buf);
610 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
611 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
612 void _getGlobalStepHandler(LBStepMsg *msg);
613 void _recvGlobalStepHandler(LBStepMsg *msg);
614 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
615 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
616
617
618 //globals used for loadBalancing
619 extern int onGoingLoadBalancing;
620 extern void *centralLb;
621 extern void (*resumeLbFnPtr)(void *) ;
622 extern int _receiveMlogLocationHandlerIdx;
623 extern int _receiveMigrationNoticeHandlerIdx;
624 extern int _receiveMigrationNoticeAckHandlerIdx;
625 extern int _getGlobalStepHandlerIdx;
626 extern int _recvGlobalStepHandlerIdx;
627 extern int _checkpointBarrierHandlerIdx;
628 extern int _checkpointBarrierAckHandlerIdx;
629
630 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
631 extern CkVec<MigrationRecord> migratedNoticeList;
632 extern CkVec<RetainedMigratedObject *> retainedObjectList;
633
634 int getCheckPointPE();
635 void forAllCharesDo(MlogFn fnPointer,void *data);
636 envelope *copyEnvelope(envelope *env);
637 extern void _initDone(void);
638
639 //TML: needed for group restart
640 extern void _resetNodeBocInitVec(void);
641
642 //methods for updating location
643 void informLocationHome(CkGroupID mgrID,CkArrayIndex idx,int homePE,int currentPE);
644
645 //handlers for updating locations
646 void _receiveLocationHandler(CurrentLocationMsg *data);
647
648 //globals for updating locations
649 extern int _receiveLocationHandlerIdx;
650
651
652 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
653 inline void processRemoteMlogMessages(){
654         CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
655 }
656
657 #endif