Make the use of both message-logging techniques homogeneous in the Charm++ core.
[charm.git] / src / ck-core / ckmessagelogging.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
#if CMK_HAS_STRINGS_H
#include <strings.h>            /* defines bzero */
#else
/* Fallback for platforms without <strings.h>: zero n bytes starting at s.
 * <string.h> is included here because the expansion needs memset's prototype. */
#include <string.h>
#define bzero(s,n)   memset((s),0,(n))
#endif
11
// Converse per-processor (Cpv) variables declared here and defined elsewhere.
// NOTE(review): _currentObj presumably points at the chare currently executing — confirm.
CpvExtern(Chare *,_currentObj);
CpvExtern(int, _numImmigrantRecObjs);

//states of a ticket sent as a reply to a request
#define NEW_TICKET 1
#define OLD_TICKET 2
// NOTE(review): 0x8000 is a single high bit, so this looks like a flag combined
// with one of the states above rather than a standalone state — confirm at use sites.
#define FORWARDED_TICKET 0x8000

//TML: status codes MLOG_RESTARTED / MLOG_CRASHED
// NOTE(review): presumably describe whether a processor has restarted or crashed — confirm at use sites.
#define MLOG_RESTARTED 0
#define MLOG_CRASHED 1
// Number of bytes in a megabyte (2^20).
#define MEGABYTE 1048576

//array on which we print the formatted string representing an object id
extern char objString[100];

// defines the initial size of _bufferedDets
#define INITIAL_BUFFERED_DETERMINANTS 1024

// constant to define the type of checkpoint used (synchronized or not)
#define SYNCHRONIZED_CHECKPOINT 1
33
/**
 * @brief Struct to store the determinant of a particular message.
 * The determinant remembers all the necessary information for a 
 * message to be replayed in the same order as in the execution prior
 * to the failure: the sender/receiver object IDs, the sender sequence
 * number and the ticket number assigned at the receiver.
 * NOTE(review): arrays of Determinant are handed around as raw buffers
 * (e.g. processReceivedDet takes a Determinant*), so keep the field
 * order and types stable — confirm how they are serialized.
 */
typedef struct {
        // sender ID
        CkObjID sender;
        // receiver ID
        CkObjID receiver;
        // SSN: sender sequence number
        MCount SN;
        // TN: ticket number (RSN: receiver sequence number)
        MCount TN;
} Determinant;

/**
 * @brief Typedef for the hashtable type that maps object IDs to determinants.
 * Each CkObjID key maps to a pointer to a vector of Determinant records.
 */
typedef CkHashtableT<CkHashtableAdaptorT<CkObjID>, CkVec<Determinant> *> CkDeterminantHashtableT;

/**
 * @brief Struct for the header of the removeDeterminants handler
 * (_removeDeterminantsHandler). It begins with the message header bytes
 * (CmiMsgHeaderSizeBytes).
 */
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        // NOTE(review): meaning not visible in this header (checkpoint phase?) — confirm.
        int phase;
        // NOTE(review): presumably an index into the buffered determinants — confirm.
        int index;
} RemoveDeterminantsHeader;

