Added some support for team-based message logging.
[charm.git] / src / ck-core / ckmessagelogging.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
6 #if CMK_HAS_STRINGS_H
7 #include <strings.h>            /* defines bzero */
8 #else
9 #define bzero(s,n)   memset(s,0,n)
10 #endif
11
12 CpvExtern(Chare *,_currentObj);
13
14 //states of a ticket sent as a reply to a request
15 #define NEW_TICKET 1
16 #define OLD_TICKET 2
17 #define FORWARDED_TICKET 0x8000
18
//TML: compile-time constant giving the size of the team
20 #define TEAM_SIZE_MLOG 4
21 #define MLOG_RESTARTED 0
22 #define MLOG_CRASHED 1
23 #define MEGABYTE 1048576
24
25 //array on which we print the formatted string representing an object id
26 extern char objString[100];
27
28 /**
29  * @brief
30  */
31 class Ticket {
32 public:
33         MCount TN;
34         int state;
35         Ticket(){
36                 TN = 0;
37                 state = 0;
38         }
39         Ticket(int x){
40                 TN = x;
41                 state = 0;
42         }
43 };
44 PUPbytes(Ticket);
45 class MlogEntry;
46
47 /**
48  * Log entry for local messages, can also be sent as a message.
49  * A message is local if:
50  * 1) It is sent between objects in the same processor.
51  * 2) It is sent between objects residing in processors of the same group
52  * (whenever group-based message logging is used).
53  */
54 typedef struct{
55         char header[CmiMsgHeaderSizeBytes];
56         CkObjID sender;
57         CkObjID recver;
58         MCount SN;
59         MCount TN;
60         MlogEntry *entry;
61         int senderPE;
62         int recverPE;
63 } LocalMessageLog;
64 PUPbytes(LocalMessageLog);
65
66 class MlogEntry;
67 class RestoredLocalMap;
68
69 #define INITSIZE_SNTOTICKET 100
70
71 /**
72  * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
73  * for a particular object.
74  */
75 class SNToTicket{
76         private:
77                 Ticket initial[INITSIZE_SNTOTICKET];
78                 Ticket *ticketVec;
79                 MCount startSN;
80                 int currentSize;
81                 MCount finishSN;
82         public:
83                 SNToTicket(){
84                         currentSize = INITSIZE_SNTOTICKET;
85                         ticketVec = &initial[0];
86                         bzero(ticketVec,sizeof(Ticket)*currentSize);
87                         startSN = 0;
88                         finishSN = 0;
89                 }
90                 /**
91                  * Gets the finishSN value.
92                  */ 
93                 inline MCount getFinishSN(){
94                         return finishSN;
95                 }
96                 /**
97                  * Gets the startSN value.
98                  */      
99                 inline MCount getStartSN(){
100                         return startSN;
101                 }
102                 //assume indices start from 1.. true for MCounts
103                 inline Ticket &put(MCount SN){
104                         if(SN > finishSN) finishSN = SN;
105                         if(startSN == 0){
106                                 startSN = SN;                           
107                         }
108                         int index = SN-startSN;
109                         if(index >= currentSize){
110                                 int oldSize = currentSize;
111                                 Ticket *old = ticketVec;
112                                 
113                                 currentSize = index*2;
114                                 ticketVec = new Ticket[currentSize];
115                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
116                                 if(old != &initial[0]){                                 
117                                         delete [] old;
118                                 }
119                         }
120                         return ticketVec[index];
121                 }
122
123                 inline Ticket get(MCount SN){
124                         int index = SN-startSN;
125                         CmiAssert(index >= 0);
126                         if(index >= currentSize){
127                                 Ticket tn;
128                                 return tn;
129                         }else{
130                                 return ticketVec[index];
131                         }
132                 }
133
134                 inline void pup(PUP::er &p){
135                         p | startSN;
136                         p | currentSize;
137                         if(p.isUnpacking()){
138                                 if(currentSize > INITSIZE_SNTOTICKET){
139                                         ticketVec = new Ticket[currentSize];
140                                 }
141                         }
142                         for(int i=0;i<currentSize;i++){
143                                 p | ticketVec[i];
144                         }
145                 }       
146 };
147
148
/**
 * This file includes the definition of the class for storing the metadata
 * associated with the message logging protocol.
 */
