c29e92f4ac42da61e48bf53bc08164aaf8456487
[charm.git] / src / ck-core / ckmessagelogging.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
/*
 * bzero() compatibility: use <strings.h> when the platform has it, otherwise
 * map bzero onto memset. The fallback needs <string.h> for memset's
 * declaration, so include it here rather than relying on some other header
 * having pulled it in.
 */
#if CMK_HAS_STRINGS_H
#include <strings.h>            /* defines bzero */
#else
#include <string.h>             /* declares memset, used by the bzero fallback */
#define bzero(s,n)   memset((s),0,(n))
#endif
11
// Per-processor (Cpv) Chare pointer defined in another translation unit;
// presumably the object whose entry method is currently executing — confirm.
CpvExtern(Chare *,_currentObj);

//states of a ticket sent as a reply to a request
// NOTE(review): FORWARDED_TICKET (0x8000) is a separate bit from the 1/2 values,
// so it is presumably OR-ed into the state as a flag — confirm at use sites.
#define NEW_TICKET 1
#define OLD_TICKET 2
#define FORWARDED_TICKET 0x8000

//GML: compile-time constant (not a variable) giving the size of the group
#define GROUP_SIZE_MLOG 1
// 2^20 bytes
#define MEGABYTE 1048576

//array on which we print the formatted string representing an object id
// (fixed 100-char buffer defined elsewhere; writers must stay within it)
extern char objString[100];
25
26 /**
27  * @brief
28  */
29 class Ticket {
30 public:
31         MCount TN;
32         int state;
33         Ticket(){
34                 TN = 0;
35                 state = 0;
36         }
37         Ticket(int x){
38                 TN = x;
39                 state = 0;
40         }
41 };
42 PUPbytes(Ticket)
43 class MlogEntry;
44
//Log entry for local messages, can also be sent as a message
// (it starts with a Converse message header; _localMessageCopyHandler takes a
// LocalMessageLog *). It is pupped as raw bytes, so the field order is part of
// the format. The raw `entry` pointer is presumably only meaningful on the PE
// that created it — confirm.
typedef struct{
	char header[CmiMsgHeaderSizeBytes];
	CkObjID sender;      // object that sent the logged message
	CkObjID recver;      // object the message is addressed to
	MCount SN;           // sequence number of the message
	MCount TN;           // ticket number assigned to the message
	MlogEntry *entry;    // log entry on the sending side (raw pointer)
	int PE;              // presumably the PE that owns `entry` — confirm
} LocalMessageLog;
PUPbytes(LocalMessageLog)