/**
 * @brief Struct for the header of the storeDeterminants handler
 * (_storeDeterminantsHandler). It begins with the message header bytes
 * (CmiMsgHeaderSizeBytes).
 */
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        // NOTE(review): presumably the number of Determinant records following the header — confirm.
        int number;
        // NOTE(review): presumably an index into the sender's buffered determinants — confirm.
        int index;
        int phase;
        // NOTE(review): presumably the sending PE — confirm.
        int PE;
} StoreDeterminantsHeader;
75
76 /**
77  * @brief Structure for a ticket assigned to a particular message.
78  */
79 class Ticket {
80 public:
81         MCount TN;
82         int state;
83         Ticket(){
84                 TN = 0;
85                 state = 0;
86         }
87         Ticket(int x){
88                 TN = x;
89                 state = 0;
90         }
91 };
92 PUPbytes(Ticket)
93 class MlogEntry;
94
95 class RestoredLocalMap;
96
97 #define INITSIZE_SNTOTICKET 100
98
99 /**
100  * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
101  * for a particular object.
102  */
103 class SNToTicket{
104         private:
105                 Ticket initial[INITSIZE_SNTOTICKET];
106                 Ticket *ticketVec;
107                 MCount startSN;
108                 int currentSize;
109                 MCount finishSN;
110         public:
111                 SNToTicket(){
112                         currentSize = INITSIZE_SNTOTICKET;
113                         ticketVec = &initial[0];
114                         bzero(ticketVec,sizeof(Ticket)*currentSize);
115                         startSN = 0;
116                         finishSN = 0;
117                 }
118                 /**
119                  * Gets the finishSN value.
120                  */ 
121                 inline MCount getFinishSN(){
122                         return finishSN;
123                 }
124                 /**
125                  * Gets the startSN value.
126                  */      
127                 inline MCount getStartSN(){
128                         return startSN;
129                 }
130                 //assume indices start from 1.. true for MCounts
131                 inline Ticket &put(MCount SN){
132                         if(SN > finishSN) finishSN = SN;
133                         if(startSN == 0){
134                                 startSN = SN;                           
135                         }
136                         int index = SN-startSN;
137                         if(index >= currentSize){
138                                 int oldSize = currentSize;
139                                 Ticket *old = ticketVec;
140                                 
141                                 currentSize = index*2;
142                                 ticketVec = new Ticket[currentSize];
143                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
144                                 if(old != &initial[0]){                                 
145                                         delete [] old;
146                                 }
147                         }
148                         return ticketVec[index];
149                 }
150
151                 inline Ticket get(MCount SN){
152                         int index = SN-startSN;
153                         CmiAssert(index >= 0);
154                         if(index >= currentSize){
155                                 Ticket tn;
156                                 return tn;
157                         }else{
158                                 return ticketVec[index];
159                         }
160                 }
161
162                 inline void pup(PUP::er &p){
163                         p | startSN;
164                         p | currentSize;
165                         if(p.isUnpacking()){
166                                 if(currentSize > INITSIZE_SNTOTICKET){
167                                         ticketVec = new Ticket[currentSize];
168                                 }
169                         }
170                         for(int i=0;i<currentSize;i++){
171                                 p | ticketVec[i];
172                         }
173                 }       
174 };
175
176
177 /**
178  * This file includes the definition of the class for storing the meta data
179  * associdated with the message logging protocol.
180  */
181
182
183 /**
184  * @brief This class stores all the message logging related data for a chare.
185  */
186 class ChareMlogData{
187 public:
188         // Object unique ID.
189         CkObjID objID;
190         // Counts how many tickets have been handed out.
191         MCount tCount; 
192         // Stores the highest ticket that has been processed.
193         MCount tProcessed;
194         
195         //TODO: pup receivedTNs
196         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
197         MCount *ticketHoles;
198         int numberHoles;
199         int currentHoles;
200         // variable that keeps a count of the processors that have replied to a requests to resend messages. 
201         int resendReplyRecvd;
202         // 0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time 
203         int restartFlag;
204         // 0 -> normal state .. 1 -> recovery of a team member 
205     int teamRecoveryFlag;       
206         //TML: teamTable, stores the SN to TN mapping for messages intra team
207         CkHashtableT<CkHashtableAdaptorT<CkObjID>, SNToTicket *> teamTable;
208
209         int toResumeOrNot;
210         int resumeCount;
211         int immigrantRecFlag;
212         int immigrantSourcePE;
213
214 private:
215
216         // SNTable, stores the number of messages sent (sequence numbers) to other objects.
217         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
218         // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
219         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
220         // Log of messages sent.
221         CkQ<MlogEntry *> mlog;
222         
223                 
224         inline MCount newTN();
225
226 public:
227         /**
228          * Default constructor.
229          */ 
230         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
231                 tCount = 0;
232                 tProcessed = 0;
233                 numberHoles = 0;
234                 ticketHoles = NULL;
235                 currentHoles = 0;
236                 restartFlag=0;
237                 teamRecoveryFlag=0;
238                 receivedTNs = NULL;
239                 resendReplyRecvd=0;
240                 toResumeOrNot=0;
241                 resumeCount=0;
242                 immigrantRecFlag = 0;
243         };
244         inline MCount nextSN(const CkObjID &recver);
245         inline Ticket next_ticket(CkObjID &sender,MCount SN);
246         inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
247         inline Ticket getTicket(CkObjID &sender, MCount SN);
248         void addLogEntry(MlogEntry *entry);
249         virtual void pup(PUP::er &p);
250         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
251         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
252 };
253
254 /**
255  * @brief Entry in a message log. It also includes the index of the buffered
256  * determinants array and the number of appended determinants.
257  * @note: this message appended numBufDets counting downwards from indexBufDets.
258  * In other words, if indexBufDets == 5 and numBufDets = 3, it means that
259  * determinants bufDets[2], bufDets[3] and bufDets[4] were piggybacked.
260  */
261 class MlogEntry{
262 public:
263         envelope *env;
264         int destPE;
265         int _infoIdx;
266         int indexBufDets;
267         int numBufDets;
268         
269         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
270                 env = _env;
271                 destPE = _destPE;
272                 _infoIdx = __infoIdx;
273         }
274         MlogEntry(){
275                 env = 0;
276                 destPE = -1;
277                 _infoIdx = 0;
278         }
279         ~MlogEntry(){
280                 if(env){
281                         CmiFree(env);
282                 }
283         }
284         virtual void pup(PUP::er &p);
285 };
286
/**
 * @brief A stored checkpoint buffer.
 * buf points at bufSize bytes; a default-constructed instance holds no
 * buffer (buf == NULL, bufSize == 0) and PE == -1.
 * NOTE(review): PE presumably identifies the processor the checkpoint belongs to — confirm.
 */
