Pedantry: no ';' after PUPbytes(foo) or function definitions
[charm.git] / src / ck-core / ckmessagelogging.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
#if CMK_HAS_STRINGS_H
#include <strings.h>            /* defines bzero */
#else
/* No <strings.h> on this platform: emulate bzero with memset.
 * memset is declared in <string.h>, so include it here rather than relying
 * on some other header having pulled it in first. The include also comes
 * before the macro definition so that <string.h> is parsed unaffected. */
#include <string.h>
#define bzero(s,n)   memset((s),0,(n))
#endif
11
12 CpvExtern(Chare *,_currentObj);
13
14 //states of a ticket sent as a reply to a request
15 #define NEW_TICKET 1
16 #define OLD_TICKET 2
17 #define FORWARDED_TICKET 0x8000
18
19 //TML: global variable for the size of the team
20 #define TEAM_SIZE_MLOG 1
21 #define MLOG_RESTARTED 0
22 #define MLOG_CRASHED 1
23 #define MEGABYTE 1048576
24
25 //array on which we print the formatted string representing an object id
26 extern char objString[100];
27
28 /**
29  * @brief
30  */
31 class Ticket {
32 public:
33         MCount TN;
34         int state;
35         Ticket(){
36                 TN = 0;
37                 state = 0;
38         }
39         Ticket(int x){
40                 TN = x;
41                 state = 0;
42         }
43 };
44 PUPbytes(Ticket)
45 class MlogEntry;
46
47 /**
48  * Log entry for local messages, can also be sent as a message.
49  * A message is local if:
50  * 1) It is sent between objects in the same processor.
51  * 2) It is sent between objects residing in processors of the same group
52  * (whenever group-based message logging is used).
53  */
54 typedef struct{
55         char header[CmiMsgHeaderSizeBytes];
56         CkObjID sender;
57         CkObjID recver;
58         MCount SN;
59         MCount TN;
60         MlogEntry *entry;
61         int senderPE;
62         int recverPE;
63 } LocalMessageLog;
64 PUPbytes(LocalMessageLog)
65
66 class MlogEntry;
67 class RestoredLocalMap;
68
69 #define INITSIZE_SNTOTICKET 100
70
71 /**
72  * @brief Class that maps SN (sequence numbers) to TN (ticket numbers)
73  * for a particular object.
74  */
75 class SNToTicket{
76         private:
77                 Ticket initial[INITSIZE_SNTOTICKET];
78                 Ticket *ticketVec;
79                 MCount startSN;
80                 int currentSize;
81                 MCount finishSN;
82         public:
83                 SNToTicket(){
84                         currentSize = INITSIZE_SNTOTICKET;
85                         ticketVec = &initial[0];
86                         bzero(ticketVec,sizeof(Ticket)*currentSize);
87                         startSN = 0;
88                         finishSN = 0;
89                 }
90                 /**
91                  * Gets the finishSN value.
92                  */ 
93                 inline MCount getFinishSN(){
94                         return finishSN;
95                 }
96                 /**
97                  * Gets the startSN value.
98                  */      
99                 inline MCount getStartSN(){
100                         return startSN;
101                 }
102                 //assume indices start from 1.. true for MCounts
103                 inline Ticket &put(MCount SN){
104                         if(SN > finishSN) finishSN = SN;
105                         if(startSN == 0){
106                                 startSN = SN;                           
107                         }
108                         int index = SN-startSN;
109                         if(index >= currentSize){
110                                 int oldSize = currentSize;
111                                 Ticket *old = ticketVec;
112                                 
113                                 currentSize = index*2;
114                                 ticketVec = new Ticket[currentSize];
115                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
116                                 if(old != &initial[0]){                                 
117                                         delete [] old;
118                                 }
119                         }
120                         return ticketVec[index];
121                 }
122
123                 inline Ticket get(MCount SN){
124                         int index = SN-startSN;
125                         CmiAssert(index >= 0);
126                         if(index >= currentSize){
127                                 Ticket tn;
128                                 return tn;
129                         }else{
130                                 return ticketVec[index];
131                         }
132                 }
133
134                 inline void pup(PUP::er &p){
135                         p | startSN;
136                         p | currentSize;
137                         if(p.isUnpacking()){
138                                 if(currentSize > INITSIZE_SNTOTICKET){
139                                         ticketVec = new Ticket[currentSize];
140                                 }
141                         }
142                         for(int i=0;i<currentSize;i++){
143                                 p | ticketVec[i];
144                         }
145                 }       
146 };
147
148
/**
 * This file includes the definition of the class for storing the metadata
 * associated with the message logging protocol.
 */