// Forward declarations for types defined later in this header.
class MlogEntry;
class RestoredLocalMap;
59
60 #define INITSIZE_SNTOTICKET 100
61
62 /**
63  * @brief
64  */
65 class SNToTicket{
66         private:
67                 Ticket initial[INITSIZE_SNTOTICKET];
68                 Ticket *ticketVec;
69                 MCount startSN;
70                 int currentSize;
71         public:
72                 SNToTicket(){
73                         currentSize = INITSIZE_SNTOTICKET;
74                         ticketVec = &initial[0];
75                         bzero(ticketVec,sizeof(Ticket)*currentSize);
76                         startSN = 0;
77                 }
78                 //assume indices start from 1.. true for MCounts
79                 inline Ticket &put(MCount SN){
80                         if(startSN == 0){
81                                 startSN = SN;                           
82                         }
83                         int index = SN-startSN;
84                         if(index >= currentSize){
85                                 int oldSize = currentSize;
86                                 Ticket *old = ticketVec;
87                                 
88                                 currentSize = index*2;
89                                 ticketVec = new Ticket[currentSize];
90                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
91                                 if(old != &initial[0]){                                 
92                                         delete [] old;
93                                 }
94                         }
95                         return ticketVec[index];
96                 }
97
98                 inline Ticket get(MCount SN){
99                         int index = SN-startSN;
100                         CmiAssert(index >= 0);
101                         if(index >= currentSize){
102                                 Ticket tn;
103                                 return tn;
104                         }else{
105                                 return ticketVec[index];
106                         }
107                 }
108
109                 inline void pup(PUP::er &p){
110                         p | startSN;
111                         p | currentSize;
112                         if(p.isUnpacking()){
113                                 if(currentSize > INITSIZE_SNTOTICKET){
114                                         ticketVec = new Ticket[currentSize];
115                                 }
116                         }
117                         for(int i=0;i<currentSize;i++){
118                                 p | ticketVec[i];
119                         }
120                 }
121                 
122 };
123
124
125 /**
126  * @brief This class stores all the message logging related data in a chare
127  */
128 class ChareMlogData{
129 public:
130         CkObjID objID;
131         MCount tCount; //count of ticket handed out
132         MCount tProcessed;  // highest ticket number that has been processed
133         
134         //TODO: pup receivedTNs
135         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
136         MCount *ticketHoles;
137         int numberHoles;
138         int currentHoles;
139         CkVec<LocalMessageLog> restoredLocalMsgLog;
140         int maxRestoredLocalTN;
141         int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
142         int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
143         CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable; 
144
145         int toResumeOrNot;
146         int resumeCount;
147
148 private:
149
150         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable; //SN used for messages to different objects
151 //      typedef CkHashtableT<CkHashtableAdaptorT<MCount>,Ticket> SNToTicket;
152         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable; //<ObjectID,SN> -> Ticket handed out 
153         CkQ<MlogEntry *> mlog;  //log of messages sent
154         
155         
156         inline MCount newTN();
157
158 public: 
159         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4){
160                 tCount = 0;
161                 tProcessed = 0;
162                 numberHoles = 0;
163                 ticketHoles = NULL;
164                 currentHoles = 0;
165                 restartFlag=0;
166                 receivedTNs = NULL;
167                 resendReplyRecvd=0;
168           maxRestoredLocalTN=0;
169
170                 toResumeOrNot=0;
171                 resumeCount=0;
172         };
173         inline MCount nextSN(const CkObjID &recver);
174         inline Ticket next_ticket(CkObjID &sender,MCount SN);
175         void addLogEntry(MlogEntry *entry);
176         virtual void pup(PUP::er &p);
177         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
178         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
179         void addToRestoredLocalQ(LocalMessageLog *logEntry);
180         void sortRestoredLocalMsgLog();
181 };
182
183 /**
184  * @brief Entry in a message log
185  */
186 class MlogEntry{
187 public:
188         envelope *env;
189         int destPE;
190         int _infoIdx;
191         char unackedLocal;
192         
193         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
194                 env = _env;
195                 destPE = _destPE;
196                 _infoIdx = __infoIdx;
197                 unackedLocal = 0;
198         }
199         MlogEntry(){
200                 env = 0;
201                 destPE = -1;
202                 _infoIdx = 0;
203                 unackedLocal = 0;
204         }
205         ~MlogEntry(){
206                 if(env){
207                         CmiFree(env);
208                 }
209         }
210         virtual void pup(PUP::er &p);
211 };
212
/**
 * @brief Pairs an array-element index with a group ID — presumably the
 * array / location-manager group that owns the element; confirm at use sites.
 */
class LocationID{
public:
	CkArrayIndexMax idx;
	CkGroupID gid;
};
221
/**
 * @brief A checkpoint buffer together with its size and a processor number.
 *
 * A default-constructed instance holds nothing: no buffer, zero size, PE -1.
 */
class StoredCheckpoint{
public:
	char *buf;      // checkpoint bytes, or NULL when nothing is stored
	int bufSize;    // length of buf in bytes
	int PE;         // presumably the PE whose checkpoint this is (-1 = none) — confirm

	StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
236
237 /**
238  *  @brief 
239  */
240 class RestoredLocalMap {
241 public:
242         MCount minSN,maxSN,count;
243         MCount *TNArray;
244         RestoredLocalMap(){
245                 minSN=maxSN=count=0;
246                 TNArray=NULL;
247         };
248         RestoredLocalMap(int i){
249                 minSN=maxSN=count=0;
250                 TNArray=NULL;
251         };
252
253         virtual void pup(PUP::er &p);
254 };
255
256
// Request sent by `sender` to obtain a ticket for message SN addressed to
// `recver`. Handled by _ticketRequestHandler(TicketRequest *).
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	CkObjID sender;
	CkObjID recver;
	MlogEntry *logEntry;   // sender-side log entry (raw pointer)
	MCount SN;
	int senderPE;
} TicketRequest;
// Per-processor queues; presumably hold ticket requests (remote and local)
// whose processing is deferred — confirm.
CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);