class StoredCheckpoint{
public:
        char *buf;      // checkpoint bytes, NULL when empty
        int bufSize;    // number of bytes at buf
        int PE;         // -1 until assigned
        StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
301
302 /**
303  *  @brief Class for storing metadata of local messages.
304  *  It maps sequence numbers to ticket numbers.
305  *  It is used after a restart to maintain the same ticket numbers.
306  */
307 class RestoredLocalMap {
308 public:
309         MCount minSN,maxSN,count;
310         MCount *TNArray;
311         RestoredLocalMap(){
312                 minSN=maxSN=count=0;
313                 TNArray=NULL;
314         };
315         RestoredLocalMap(int i){
316                 minSN=maxSN=count=0;
317                 TNArray=NULL;
318         };
319
320         virtual void pup(PUP::er &p);
321 };
322
323
/**
 * Ticket request message (see _ticketRequestHandler): begins with the
 * message header, then the sender/receiver object IDs, the message's
 * sequence number SN and a ticket number TN.
 * NOTE(review): logEntry is a raw local pointer, so it is presumably only
 * meaningful on the PE that created the request — confirm.
 */
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        CkObjID sender;
        CkObjID recver;
        MlogEntry *logEntry;
        MCount SN;
        MCount TN;
        int senderPE;
} TicketRequest;
// Per-processor queues; NOTE(review): presumably requests whose handling was deferred — confirm.
CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);

/**
 * Reply to a ticket request (see _ticketHandler). It embeds the original
 * TicketRequest as its first member, so its message header is the request's
 * header; ticket is presumably the ticket granted to that request.
 */
typedef struct{
        TicketRequest request;
        Ticket ticket;
        int recverPE;
} TicketReply;

