A few bug fixes related to the mlogft machine.
[charm.git] / src / ck-core / ckmessagelogging.h
1 #ifndef _CKMESSAGELOGGING_H_
2 #define _CKMESSAGELOGGING_H_
3
4 #include "ckobjid.h"
5
6 CpvExtern(Chare *,_currentObj);
7
8 //states of a ticket sent as a reply to a request
9 #define NEW_TICKET 1
10 #define OLD_TICKET 2
11 #define FORWARDED_TICKET 0x8000
12
13 //array on which we print the formatted string representing an object id
14 extern char objString[100];
15
16 /**
17  * @brief
18  */
19 class Ticket {
20 public:
21         MCount TN;
22         int state;
23         Ticket(){
24                 TN = 0;
25                 state = 0;
26         }
27         Ticket(int x){
28                 TN = x;
29                 state = 0;
30         }
31 };
32 PUPbytes(Ticket);
33 class MlogEntry;
34
35 //Log entry for local messages, can also be sent as a message
36 typedef struct{
37         char header[CmiMsgHeaderSizeBytes];
38         CkObjID sender;
39         CkObjID recver;
40         MCount SN;
41         MCount TN;
42         MlogEntry *entry;
43         int PE;
44 } LocalMessageLog;
45 PUPbytes(LocalMessageLog);
46
47 class MlogEntry;
48 class RestoredLocalMap;
49
50 #define INITSIZE_SNTOTICKET 100
51
52 /**
53  * @brief
54  */
55 class SNToTicket{
56         private:
57                 Ticket initial[INITSIZE_SNTOTICKET];
58                 Ticket *ticketVec;
59                 MCount startSN;
60                 int currentSize;
61         public:
62                 SNToTicket(){
63                         currentSize = INITSIZE_SNTOTICKET;
64                         ticketVec = &initial[0];
65                         bzero(ticketVec,sizeof(Ticket)*currentSize);
66                         startSN = 0;
67                 }
68                 //assume indices start from 1.. true for MCounts
69                 inline Ticket &put(MCount SN){
70                         if(startSN == 0){
71                                 startSN = SN;                           
72                         }
73                         int index = SN-startSN;
74                         if(index >= currentSize){
75                                 int oldSize = currentSize;
76                                 Ticket *old = ticketVec;
77                                 
78                                 currentSize = index*2;
79                                 ticketVec = new Ticket[currentSize];
80                                 memcpy(ticketVec,old,sizeof(Ticket)*oldSize);
81                                 if(old != &initial[0]){                                 
82                                         delete [] old;
83                                 }
84                         }
85                         return ticketVec[index];
86                 }
87
88                 inline Ticket get(MCount SN){
89                         int index = SN-startSN;
90                         CmiAssert(index >= 0);
91                         if(index >= currentSize){
92                                 Ticket tn;
93                                 return tn;
94                         }else{
95                                 return ticketVec[index];
96                         }
97                 }
98
99                 inline void pup(PUP::er &p){
100                         p | startSN;
101                         p | currentSize;
102                         if(p.isUnpacking()){
103                                 if(currentSize > INITSIZE_SNTOTICKET){
104                                         ticketVec = new Ticket[currentSize];
105                                 }
106                         }
107                         for(int i=0;i<currentSize;i++){
108                                 p | ticketVec[i];
109                         }
110                 }
111                 
112 };
113
114
115 /**
116  * @brief This class stores all the message logging related data in a chare
117  */
118 class ChareMlogData{
119 public:
120         CkObjID objID;
121         MCount tCount; //count of ticket handed out
122         MCount tProcessed;  // highest ticket number that has been processed
123         
124         //TODO: pup receivedTNs
125         CkVec<MCount> *receivedTNs; //used to store TNs received by senders during a restart
126         MCount *ticketHoles;
127         int numberHoles;
128         int currentHoles;
129         CkVec<LocalMessageLog> restoredLocalMsgLog;
130         int maxRestoredLocalTN;
131         int resendReplyRecvd;// variable that keeps a count of the processors that have replied to a requests to resend messages. 
132         int restartFlag; /*0 -> Normal state .. 1-> just after restart. tickets should not be handed out at this time */
133         CkHashtableT<CkHashtableAdaptorT<CkObjID>,RestoredLocalMap *> mapTable; 
134
135         int toResumeOrNot;
136         int resumeCount;
137
138 private:
139
140         CkHashtableT<CkHashtableAdaptorT<CkObjID>,MCount> snTable; //SN used for messages to different objects
141 //      typedef CkHashtableT<CkHashtableAdaptorT<MCount>,Ticket> SNToTicket;
142         CkHashtableT<CkHashtableAdaptorT<CkObjID>,SNToTicket *> ticketTable; //<ObjectID,SN> -> Ticket handed out 
143         CkQ<MlogEntry *> mlog;  //log of messages sent
144         
145         
146         inline MCount newTN();
147
148 public: 
149         ChareMlogData():ticketTable(1000,0.3),snTable(100,0.4){
150                 tCount = 0;
151                 tProcessed = 0;
152                 numberHoles = 0;
153                 ticketHoles = NULL;
154                 currentHoles = 0;
155                 restartFlag=0;
156                 receivedTNs = NULL;
157                 resendReplyRecvd=0;
158           maxRestoredLocalTN=0;
159
160                 toResumeOrNot=0;
161                 resumeCount=0;
162         };
163         inline MCount nextSN(const CkObjID &recver);
164         inline Ticket next_ticket(CkObjID &sender,MCount SN);
165         void addLogEntry(MlogEntry *entry);
166         virtual void pup(PUP::er &p);
167         CkQ<MlogEntry *> *getMlog(){ return &mlog;};
168         MCount searchRestoredLocalQ(CkObjID &sender,CkObjID &recver,MCount SN);
169         void addToRestoredLocalQ(LocalMessageLog *logEntry);
170         void sortRestoredLocalMsgLog();
171 };
172
173 /**
174  * @brief Entry in a message log
175  */
176 class MlogEntry{
177 public:
178         envelope *env;
179         int destPE;
180         int _infoIdx;
181         char unackedLocal;
182         
183         MlogEntry(envelope *_env,int _destPE,int __infoIdx){
184                 env = _env;
185                 destPE = _destPE;
186                 _infoIdx = __infoIdx;
187                 unackedLocal = 0;
188         }
189         MlogEntry(){
190                 env = 0;
191                 destPE = -1;
192                 _infoIdx = 0;
193                 unackedLocal = 0;
194         }
195         ~MlogEntry(){
196                 if(env){
197                         CmiFree(env);
198                 }
199         }
200         virtual void pup(PUP::er &p);
201 };
202
203 /**
204  * @brief 
205  */
206 class LocationID{
207 public:
208         CkArrayIndexMax idx;
209         CkGroupID gid;
210 };
211
/**
 * @brief Record describing a stored checkpoint: a byte buffer, its size and
 *        a PE number.
 *
 * A default-constructed record has no buffer, size 0 and PE == -1.
 */