// Reply to a TicketRequest, handled by _ticketHandler(TicketReply *).
// It embeds the original request first, so the reply's message header is
// request.header.
typedef struct{
	TicketRequest request;
	Ticket ticket;
	int recverPE;
} TicketReply;
273
274
CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog); // used on buddy to store local message logs

// Per-processor queue of local message logs waiting to be sent in a batch.
CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
extern int _maxBufferedMessages; //Number of local message logs to be buffered

// Per-processor array of char* buffers for batched ticket requests;
// presumably indexed by destination PE (see sendBufferedTicketRequests(int destPE)) — confirm.
CpvExtern(char**,_bufferedTicketRequests);
extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered



// Header of a batched message carrying `numberLogs` records; the records
// presumably follow the header in the same buffer — confirm.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	int numberLogs;
} BufferedLocalLogHeader;

// Batched ticket requests reuse the same header layout.
typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
291
// Acknowledgement for a local message log, handled by
// _localMessageAckHandler(LocalMessageLogAck *). `entry` is a raw pointer
// presumably echoed back to the PE that owns it — confirm.
typedef struct{
	char header[CmiMsgHeaderSizeBytes];
	MlogEntry *entry;
} LocalMessageLogAck;

// Header of a checkpoint-data message: source PE and payload size.
// Presumably followed by dataSize bytes of checkpoint data — confirm.
typedef struct{
	char header[CmiMsgHeaderSizeBytes];
	int PE;
	int dataSize;
} CheckPointDataMsg;

// A checkpoint acknowledgement reuses the CheckPointDataMsg layout.
typedef CheckPointDataMsg CheckPointAck;
310
// Highest processed ticket number (tProcessed) recorded for a receiver object.
typedef struct{
	CkObjID recver;
	MCount tProcessed;
} TProcessedLog;


// Minimal message that carries only a PE number. Reused (below) as the ping
// and checkpoint-request message types.
typedef struct{
	char header[CmiMsgHeaderSizeBytes];
	int PE;
} RestartRequest;

typedef RestartRequest CkPingMsg;
typedef RestartRequest CheckpointRequest;
324
// Header describing a restarting processor's data: checkpoint size, migrated
// element counts/sizes, number of local messages and the LB group. The
// variable-size payload presumably follows the header — confirm.
typedef struct{
	char header[CmiMsgHeaderSizeBytes];
	int PE;
	double restartWallTime;
	int checkPointSize;
	int numMigratedAwayElements;
	int numMigratedInElements;
	int migratedElementSize;
	int numLocalMessages;
	CkGroupID lbGroupID;
} RestartProcessorData;

// Request carrying a PE and an object count (presumably followed by that many
// per-object records — confirm).
typedef struct{
	char header[CmiMsgHeaderSizeBytes];
	int PE;
	int numberObjects;
} ResendRequest;

// Log-removal requests reuse the ResendRequest layout.
typedef ResendRequest RemoveLogRequest;

// Ticket numbers received for `recver`; handled by _receivedTNDataHandler.
// numTNs presumably counts MCount values following the header — confirm.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	CkObjID recver;
	int numTNs;
} ReceivedTNData;

// Local (non-message: no Converse header) bookkeeping for a resend: per-object
// processed-ticket records, maximum tickets and ticket vectors.
typedef struct{
	int PE;
	int numberObjects;
	TProcessedLog *listObjects;
	MCount *maxTickets;
	CkVec<MCount> *ticketVecs;
} ResendData;
358
// Describes the migration of array element `idx` of group `gID` from fromPE
// to toPE, with one acknowledgement flag per side.
typedef struct {
	CkGroupID gID;
	CkArrayIndexMax idx;
	int fromPE,toPE;
	char ackFrom,ackTo;
} MigrationRecord;

// Announces a migration; handled by _receiveMigrationNoticeHandler. `record`
// is an opaque pointer, presumably echoed back in MigrationNoticeAck — confirm.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	MigrationRecord migRecord;
	void *record;
} MigrationNotice;

// Acknowledges a MigrationNotice; handled by _receiveMigrationNoticeAckHandler.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	void *record;
} MigrationNoticeAck;