153
154
155 /**
156  * @brief This class stores all the message logging related data for a chare.
157  */
158 class ChareMlogData{
159 public:
160         // Object unique ID.
161         CkObjID objID;
162         // Counts how many tickets have been handed out.
163         MCount tCount; 
164         // Stores the highest ticket that has been processed.
165         MCount tProcessed;
166         
167         //TODO: pup receivedTNs
168         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
169         MCount *ticketHoles;
170         int numberHoles;
171         int currentHoles;
172         CkVec<LocalMessageLog> restoredLocalMsgLog;
173         int maxRestoredLocalTN;
174         int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
175         int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
176     int teamRecoveryFlag; // 0 -> normal state .. 1 -> recovery of a team member        
177         CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable;
178         //TML: teamTable, stores the SN to TN mapping for messages intra team
179         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> teamTable;
180
181         int toResumeOrNot;
182         int resumeCount;
183
184 private:
185
186         // SNTable, stores the number of messages sent (sequence numbers) to other objects.
187         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable;
188         // TNTable, stores the ticket associated with a particular combination <ObjectID,SN>.
189         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable;
190         // Log of messages sent.
191         CkQ<MlogEntry *> mlog;
192         
193                 
194         inline MCount newTN();
195
196 public:
197         /**
198          * Default constructor.
199          */ 
200         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4),teamTable(100,0.4){
201                 tCount = 0;
202                 tProcessed = 0;
203                 numberHoles = 0;
204                 ticketHoles = NULL;
205                 currentHoles = 0;
206                 restartFlag=0;
207                 teamRecoveryFlag=0;
208                 receivedTNs = NULL;
209                 resendReplyRecvd=0;
210                 maxRestoredLocalTN=0;
211                 toResumeOrNot=0;
212                 resumeCount=0;
213         };
214         inline MCount nextSN(const CkObjID &recver);
215         inline Ticket next_ticket(CkObjID &sender,MCount SN);
216         inline void verifyTicket(CkObjID &sender,MCount SN, MCount TN);
217         void addLogEntry(MlogEntry *entry);
218         virtual void pup(PUP::er &p);
219         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
220         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
221         void addToRestoredLocalQ(LocalMessageLog *logEntry);
222         void sortRestoredLocalMsgLog();
223 };
224
225 /**
226  * @brief Entry in a message log
227  */
228 class MlogEntry{
229 public:
230         envelope *env;
231         int destPE;
232         int _infoIdx;
233         char unackedLocal;
234         
235         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
236                 env = _env;
237                 destPE = _destPE;
238                 _infoIdx = __infoIdx;
239                 unackedLocal = 0;
240         }
241         MlogEntry(){
242                 env = 0;
243                 destPE = -1;
244                 _infoIdx = 0;
245                 unackedLocal = 0;
246         }
247         ~MlogEntry(){
248                 if(env){
249                         CmiFree(env);
250                 }
251         }
252         virtual void pup(PUP::er &p);
253 };
254
255 /**
256  * @brief 
257  */
258 class LocationID{
259 public:
260         CkArrayIndexMax idx;
261         CkGroupID gid;
262 };
263
/**
 * @brief A checkpoint buffer held in memory.
 *
 * A default-constructed StoredCheckpoint is empty: no buffer, size 0, PE -1.
 */