// Buffer of ticket requests (per-processor) and the number of requests it holds.
CpvExtern(char**,_bufferedTicketRequests);
extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
344
345
346
// Header of a message carrying buffered local-log entries.
// NOTE(review): numberLogs presumably counts the entries that follow the header — confirm.
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        int numberLogs;
} BufferedLocalLogHeader;

// Buffered ticket requests use the same header layout.
typedef BufferedLocalLogHeader BufferedTicketRequestHeader;

// Checkpoint data message: header, a PE number and the data size.
// NOTE(review): presumably dataSize bytes of checkpoint data follow the struct — confirm.
typedef struct{
        char header[CmiMsgHeaderSizeBytes];
        int PE;
        int dataSize;
} CheckPointDataMsg;

// Message carrying a single PE number, used for object distribution.
typedef struct{
    char header[CmiMsgHeaderSizeBytes];
    int PE;
} DistributeObjectMsg;


// Checkpoint acknowledgements share CheckPointDataMsg's layout (header, PE, dataSize).
typedef CheckPointDataMsg CheckPointAck;

// (receiver object, highest processed ticket) pair; ResendData holds a list of these.
typedef struct{
        CkObjID recver;
        MCount tProcessed;
} TProcessedLog;
378
379
/**
 * Struct to request a particular action during restart.
 * The same layout (message header + PE) is reused for pings and
 * checkpoint requests via the typedefs below.
 */
typedef struct{
        char header[CmiMsgHeaderSizeBytes];
        // NOTE(review): presumably the requesting PE — confirm.
        int PE;
} RestartRequest;

typedef RestartRequest CkPingMsg;
typedef RestartRequest CheckpointRequest;

// Description of a processor's restart data.
// NOTE(review): field meanings are inferred from their names — confirm against the restart handlers.
typedef struct{
        char header[CmiMsgHeaderSizeBytes];
        int PE;
        double restartWallTime;
        int checkPointSize;
        int numMigratedAwayElements;
        int numMigratedInElements;
        int migratedElementSize;
        int numLocalMessages;   
        CkGroupID lbGroupID;
} RestartProcessorData;

// Request to resend messages, also reused as a request to remove logs.
// NOTE(review): numberObjects presumably counts per-object records after the header — confirm.
typedef struct{
        char header[CmiMsgHeaderSizeBytes];
        int PE;
        int numberObjects;
} ResendRequest;

typedef ResendRequest RemoveLogRequest;

// Ticket numbers destined for object recver.
// NOTE(review): presumably followed by numTNs MCount values (cf. processReceivedTN) — confirm.
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        CkObjID recver;
        int numTNs;
} ReceivedTNData;

// Structure to forward determinants in parallel restart.
// NOTE(review): presumably followed by numDets Determinant records (cf. processReceivedDet) — confirm.
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        CkObjID recver;
        int numDets;
} ReceivedDetData;

// Bookkeeping for a resend (no message header, so not sent as-is).
// listObjects and ticketVecs point to numberObjects-sized arrays — NOTE(review): confirm sizes at use sites.
typedef struct{
        int PE;
        int numberObjects;
        TProcessedLog *listObjects;
        CkVec<MCount> *ticketVecs;
} ResendData;
430
// Record of one array element's migration: group gID, index idx, source and
// destination PEs. NOTE(review): ackFrom/ackTo look like per-side acknowledgement flags — confirm.
typedef struct {
        CkGroupID gID;
        CkArrayIndexMax idx;
        int fromPE,toPE;
        char ackFrom,ackTo;
} MigrationRecord;

// Migration notice; record is an opaque pointer that MigrationNoticeAck also carries.
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        MigrationRecord migRecord;
        void *record;
} MigrationNotice;

// Acknowledgement of a MigrationNotice.
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        void *record;
} MigrationNoticeAck;

// Object retained after a migration (see retainedObjectList): its migration
// record, the message msg of size bytes and an acknowledgement flag.
typedef struct {
        MigrationRecord migRecord;
        void *msg;
        int size;
        char acked;
} RetainedMigratedObject;