// A migrated object's message (msg, size bytes) kept together with its
// migration record and an acknowledged flag (see retainedObjectList).
typedef struct {
	MigrationRecord migRecord;
	void *msg;
	int size;
	char acked;
} RetainedMigratedObject;

// Used by _verifyAckRequestHandler / _verifyAckHandler.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	MigrationRecord migRecord;
	int index;
	int fromPE;
} VerifyAckMsg;

// Used by _checkpointBarrierHandler / _checkpointBarrierAckHandler.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	int checkpointCount;
	int fromPE;
} CheckpointBarrierMsg;
396
397
//message used to inform a locmgr of an object's current location
// (handled by _receiveLocationHandler(CurrentLocationMsg *))
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	CkGroupID mgrID;
	CkArrayIndexMax idx;
	int locationPE;
	int fromPE;
} CurrentLocationMsg;

// Carries a load-balancing step number for LB group lbID; used by
// _getGlobalStepHandler / _recvGlobalStepHandler.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	CkGroupID lbID;
	int fromPE;
	int step;
} LBStepMsg;


// Values for DummyMigrationMsg::flag.
#define MLOG_OBJECT 1
#define MLOG_COUNT 2

// Handled by _dummyMigrationHandler. With flag == MLOG_COUNT only `count` is
// meaningful; with flag == MLOG_OBJECT the mgrID/idx/locationPE fields are.
typedef struct {
	char header[CmiMsgHeaderSizeBytes];
	int flag;// specific object (MLOG_OBJECT, 1) or count (MLOG_COUNT, 2)
	CkGroupID lbID;
	int count;// if just count
	/**if object **/
	CkGroupID mgrID;
	CkArrayIndexMax idx;
	int locationPE;
} DummyMigrationMsg;
428
429
//function pointer passed to the forAllCharesDo method.
//It takes a void *data and a ChareMlogData pointer
//It gets called for each chare
typedef void (*MlogFn)(void *,ChareMlogData *);

void _messageLoggingInit();

//Methods for sending ticket requests
void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend);
void ticketLogLocalMessage(MlogEntry *entry);
void sendLocalMessageCopy(MlogEntry *entry);
void sendBufferedLocalMessageCopy();
// The check* functions take (void *, double curWallTime), the shape of a
// timer callback; presumably scheduled periodically to flush buffers — confirm.
void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
void sendBufferedTicketRequests(int destPE);
void checkBufferedTicketRequests(void *_destPE,double curWallTime);
449
450
451
452
//handler idxs
// NOTE(review): processRemoteMlogMessages() delivers the handler-index range
// from _ticketRequestHandlerIdx to _receiveLocationHandlerIdx. It presumably
// relies on these handlers being registered contiguously — confirm.
extern int _ticketRequestHandlerIdx;
extern int _ticketHandlerIdx;
extern int _localMessageCopyHandlerIdx;
extern int _localMessageAckHandlerIdx;
extern int _bufferedLocalMessageCopyHandlerIdx;
extern int _bufferedLocalMessageAckHandlerIdx;
extern int _bufferedTicketRequestHandlerIdx;
extern int _bufferedTicketHandlerIdx;

//handler functions
void _ticketRequestHandler(TicketRequest *);
void _ticketHandler(TicketReply *);
void _localMessageCopyHandler(LocalMessageLog *);
void _localMessageAckHandler(LocalMessageLogAck *);
void _pingHandler(CkPingMsg *msg);
// freeHeader (default 1) presumably controls whether recvdHeader is freed — confirm.
void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);


//methods for sending messages
extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
extern void _noCldNodeEnqueue(int node, envelope *env);
void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
void retryTicketRequest(void *_ticketRequest,double curWallTime);

//methods to process received messages with respect to mlog
// objPointer and localLogEntry are out-parameters (pointer-to-pointer).
int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
484
485
//Checkpoint
// Per-processor pointer to the StoredCheckpoint kept on this PE.
CpvExtern(StoredCheckpoint *,_storedCheckpointData);

//methods for checkpointing
void checkpointAlarm(void *_dummy,double curWallTime);
void startMlogCheckpoint(void *_dummy,double curWallTime);
// listToSkip/listSize name elements to leave out; listSize defaults to 0.
void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listSize=0);