153
154
155 /**
156  * @brief This class stores all the message logging related data for a chare.
157  */
158 class ChareMlogData{
159 public:
160         // Object unique ID.
161         CkObjID objID;
162         // Counts how many tickets have been handed out.
163         MCount tCount; 
164         // Stores the highest ticket that has been processed.
165         MCount tProcessed;
166         
167         //TODO: pup receivedTNs
168         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
169         MCount *ticketHoles;
170         int numberHoles;
171         int currentHoles;
172         CkVec<LocalMessageLog> restoredLocalMsgLog;
173         int maxRestoredLocalTN;
174         int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
175         int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
176         CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable; 
177         //TML: teamTable, stores the SN to TN mapping for messages intra team
178         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> teamTable;
179
180         int toResumeOrNot;
181         int resumeCount;
182
183 private:
184
185         // SNTable, stores the number of messages sent (sequence numbers) to other objects.
186         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
187         // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
188         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
189         // Log of messages sent.
190         CkQ<MlogEntry *> mlog;
191         
192                 
193         inline MCount newTN();
194
195 public:
196         /**
197          * Default constructor.
198          */ 
199         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
200                 tCount = 0;
201                 tProcessed = 0;
202                 numberHoles = 0;
203                 ticketHoles = NULL;
204                 currentHoles = 0;
205                 restartFlag=0;
206                 receivedTNs = NULL;
207                 resendReplyRecvd=0;
208                 maxRestoredLocalTN=0;
209                 toResumeOrNot=0;
210                 resumeCount=0;
211         };
212         inline MCount nextSN(const CkObjID &recver);
213         inline Ticket next_ticket(CkObjID &sender,MCount SN);
214         void addLogEntry(MlogEntry *entry);
215         virtual void pup(PUP::er &p);
216         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
217         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
218         void addToRestoredLocalQ(LocalMessageLog *logEntry);
219         void sortRestoredLocalMsgLog();
220 };
221
222 /**
223  * @brief Entry in a message log
224  */
225 class MlogEntry{
226 public:
227         envelope *env;
228         int destPE;
229         int _infoIdx;
230         char unackedLocal;
231         
232         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
233                 env = _env;
234                 destPE = _destPE;
235                 _infoIdx = __infoIdx;
236                 unackedLocal = 0;
237         }
238         MlogEntry(){
239                 env = 0;
240                 destPE = -1;
241                 _infoIdx = 0;
242                 unackedLocal = 0;
243         }
244         ~MlogEntry(){
245                 if(env){
246                         CmiFree(env);
247                 }
248         }
249         virtual void pup(PUP::er &p);
250 };
251
252 /**
253  * @brief 
254  */
255 class LocationID{
256 public:
257         CkArrayIndexMax idx;
258         CkGroupID gid;
259 };
260
/**
 * @brief Holder for a checkpoint buffer: a byte pointer, its size and a PE.
 *
 * A fresh object has buf == NULL, bufSize == 0 and PE == -1. No destructor is
 * declared, so the class itself never frees buf.
 */
class StoredCheckpoint {
public:
    char *buf;    // checkpoint bytes, NULL when empty
    int bufSize;  // size recorded for buf
    int PE;       // -1 until assigned

    StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
275
276 /**
277  *  @brief 
278  */
279 class RestoredLocalMap {
280 public:
281         MCount minSN,maxSN,count;
282         MCount *TNArray;
283         RestoredLocalMap(){
284                 minSN=maxSN=count=0;
285                 TNArray=NULL;
286         };
287         RestoredLocalMap(int i){
288                 minSN=maxSN=count=0;
289                 TNArray=NULL;
290         };
291
292         virtual void pup(PUP::er &p);
293 };
294
295
296 typedef struct {
297         char header[CmiMsgHeaderSizeBytes];
298         CkObjID sender;
299         CkObjID recver;
300         MlogEntry *logEntry;
301         MCount SN;
302         int senderPE;
303 } TicketRequest;
304 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
305 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
306
307 typedef struct{
308         TicketRequest request;
309         Ticket ticket;
310         int recverPE;
311 } TicketReply;
312
313
314 CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog); // used on buddy to store local message logs
315
316 CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
317 extern int _maxBufferedMessages; //Number of local message logs  to be buffered
318
319 CpvExtern(char**,_bufferedTicketRequests);
320 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
321
322
323
324 typedef struct {
325         char header[CmiMsgHeaderSizeBytes];
326         int numberLogs;
327 } BufferedLocalLogHeader;
328
329 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
330
331 typedef struct{
332         char header[CmiMsgHeaderSizeBytes];
333         MlogEntry *entry;               
334 } LocalMessageLogAck;
335
336 typedef struct{
337         char header[CmiMsgHeaderSizeBytes];
338         int PE;
339         int dataSize;
340 } CheckPointDataMsg;
341
342 /*typedef struct{
343         char header[CmiMsgHeaderSizeBytes];
344         int PE;
345         int dataSize;
346 } CheckPointAck;*/
347
348 typedef CheckPointDataMsg CheckPointAck;
349
350 typedef struct{
351         CkObjID recver;
352         MCount tProcessed;
353 } TProcessedLog;
354
355 /**
356  * Struct to request a particular action during restart.
357  */
358 typedef struct{
359         char header[CmiMsgHeaderSizeBytes];
360         int PE;
361 } RestartRequest;
362
363 typedef RestartRequest CkPingMsg;
364 typedef RestartRequest CheckpointRequest;
365
366 typedef struct{
367         char header[CmiMsgHeaderSizeBytes];
368         int PE;
369         double restartWallTime;
370         int checkPointSize;
371         int numMigratedAwayElements;
372         int numMigratedInElements;
373         int migratedElementSize;
374         int numLocalMessages;   
375         CkGroupID lbGroupID;
376 } RestartProcessorData;
377
378 typedef struct{
379         char header[CmiMsgHeaderSizeBytes];
380         int PE;
381         int numberObjects;
382 } ResendRequest;
383
384 typedef ResendRequest RemoveLogRequest;
385
386 typedef struct {
387         char header[CmiMsgHeaderSizeBytes];
388         CkObjID recver;
389         int numTNs;
390 } ReceivedTNData;
391
392 typedef struct{
393         int PE;
394         int numberObjects;
395         TProcessedLog *listObjects;
396         MCount *maxTickets;
397         CkVec<MCount> *ticketVecs;
398 } ResendData;
399
400 typedef struct {
401         CkGroupID gID;
402         CkArrayIndexMax idx;
403         int fromPE,toPE;
404         char ackFrom,ackTo;
405 } MigrationRecord;
406
407 typedef struct {
408         char header[CmiMsgHeaderSizeBytes];
409         MigrationRecord migRecord;
410         void *record;
411 } MigrationNotice;
412
413 typedef struct {
414         char header[CmiMsgHeaderSizeBytes];
415         void *record;
416 } MigrationNoticeAck;
417
418 typedef struct {
419         MigrationRecord migRecord;
420         void *msg;
421         int size;
422         char acked;
423 } RetainedMigratedObject;
424
425 typedef struct {
426         char header[CmiMsgHeaderSizeBytes];
427         MigrationRecord migRecord;
428         int index;
429         int fromPE;
430 } VerifyAckMsg;
431
432 typedef struct {
433         char header[CmiMsgHeaderSizeBytes];
434         int checkpointCount;
435         int fromPE;
436 } CheckpointBarrierMsg;
437
438
439 //message used to inform a locmgr of an object's current location
440 typedef struct {
441         char header[CmiMsgHeaderSizeBytes];
442         CkGroupID mgrID;
443         CkArrayIndexMax idx;
444         int locationPE;
445         int fromPE;
446 } CurrentLocationMsg;
447
448 typedef struct {
449         char header[CmiMsgHeaderSizeBytes];
450         CkGroupID lbID;
451         int fromPE;
452         int step;
453 } LBStepMsg;
454
455
456 #define MLOG_OBJECT 1
457 #define MLOG_COUNT 2
458
459 typedef struct {
460         char header[CmiMsgHeaderSizeBytes];
461         int flag;// specific object(1) or count(2)
462         CkGroupID lbID;
463         int count;// if just count
464         /**if object **/
465         CkGroupID mgrID;
466         CkArrayIndexMax idx;
467         int locationPE;
468 } DummyMigrationMsg;
469
470
471 //function pointer passed to the forAllCharesDo method.
472 //It takes a void *data and a ChareMlogData pointer 
473 //It gets called for each chare
474 typedef void (*MlogFn)(void *,ChareMlogData *);
475
476 void _messageLoggingInit();
477
478 //Methods for sending ticket requests
479 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
480 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
481 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
482 void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
483 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend);
484 void ticketLogLocalMessage(MlogEntry *entry);
485 void sendLocalMessageCopy(MlogEntry *entry);
486 void sendBufferedLocalMessageCopy();
487 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
488 void sendBufferedTicketRequests(int destPE);
489 void checkBufferedTicketRequests(void *_destPE,double curWallTime);
490
491
492
493
494 //handler idxs
495 extern int _ticketRequestHandlerIdx;
496 extern int _ticketHandlerIdx;
497 extern int _localMessageCopyHandlerIdx;
498 extern int _localMessageAckHandlerIdx;
499 extern int _bufferedLocalMessageCopyHandlerIdx;
500 extern int _bufferedLocalMessageAckHandlerIdx;
501 extern int _bufferedTicketRequestHandlerIdx;
502 extern int _bufferedTicketHandlerIdx;
503
504 //handler functions
505 void _ticketRequestHandler(TicketRequest *);
506 void _ticketHandler(TicketReply *);
507 void _localMessageCopyHandler(LocalMessageLog *);
508 void _localMessageAckHandler(LocalMessageLogAck *);
509 void _pingHandler(CkPingMsg *msg);
510 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
511 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
512 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
513 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
514
515
516 //methods for sending messages
517 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
518 extern void _noCldNodeEnqueue(int node, envelope *env);
519 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
520 void retryTicketRequest(void *_ticketRequest,double curWallTime);
521
522 //methods to process received messages with respect to mlog
523 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
524 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
525
526
527 //Checkpoint
528 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
529
530 //methods for checkpointing
531 void checkpointAlarm(void *_dummy,double curWallTime);
532 void startMlogCheckpoint(void *_dummy,double curWallTime);
533 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listSize=0);
534
535 //handler functions for checkpoint
536 void _checkpointRequestHandler(CheckpointRequest *request);
537 void _storeCheckpointHandler(char *msg);
538 void _checkpointAckHandler(CheckPointAck *ackMsg);
539 void _removeProcessedLogHandler(char *requestMsg);
540
541 //handler idxs for checkpoint
542 extern int _checkpointRequestHandlerIdx;
543 extern int _storeCheckpointHandlerIdx;
544 extern int _checkpointAckHandlerIdx;
545 extern int _removeProcessedLogHandlerIdx;
546
547 //Restart 
548
549
550 //methods for restart
551 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
552 void CkMlogRestartDouble(void *,double);
553 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
554 void initializeRestart(void *data,ChareMlogData *mlogData);
555 void distributeRestartedObjects();
556 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
557 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
558
559 //TML: function for locally calling the restart
560 void CkMlogRestartLocal();
561
562 //handler functions for restart
563 void _getCheckpointHandler(RestartRequest *restartMsg);
564 void _recvCheckpointHandler(char *_restartData);
565 void _resendMessagesHandler(char *msg);
566 void _resendReplyHandler(char *msg);
567 void _receivedTNDataHandler(ReceivedTNData *msg);
568 void _distributedLocationHandler(char *receivedMsg);
569 void _updateHomeRequestHandler(RestartRequest *updateRequest);
570 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
571 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
572 void _verifyAckHandler(VerifyAckMsg *verifyReply);
573 void _dummyMigrationHandler(DummyMigrationMsg *msg);
574
575 //TML: new functions for group-based message logging
576 void _restartHandler(RestartRequest *restartMsg);
577 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
578 void _recvRestartCheckpointHandler(char *_restartData);
579
580
581 //handler idxs for restart
582 extern int _getCheckpointHandlerIdx;
583 extern int _recvCheckpointHandlerIdx;
584 extern int _resendMessagesHandlerIdx;
585 extern int _resendReplyHandlerIdx;
586 extern int _receivedTNDataHandlerIdx;
587 extern int _distributedLocationHandlerIdx;
588 extern int _updateHomeRequestHandlerIdx;
589 extern int _updateHomeAckHandlerIdx;
590 extern int _verifyAckRequestHandlerIdx;
591 extern int _verifyAckHandlerIdx;
592 extern int _dummyMigrationHandlerIdx;
593
594
595 /// Load Balancing
596
597 //methods for load balancing
598 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
599 void finishedCheckpointLoadBalancing();
600 void sendMlogLocation(int targetPE,envelope *env);
601 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
602
603 //handlers for Load Balancing
604 void _receiveMlogLocationHandler(void *buf);
605 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
606 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
607 void _getGlobalStepHandler(LBStepMsg *msg);
608 void _recvGlobalStepHandler(LBStepMsg *msg);
609 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
610 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
611
612
613 //globals used for loadBalancing
614 extern int onGoingLoadBalancing;
615 extern void *centralLb;
616 extern void (*resumeLbFnPtr)(void *) ;
617 extern int _receiveMlogLocationHandlerIdx;
618 extern int _receiveMigrationNoticeHandlerIdx;
619 extern int _receiveMigrationNoticeAckHandlerIdx;
620 extern int _getGlobalStepHandlerIdx;
621 extern int _recvGlobalStepHandlerIdx;
622 extern int _checkpointBarrierHandlerIdx;
623 extern int _checkpointBarrierAckHandlerIdx;
624
625 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
626 extern CkVec<MigrationRecord> migratedNoticeList;
627 extern CkVec<RetainedMigratedObject *> retainedObjectList;
628
629 int getCheckPointPE();
630 void forAllCharesDo(MlogFn fnPointer,void *data);
631 envelope *copyEnvelope(envelope *env);
632 extern void _initDone(void);
633
634 //TML: needed for group restart
635 extern void _resetNodeBocInitVec(void);
636
637 //methods for updating location
638 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
639
640 //handlers for updating locations
641 void _receiveLocationHandler(CurrentLocationMsg *data);
642
643 //globals for updating locations
644 extern int _receiveLocationHandlerIdx;
645
646
647 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
648 inline void processRemoteMlogMessages(){
649         CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
650 };
651
652 #endif