class StoredCheckpoint{
public:
        char *buf;      // checkpoint data; NULL when empty
        int bufSize;    // length of buf
        int PE;         // associated PE; -1 when unset
        StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
278
279 /**
280  *  @brief Class for storing metadata of local messages.
281  *  It maps sequence numbers to ticket numbers.
282  *  It is used after a restart to maintain the same ticket numbers.
283  */
284 class RestoredLocalMap {
285 public:
286         MCount minSN,maxSN,count;
287         MCount *TNArray;
288         RestoredLocalMap(){
289                 minSN=maxSN=count=0;
290                 TNArray=NULL;
291         };
292         RestoredLocalMap(int i){
293                 minSN=maxSN=count=0;
294                 TNArray=NULL;
295         };
296
297         virtual void pup(PUP::er &p);
298 };
299
300
301 typedef struct {
302         char header[CmiMsgHeaderSizeBytes];
303         CkObjID sender;
304         CkObjID recver;
305         MlogEntry *logEntry;
306         MCount SN;
307         MCount TN;
308         int senderPE;
309 } TicketRequest;
310 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
311 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
312
313 typedef struct{
314         TicketRequest request;
315         Ticket ticket;
316         int recverPE;
317 } TicketReply;
318
319
320 CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog); // used on buddy to store local message logs
321
322 CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
323 extern int _maxBufferedMessages; //Number of local message logs  to be buffered
324
325 CpvExtern(char**,_bufferedTicketRequests);
326 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
327
328
329
330 typedef struct {
331         char header[CmiMsgHeaderSizeBytes];
332         int numberLogs;
333 } BufferedLocalLogHeader;
334
335 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
336
337 typedef struct{
338         char header[CmiMsgHeaderSizeBytes];
339         MlogEntry *entry;               
340 } LocalMessageLogAck;
341
342 typedef struct{
343         char header[CmiMsgHeaderSizeBytes];
344         int PE;
345         int dataSize;
346 } CheckPointDataMsg;
347
348 /*typedef struct{
349         char header[CmiMsgHeaderSizeBytes];
350         int PE;
351         int dataSize;
352 } CheckPointAck;*/
353
354 typedef CheckPointDataMsg CheckPointAck;
355
356 typedef struct{
357         CkObjID recver;
358         MCount tProcessed;
359 } TProcessedLog;
360
361 /**
362  * Struct to request a particular action during restart.
363  */
364 typedef struct{
365         char header[CmiMsgHeaderSizeBytes];
366         int PE;
367 } RestartRequest;
368
369 typedef RestartRequest CkPingMsg;
370 typedef RestartRequest CheckpointRequest;
371
372 typedef struct{
373         char header[CmiMsgHeaderSizeBytes];
374         int PE;
375         double restartWallTime;
376         int checkPointSize;
377         int numMigratedAwayElements;
378         int numMigratedInElements;
379         int migratedElementSize;
380         int numLocalMessages;   
381         CkGroupID lbGroupID;
382 } RestartProcessorData;
383
384 typedef struct{
385         char header[CmiMsgHeaderSizeBytes];
386         int PE;
387         int numberObjects;
388 } ResendRequest;
389
390 typedef ResendRequest RemoveLogRequest;
391
392 typedef struct {
393         char header[CmiMsgHeaderSizeBytes];
394         CkObjID recver;
395         int numTNs;
396 } ReceivedTNData;
397
398 typedef struct{
399         int PE;
400         int numberObjects;
401         TProcessedLog *listObjects;
402         MCount *maxTickets;
403         CkVec<MCount> *ticketVecs;
404 } ResendData;
405
406 typedef struct {
407         CkGroupID gID;
408         CkArrayIndexMax idx;
409         int fromPE,toPE;
410         char ackFrom,ackTo;
411 } MigrationRecord;
412
413 typedef struct {
414         char header[CmiMsgHeaderSizeBytes];
415         MigrationRecord migRecord;
416         void *record;
417 } MigrationNotice;
418
419 typedef struct {
420         char header[CmiMsgHeaderSizeBytes];
421         void *record;
422 } MigrationNoticeAck;
423
424 typedef struct {
425         MigrationRecord migRecord;
426         void *msg;
427         int size;
428         char acked;
429 } RetainedMigratedObject;
430
431 typedef struct {
432         char header[CmiMsgHeaderSizeBytes];
433         MigrationRecord migRecord;
434         int index;
435         int fromPE;
436 } VerifyAckMsg;
437
438 typedef struct {
439         char header[CmiMsgHeaderSizeBytes];
440         int checkpointCount;
441         int fromPE;
442 } CheckpointBarrierMsg;
443
444
445 //message used to inform a locmgr of an object's current location
446 typedef struct {
447         char header[CmiMsgHeaderSizeBytes];
448         CkGroupID mgrID;
449         CkArrayIndexMax idx;
450         int locationPE;
451         int fromPE;
452 } CurrentLocationMsg;
453
454 typedef struct {
455         char header[CmiMsgHeaderSizeBytes];
456         CkGroupID lbID;
457         int fromPE;
458         int step;
459 } LBStepMsg;
460
461
462 #define MLOG_OBJECT 1
463 #define MLOG_COUNT 2
464
465 typedef struct {
466         char header[CmiMsgHeaderSizeBytes];
467         int flag;// specific object(1) or count(2)
468         CkGroupID lbID;
469         int count;// if just count
470         /**if object **/
471         CkGroupID mgrID;
472         CkArrayIndexMax idx;
473         int locationPE;
474 } DummyMigrationMsg;
475
476
477 //function pointer passed to the forAllCharesDo method.
478 //It takes a void *data and a ChareMlogData pointer 
479 //It gets called for each chare
480 typedef void (*MlogFn)(void *,ChareMlogData *);
481
482 void _messageLoggingInit();
483
484 //Methods for sending ticket requests
485 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
486 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
487 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
488 void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
489 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,MCount TN,int resend);
490 void ticketLogLocalMessage(MlogEntry *entry);
491 void sendLocalMessageCopy(MlogEntry *entry);
492 void sendBufferedLocalMessageCopy();
493 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
494 void sendBufferedTicketRequests(int destPE);
495 void checkBufferedTicketRequests(void *_destPE,double curWallTime);
496
497
498
499
500 //handler idxs
501 extern int _ticketRequestHandlerIdx;
502 extern int _ticketHandlerIdx;
503 extern int _localMessageCopyHandlerIdx;
504 extern int _localMessageAckHandlerIdx;
505 extern int _bufferedLocalMessageCopyHandlerIdx;
506 extern int _bufferedLocalMessageAckHandlerIdx;
507 extern int _bufferedTicketRequestHandlerIdx;
508 extern int _bufferedTicketHandlerIdx;
509
510 //handler functions
511 void _ticketRequestHandler(TicketRequest *);
512 void _ticketHandler(TicketReply *);
513 void _localMessageCopyHandler(LocalMessageLog *);
514 void _localMessageAckHandler(LocalMessageLogAck *);
515 void _pingHandler(CkPingMsg *msg);
516 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
517 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
518 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
519 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
520
521
522 //methods for sending messages
523 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
524 extern void _noCldNodeEnqueue(int node, envelope *env);
525 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
526 void retryTicketRequest(void *_ticketRequest,double curWallTime);
527
528 //methods to process received messages with respect to mlog
529 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
530 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
531
532
533 //Checkpoint
534 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
535
536 //methods for checkpointing
537 void checkpointAlarm(void *_dummy,double curWallTime);
538 void startMlogCheckpoint(void *_dummy,double curWallTime);
539 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listSize=0);
540
541 //handler functions for checkpoint
542 void _checkpointRequestHandler(CheckpointRequest *request);
543 void _storeCheckpointHandler(char *msg);
544 void _checkpointAckHandler(CheckPointAck *ackMsg);
545 void _removeProcessedLogHandler(char *requestMsg);
546
547 //handler idxs for checkpoint
548 extern int _checkpointRequestHandlerIdx;
549 extern int _storeCheckpointHandlerIdx;
550 extern int _checkpointAckHandlerIdx;
551 extern int _removeProcessedLogHandlerIdx;
552
553 //Restart 
554
555
556 //methods for restart
557 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
558 void CkMlogRestartDouble(void *,double);
559 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
560 void initializeRestart(void *data,ChareMlogData *mlogData);
561 void distributeRestartedObjects();
562 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
563 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
564
565 //TML: function for locally calling the restart
566 void CkMlogRestartLocal();
567
568 //handler functions for restart
569 void _getCheckpointHandler(RestartRequest *restartMsg);
570 void _recvCheckpointHandler(char *_restartData);
571 void _resendMessagesHandler(char *msg);
572 void _resendReplyHandler(char *msg);
573 void _receivedTNDataHandler(ReceivedTNData *msg);
574 void _distributedLocationHandler(char *receivedMsg);
575 void _updateHomeRequestHandler(RestartRequest *updateRequest);
576 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
577 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
578 void _verifyAckHandler(VerifyAckMsg *verifyReply);
579 void _dummyMigrationHandler(DummyMigrationMsg *msg);
580
581 //TML: new functions for group-based message logging
582 void _restartHandler(RestartRequest *restartMsg);
583 void _getRestartCheckpointHandler(RestartRequest *restartMsg);
584 void _recvRestartCheckpointHandler(char *_restartData);
585
586
587 //handler idxs for restart
588 extern int _getCheckpointHandlerIdx;
589 extern int _recvCheckpointHandlerIdx;
590 extern int _resendMessagesHandlerIdx;
591 extern int _resendReplyHandlerIdx;
592 extern int _receivedTNDataHandlerIdx;
593 extern int _distributedLocationHandlerIdx;
594 extern int _updateHomeRequestHandlerIdx;
595 extern int _updateHomeAckHandlerIdx;
596 extern int _verifyAckRequestHandlerIdx;
597 extern int _verifyAckHandlerIdx;
598 extern int _dummyMigrationHandlerIdx;
599
600
601 /// Load Balancing
602
603 //methods for load balancing
604 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
605 void finishedCheckpointLoadBalancing();
606 void sendMlogLocation(int targetPE,envelope *env);
607 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
608
609 //handlers for Load Balancing
610 void _receiveMlogLocationHandler(void *buf);
611 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
612 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
613 void _getGlobalStepHandler(LBStepMsg *msg);
614 void _recvGlobalStepHandler(LBStepMsg *msg);
615 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
616 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
617
618
619 //globals used for loadBalancing
620 extern int onGoingLoadBalancing;
621 extern void *centralLb;
622 extern void (*resumeLbFnPtr)(void *) ;
623 extern int _receiveMlogLocationHandlerIdx;
624 extern int _receiveMigrationNoticeHandlerIdx;
625 extern int _receiveMigrationNoticeAckHandlerIdx;
626 extern int _getGlobalStepHandlerIdx;
627 extern int _recvGlobalStepHandlerIdx;
628 extern int _checkpointBarrierHandlerIdx;
629 extern int _checkpointBarrierAckHandlerIdx;
630
631 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
632 extern CkVec<MigrationRecord> migratedNoticeList;
633 extern CkVec<RetainedMigratedObject *> retainedObjectList;
634
635 int getCheckPointPE();
636 void forAllCharesDo(MlogFn fnPointer,void *data);
637 envelope *copyEnvelope(envelope *env);
638 extern void _initDone(void);
639
640 //TML: needed for group restart
641 extern void _resetNodeBocInitVec(void);
642
643 //methods for updating location
644 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
645
646 //handlers for updating locations
647 void _receiveLocationHandler(CurrentLocationMsg *data);
648
649 //globals for updating locations
650 extern int _receiveLocationHandlerIdx;
651
652
653 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
654 inline void processRemoteMlogMessages(){
655         CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
656 }
657
658 #endif