//handler functions for checkpoint
void _checkpointRequestHandler(CheckpointRequest *request);
void _storeCheckpointHandler(char *msg);
void _checkpointAckHandler(CheckPointAck *ackMsg);
void _removeProcessedLogHandler(char *requestMsg);

//handler idxs for checkpoint
extern int _checkpointRequestHandlerIdx;
extern int _storeCheckpointHandlerIdx;
extern int _checkpointAckHandlerIdx;
extern int _removeProcessedLogHandlerIdx;
505
//Restart


//methods for restart
void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
void CkMlogRestartDouble(void *,double);
void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
// initializeRestart and sortRestoredLocalMsgLog have the MlogFn signature,
// so they can be passed to forAllCharesDo().
void initializeRestart(void *data,ChareMlogData *mlogData);
void distributeRestartedObjects();
void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);

//GML: function for locally calling the restart
void CkMlogRestartLocal();

//handler functions for restart
void _getCheckpointHandler(RestartRequest *restartMsg);
void _recvCheckpointHandler(char *_restartData);
void _resendMessagesHandler(char *msg);
void _resendReplyHandler(char *msg);
void _receivedTNDataHandler(ReceivedTNData *msg);
void _distributedLocationHandler(char *receivedMsg);
void _updateHomeRequestHandler(RestartRequest *updateRequest);
void _updateHomeAckHandler(RestartRequest *updateHomeAck);
void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
void _verifyAckHandler(VerifyAckMsg *verifyReply);
void _dummyMigrationHandler(DummyMigrationMsg *msg);



//handler idxs for restart
extern int _getCheckpointHandlerIdx;
extern int _recvCheckpointHandlerIdx;
extern int _resendMessagesHandlerIdx;
extern int _resendReplyHandlerIdx;
extern int _receivedTNDataHandlerIdx;
extern int _distributedLocationHandlerIdx;
extern int _updateHomeRequestHandlerIdx;
extern int _updateHomeAckHandlerIdx;
extern int _verifyAckRequestHandlerIdx;
extern int _verifyAckHandlerIdx;
extern int _dummyMigrationHandlerIdx;
548
549
/// Load Balancing

//methods for load balancing
// fnPtr/_centralLb: callback and its argument, presumably stored in
// resumeLbFnPtr/centralLb to resume the balancer later — confirm.
void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
void finishedCheckpointLoadBalancing();
void sendMlogLocation(int targetPE,envelope *env);
// Has the MlogFn signature, so it can be passed to forAllCharesDo().
void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);

//handlers for Load Balancing
void _receiveMlogLocationHandler(void *buf);
void _receiveMigrationNoticeHandler(MigrationNotice *msg);
void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
void _getGlobalStepHandler(LBStepMsg *msg);
void _recvGlobalStepHandler(LBStepMsg *msg);
void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);


//globals used for loadBalancing
extern int onGoingLoadBalancing;
extern void *centralLb;
extern void (*resumeLbFnPtr)(void *) ;
extern int _receiveMlogLocationHandlerIdx;
extern int _receiveMigrationNoticeHandlerIdx;
extern int _receiveMigrationNoticeAckHandlerIdx;
extern int _getGlobalStepHandlerIdx;
extern int _recvGlobalStepHandlerIdx;
extern int _checkpointBarrierHandlerIdx;
extern int _checkpointBarrierAckHandlerIdx;

// Migration records announced via MigrationNotice, and migrated objects
// retained (see RetainedMigratedObject).
extern CkVec<MigrationRecord> migratedNoticeList;
extern CkVec<RetainedMigratedObject *> retainedObjectList;
583
int getCheckPointPE();
// Calls fnPointer(data, mlogData) once per chare (see MlogFn).
void forAllCharesDo(MlogFn fnPointer,void *data);
envelope *copyEnvelope(envelope *env);
extern void _initDone(void);

//GML: needed for group restart
extern void _resetNodeBocInitVec(void);

//methods for updating location
void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);

//handlers for updating locations
void _receiveLocationHandler(CurrentLocationMsg *data);

//globals for updating locations
// Upper bound of the handler range delivered by processRemoteMlogMessages().
extern int _receiveLocationHandlerIdx;
600
601
602 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
603 inline void processRemoteMlogMessages(){
604         CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
605 }
606
607 #endif