class StoredCheckpoint{
public:
    char *buf;
    int bufSize;
    int PE;

    StoredCheckpoint() : buf(NULL), bufSize(0), PE(-1) {}
};
226
227 /**
228  *  @brief 
229  */
230 class RestoredLocalMap {
231 public:
232         MCount minSN,maxSN,count;
233         MCount *TNArray;
234         RestoredLocalMap(){
235                 minSN=maxSN=count=0;
236                 TNArray=NULL;
237         };
238         RestoredLocalMap(int i){
239                 minSN=maxSN=count=0;
240                 TNArray=NULL;
241         };
242
243         virtual void pup(PUP::er &p);
244 };
245
246
247 typedef struct {
248         char header[CmiMsgHeaderSizeBytes];
249         CkObjID sender;
250         CkObjID recver;
251         MlogEntry *logEntry;
252         MCount SN;
253         int senderPE;
254 } TicketRequest;
255 CpvExtern(CkQ<TicketRequest *> *,_delayedTicketRequests);
256 CpvExtern(CkQ<MlogEntry *> *,_delayedLocalTicketRequests);
257
258 typedef struct{
259         TicketRequest request;
260         Ticket ticket;
261         int recverPE;
262 } TicketReply;
263
264
265 CpvExtern(CkQ<LocalMessageLog> *,_localMessageLog); // used on buddy to store local message logs
266
267 CpvExtern(CkQ<LocalMessageLog>*,_bufferedLocalMessageLogs);
268 extern int _maxBufferedMessages; //Number of local message logs  to be buffered
269
270 CpvExtern(char**,_bufferedTicketRequests);
271 extern int _maxBufferedTicketRequests; //Number of ticket requests to be buffered
272
273
274
275 typedef struct {
276         char header[CmiMsgHeaderSizeBytes];
277         int numberLogs;
278 } BufferedLocalLogHeader;
279
280 typedef BufferedLocalLogHeader BufferedTicketRequestHeader;
281
282 typedef struct{
283         char header[CmiMsgHeaderSizeBytes];
284         MlogEntry *entry;               
285 } LocalMessageLogAck;
286
287 typedef struct{
288         char header[CmiMsgHeaderSizeBytes];
289         int PE;
290         int dataSize;
291 } CheckPointDataMsg;
292
293 /*typedef struct{
294         char header[CmiMsgHeaderSizeBytes];
295         int PE;
296         int dataSize;
297 } CheckPointAck;*/
298
299 typedef CheckPointDataMsg CheckPointAck;
300
301 typedef struct{
302         CkObjID recver;
303         MCount tProcessed;
304 } TProcessedLog;
305
306
307 typedef struct{
308         char header[CmiMsgHeaderSizeBytes];
309         int PE;
310 } RestartRequest;
311
312 typedef RestartRequest CkPingMsg;
313 typedef RestartRequest CheckpointRequest;
314
315 typedef struct{
316         char header[CmiMsgHeaderSizeBytes];
317         int PE;
318         double restartWallTime;
319         int checkPointSize;
320         int numMigratedAwayElements;
321         int numMigratedInElements;
322         int migratedElementSize;
323         int numLocalMessages;   
324         CkGroupID lbGroupID;
325 } RestartProcessorData;
326
327 typedef struct{
328         char header[CmiMsgHeaderSizeBytes];
329         int PE;
330         int numberObjects;
331 } ResendRequest;
332
333 typedef ResendRequest RemoveLogRequest;
334
335 typedef struct {
336         char header[CmiMsgHeaderSizeBytes];
337         CkObjID recver;
338         int numTNs;
339 } ReceivedTNData;
340
341 typedef struct{
342         int PE;
343         int numberObjects;
344         TProcessedLog *listObjects;
345         MCount *maxTickets;
346         CkVec<MCount> *ticketVecs;
347 } ResendData;
348
349 typedef struct {
350         CkGroupID gID;
351         CkArrayIndexMax idx;
352         int fromPE,toPE;
353         char ackFrom,ackTo;
354 } MigrationRecord;
355
356 typedef struct {
357         char header[CmiMsgHeaderSizeBytes];
358         MigrationRecord migRecord;
359         void *record;
360 } MigrationNotice;
361
362 typedef struct {
363         char header[CmiMsgHeaderSizeBytes];
364         void *record;
365 } MigrationNoticeAck;
366
367 typedef struct {
368         MigrationRecord migRecord;
369         void *msg;
370         int size;
371         char acked;
372 } RetainedMigratedObject;
373
374 typedef struct {
375         char header[CmiMsgHeaderSizeBytes];
376         MigrationRecord migRecord;
377         int index;
378         int fromPE;
379 } VerifyAckMsg;
380
381 typedef struct {
382         char header[CmiMsgHeaderSizeBytes];
383         int checkpointCount;
384         int fromPE;
385 } CheckpointBarrierMsg;
386
387
388 //message used to inform a locmgr of an object's current location
389 typedef struct {
390         char header[CmiMsgHeaderSizeBytes];
391         CkGroupID mgrID;
392         CkArrayIndexMax idx;
393         int locationPE;
394         int fromPE;
395 } CurrentLocationMsg;
396
397 typedef struct {
398         char header[CmiMsgHeaderSizeBytes];
399         CkGroupID lbID;
400         int fromPE;
401         int step;
402 } LBStepMsg;
403
404
405 #define MLOG_OBJECT 1
406 #define MLOG_COUNT 2
407
408 typedef struct {
409         char header[CmiMsgHeaderSizeBytes];
410         int flag;// specific object(1) or count(2)
411         CkGroupID lbID;
412         int count;// if just count
413         /**if object **/
414         CkGroupID mgrID;
415         CkArrayIndexMax idx;
416         int locationPE;
417 } DummyMigrationMsg;
418
419
420 //function pointer passed to the forAllCharesDo method.
421 //It takes a void *data and a ChareMlogData pointer 
422 //It gets called for each chare
423 typedef void (*MlogFn)(void *,ChareMlogData *);
424
425 void _messageLoggingInit();
426
427 //Methods for sending ticket requests
428 void sendTicketGroupRequest(envelope *env,int destPE,int _infoIdx);
429 void sendTicketArrayRequest(envelope *env,int destPE,int _infoIdx);
430 void sendTicketNodeGroupRequest(envelope *env,int destNode,int _infoIdx);
431 void generateCommonTicketRequest(CkObjID &recver,envelope *env,int destPE,int _infoIdx);
432 void sendTicketRequest(CkObjID &sender,CkObjID &recver,int destPE,MlogEntry *entry,MCount SN,int resend);
433 void ticketLogLocalMessage(MlogEntry *entry);
434 void sendLocalMessageCopy(MlogEntry *entry);
435 void sendBufferedLocalMessageCopy();
436 void checkBufferedLocalMessageCopy(void *_dummy,double curWallTime);
437 void sendBufferedTicketRequests(int destPE);
438 void checkBufferedTicketRequests(void *_destPE,double curWallTime);
439
440
441
442
443 //handler idxs
444 extern int _ticketRequestHandlerIdx;
445 extern int _ticketHandlerIdx;
446 extern int _localMessageCopyHandlerIdx;
447 extern int _localMessageAckHandlerIdx;
448 extern int _bufferedLocalMessageCopyHandlerIdx;
449 extern int _bufferedLocalMessageAckHandlerIdx;
450 extern int _bufferedTicketRequestHandlerIdx;
451 extern int _bufferedTicketHandlerIdx;
452
453 //handler functions
454 void _ticketRequestHandler(TicketRequest *);
455 void _ticketHandler(TicketReply *);
456 void _localMessageCopyHandler(LocalMessageLog *);
457 void _localMessageAckHandler(LocalMessageLogAck *);
458 void _pingHandler(CkPingMsg *msg);
459 void _bufferedLocalMessageCopyHandler(BufferedLocalLogHeader *recvdHeader,int freeHeader=1);
460 void _bufferedLocalMessageAckHandler(BufferedLocalLogHeader *recvdHeader);
461 void _bufferedTicketRequestHandler(BufferedTicketRequestHeader *recvdHeader);
462 void _bufferedTicketHandler(BufferedTicketRequestHeader *recvdHeader);
463
464
465 //methods for sending messages
466 extern void _skipCldEnqueue(int pe,envelope *env, int infoFn);
467 extern void _noCldNodeEnqueue(int node, envelope *env);
468 void generalCldEnqueue(int destPE,envelope *env,int _infoIdx);
469 void retryTicketRequest(void *_ticketRequest,double curWallTime);
470
471 //methods to process received messages with respect to mlog
472 int preProcessReceivedMessage(envelope *env,Chare **objPointer,MlogEntry **localLogEntry);
473 void postProcessReceivedMessage(Chare *obj,CkObjID &sender,MCount SN,MlogEntry *entry);
474
475
476 //Checkpoint
477 CpvExtern(StoredCheckpoint *,_storedCheckpointData);
478
479 //methods for checkpointing
480 void checkpointAlarm(void *_dummy,double curWallTime);
481 void startMlogCheckpoint(void *_dummy,double curWallTime);
482 void pupArrayElementsSkip(PUP::er &p, MigrationRecord *listToSkip,int listSize=0);
483
484 //handler functions for checkpoint
485 void _checkpointRequestHandler(CheckpointRequest *request);
486 void _storeCheckpointHandler(char *msg);
487 void _checkpointAckHandler(CheckPointAck *ackMsg);
488 void _removeProcessedLogHandler(char *requestMsg);
489
490 //handler idxs for checkpoint
491 extern int _checkpointRequestHandlerIdx;
492 extern int _storeCheckpointHandlerIdx;
493 extern int _checkpointAckHandlerIdx;
494 extern int _removeProcessedLogHandlerIdx;
495
496 //Restart 
497
498
499 //methods for restart
500 void CkMlogRestart(const char * dummy, CkArgMsg * dummyMsg);
501 void CkMlogRestartDouble(void *,double);
502 void processReceivedTN(Chare *obj,int vecsize,MCount *listTNs);
503 void initializeRestart(void *data,ChareMlogData *mlogData);
504 void distributeRestartedObjects();
505 void sortRestoredLocalMsgLog(void *_dummy,ChareMlogData *mlogData);
506 void sendDummyMigration(int restartPE,CkGroupID lbID,CkGroupID locMgrID,CkArrayIndexMax &idx,int locationPE);
507
508
509 //handler functions for restart
510 void _getCheckpointHandler(RestartRequest *restartMsg);
511 void _recvCheckpointHandler(char *_restartData);
512 void _resendMessagesHandler(char *msg);
513 void _resendReplyHandler(char *msg);
514 void _receivedTNDataHandler(ReceivedTNData *msg);
515 void _distributedLocationHandler(char *receivedMsg);
516 void _updateHomeRequestHandler(RestartRequest *updateRequest);
517 void _updateHomeAckHandler(RestartRequest *updateHomeAck);
518 void _verifyAckRequestHandler(VerifyAckMsg *verifyRequest);
519 void _verifyAckHandler(VerifyAckMsg *verifyReply);
520 void _dummyMigrationHandler(DummyMigrationMsg *msg);
521
522
523
524 //handler idxs for restart
525 extern int _getCheckpointHandlerIdx;
526 extern int _recvCheckpointHandlerIdx;
527 extern int _resendMessagesHandlerIdx;
528 extern int _resendReplyHandlerIdx;
529 extern int _receivedTNDataHandlerIdx;
530 extern int _distributedLocationHandlerIdx;
531 extern int _updateHomeRequestHandlerIdx;
532 extern int _updateHomeAckHandlerIdx;
533 extern int _verifyAckRequestHandlerIdx;
534 extern int _verifyAckHandlerIdx;
535 extern int _dummyMigrationHandlerIdx;
536
537
538 /// Load Balancing
539
540 //methods for load balancing
541 void startLoadBalancingMlog(void (*fnPtr)(void *),void *_centralLb);
542 void finishedCheckpointLoadBalancing();
543 void sendMlogLocation(int targetPE,envelope *env);
544 void resumeFromSyncRestart(void *data,ChareMlogData *mlogData);
545
546 //handlers for Load Balancing
547 void _receiveMlogLocationHandler(void *buf);
548 void _receiveMigrationNoticeHandler(MigrationNotice *msg);
549 void _receiveMigrationNoticeAckHandler(MigrationNoticeAck *msg);
550 void _getGlobalStepHandler(LBStepMsg *msg);
551 void _recvGlobalStepHandler(LBStepMsg *msg);
552 void _checkpointBarrierHandler(CheckpointBarrierMsg *msg);
553 void _checkpointBarrierAckHandler(CheckpointBarrierMsg *msg);
554
555
556 //globals used for loadBalancing
557 extern int onGoingLoadBalancing;
558 extern void *centralLb;
559 extern void (*resumeLbFnPtr)(void *) ;
560 extern int _receiveMlogLocationHandlerIdx;
561 extern int _receiveMigrationNoticeHandlerIdx;
562 extern int _receiveMigrationNoticeAckHandlerIdx;
563 extern int _getGlobalStepHandlerIdx;
564 extern int _recvGlobalStepHandlerIdx;
565 extern int _checkpointBarrierHandlerIdx;
566 extern int _checkpointBarrierAckHandlerIdx;
567
568 //extern CkHashtableT<CkHashtableAdaptorT<CkObjID>,void *> migratedObjectList;
569 extern CkVec<MigrationRecord> migratedNoticeList;
570 extern CkVec<RetainedMigratedObject *> retainedObjectList;
571
572 int getCheckPointPE();
573 void forAllCharesDo(MlogFn fnPointer,void *data);
574 envelope *copyEnvelope(envelope *env);
575 extern void _initDone(void);
576
577 //methods for updating location
578 void informLocationHome(CkGroupID mgrID,CkArrayIndexMax idx,int homePE,int currentPE);
579
580 //handlers for updating locations
581 void _receiveLocationHandler(CurrentLocationMsg *data);
582
583 //globals for updating locations
584 extern int _receiveLocationHandlerIdx;
585
586
587 extern "C" void CmiDeliverRemoteMsgHandlerRange(int lowerHandler,int higherHandler);
588 inline void processRemoteMlogMessages(){
589         CmiDeliverRemoteMsgHandlerRange(_ticketRequestHandlerIdx,_receiveLocationHandlerIdx);
590 };
591
592 #endif