// Request/reply used to verify the acknowledgement state of a migration.
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        MigrationRecord migRecord;
        int index;
        int fromPE;
} VerifyAckMsg;

// Checkpoint barrier message (see _checkpointBarrierHandler / _checkpointBarrierAckHandler).
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        int checkpointCount;
        int fromPE;
} CheckpointBarrierMsg;


//message used to inform a locmgr of an object's current location
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        CkGroupID mgrID;
        CkArrayIndexMax idx;
        int locationPE;
        int fromPE;
} CurrentLocationMsg;

// Load-balancing step message (see _getGlobalStepHandler / _recvGlobalStepHandler).
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        CkGroupID lbID;
        int fromPE;
        int step;
} LBStepMsg;


// Values for DummyMigrationMsg::flag.
#define MLOG_OBJECT 1
#define MLOG_COUNT 2

// Dummy migration message: either describes one object (MLOG_OBJECT:
// mgrID/idx/locationPE) or only a count (MLOG_COUNT: count).
typedef struct {
        char header[CmiMsgHeaderSizeBytes];
        int flag;// specific object(1) or count(2)
        CkGroupID lbID;
        int count;// if just count
        /**if object **/
        CkGroupID mgrID;
        CkArrayIndexMax idx;
        int locationPE;
} DummyMigrationMsg;
500
501
502 //function pointer passed to the forAllCharesDo method.
503 //It takes a void *data and a ChareMlogData pointer 
504 //It gets called for each chare
505 typedef void (*MlogFn)(void *,ChareMlogData *);
506
507 void _messageLoggingInit();
508
509 //Methods for sending ticket requests
510 void sendGroupMsg(envelope *env,int destPE,int _infoIdx);
511 void sendArrayMsg(envelope *env,int destPE,int _infoIdx);
512 void sendChareMsg(envelope *env,int destPE,int _infoIdx, const CkChareID *pCid);
513 void sendNodeGroupMsg(envelope *env,int destNode,int _infoIdx);
514 void sendCommonMsg(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
515 void sendMsg(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
516 void sendLocalMsg(envelope *env, int _infoIdx);
517
518 //handler functions
519 void _ticketRequestHandler(TicketRequest *);
520 void _ticketHandler(TicketReply *);
521 void _pingHandler(CkPingMsg *msg);
522 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
523 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
524 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
525 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
526 void _storeDeterminantsHandler(char *buffer);
527 void _removeDeterminantsHandler(char *buffer);
528
529
530 //methods for sending messages
531 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
532 extern void _noCldNodeEnqueue(int node, envelope *env);
533 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
534 void retryTicketRequest(void *_ticketRequest,double curWallTime);
535
536 //methods to process received messages with respect to mlog
537 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
538 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
539
540
541 //Checkpoint
542 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
543
544 //methods for checkpointing
545 void checkpointAlarm(void *_dummy,double curWallTime);
546 void startMlogCheckpoint(void *_dummy,double curWallTime);
547 void pupArrayElementsSkip(PUP::er &p, CmiBool create, MigrationRecord *listToSkip,int listSize=0);
548
549 //handler functions for checkpoint
550 void _checkpointRequestHandler(CheckpointRequest *request);
551 void _storeCheckpointHandler(char *msg);
552 void _checkpointAckHandler(CheckPointAck *ackMsg);
553 void _removeProcessedLogHandler(char *requestMsg);
554 void garbageCollectMlog();
555
556 //handler idxs for checkpoint
557 extern int _checkpointRequestHandlerIdx;
558 extern int _storeCheckpointHandlerIdx;
559 extern int _checkpointAckHandlerIdx;
560 extern int _removeProcessedLogHandlerIdx;
561
562 //Restart 
563
564
565 //methods for restart
566 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
567 void CkMlogRestartDouble(void *,double);
568 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
569 void processReceivedDet(Chare *obj,int vecsize, Determinant *listDets);
570 void initializeRestart(void *data,ChareMlogData *mlogData);
571 void distributeRestartedObjects();
572 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
573
574 //TML: function for locally calling the restart
575 void CkMlogRestartLocal();
576
577 //handler functions for restart
578 void _getCheckpointHandler(RestartRequest *restartMsg);
579 void _recvCheckpointHandler(char *_restartData);
580 void _resendMessagesHandler(char *msg);
581 void _sendDetsHandler(char *msg);
582 void _sendDetsReplyHandler(char *msg);
583 void _receivedTNDataHandler(ReceivedTNData *msg);
584 void _receivedDetDataHandler(ReceivedDetData *msg);
585 void _distributedLocationHandler(char *receivedMsg);
586 void _sendBackLocationHandler(char *receivedMsg);
587 void _updateHomeRequestHandler(RestartRequest *updateRequest);
588 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
589 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
590 void _verifyAckHandler(VerifyAckMsg *verifyReply);
591 void _dummyMigrationHandler(DummyMigrationMsg *msg);
592
593 //TML: new functions for group-based message logging
594 void _restartHandler(RestartRequest *restartMsg);
595 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
596 void _recvRestartCheckpointHandler(char *_restartData);
597
598 //handler idxs for restart
599 extern int _getCheckpointHandlerIdx;
600 extern int _recvCheckpointHandlerIdx;
601 extern int _resendMessagesHandlerIdx;
602 extern int _sendDetsHandlerIdx;
603 extern int _sendDetsReplyHandlerIdx;
604 extern int _receivedTNDataHandlerIdx;
605 extern int _receivedDetDataHandlerIdx;
606 extern int _distributedLocationHandlerIdx;
607 extern int _updateHomeRequestHandlerIdx;
608 extern int _updateHomeAckHandlerIdx;
609 extern int _verifyAckRequestHandlerIdx;
610 extern int _verifyAckHandlerIdx;
611 extern int _dummyMigrationHandlerIdx;
612
613 /// Load Balancing
614
615 //methods for load balancing
616 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
617 void finishedCheckpointLoadBalancing();
618 void sendMlogLocation(int targetPE,envelope *env);
619 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
620 void restoreParallelRecovery(void (*fnPtr)(void *),void *_centralLb);
621
622 //handlers for Load Balancing
623 void _receiveMlogLocationHandler(void *buf);
624 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
625 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
626 void _getGlobalStepHandler(LBStepMsg *msg);
627 void _recvGlobalStepHandler(LBStepMsg *msg);
628 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
629 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
630
631 //globals used for loadBalancing
632 extern int onGoingLoadBalancing;
633 extern void *centralLb;
634 extern void (*resumeLbFnPtr)(void *) ;
635 extern int _receiveMlogLocationHandlerIdx;
636 extern int _receiveMigrationNoticeHandlerIdx;
637 extern int _receiveMigrationNoticeAckHandlerIdx;
638 extern int _getGlobalStepHandlerIdx;
639 extern int _recvGlobalStepHandlerIdx;
640 extern int _checkpointBarrierHandlerIdx;
641 extern int _checkpointBarrierAckHandlerIdx;
642
643 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
644 extern CkVec<MigrationRecord> migratedNoticeList;
645 extern CkVec<RetainedMigratedObject *> retainedObjectList;
646
647 int getCheckPointPE();
648 inline int isSameDet(Determinant *first, Determinant *second);
649 void forAllCharesDo(MlogFn fnPointer,void *data);
650 envelope *copyEnvelope(envelope *env);
651 extern void _initDone(void);
652
653 //TML: needed for group restart
654 extern void _resetNodeBocInitVec(void);
655
656 //methods for updating location
657 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
658
659 //handlers for updating locations
660 void _receiveLocationHandler(CurrentLocationMsg *data);
661
662 //globals for updating locations
663 extern int _receiveLocationHandlerIdx;
664
665
666 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
667
668 #endif