new converse strategy for message pipelining (point-to-point communication)
[charm.git] / src / conv-com / pipelinestrategy.h
1 #ifndef PIPELINE_CONVERSE
2 #define PIPELINE_CONVERSE
3 #include "ckhashtable.h"
4 #include "charm++.h"
5 #include "convcomlibstrategy.h"
6 #include "convcomlibmanager.h"
7
8 #define DEFAULT_PIPE   8196
9
10 struct PipelineInfo {
11   short bcastPe;     // pe who is doing the broadcast, used for the hash key
12   short seqNumber;
13   int chunkSize;   // it is the size of the data part of the message (without the converse header)
14   int chunkNumber;
15   int messageSize;   // the entire message size, all included
16   short srcPe;       // pe from whom I'm receiving the message
17 };
18
19 class PipelineHashKey{
20  public:
21
22     int srcPe;
23     int seq;
24     PipelineHashKey(int _pe, int _seq):srcPe(_pe), seq(_seq){};
25
26     //These routines allow PipelineHashKey to be used in
27     //  a CkHashtableT
28     CkHashCode hash(void) const;
29     static CkHashCode staticHash(const void *a,size_t);
30     int compare(const PipelineHashKey &ind) const;
31     static int staticCompare(const void *a,const void *b,size_t);
32 };
33
34 // sequential numbers must be below 2^16, so the number of processors must
35 inline CkHashCode PipelineHashKey::hash(void) const
36 {
37     register int _seq = seq;
38     register int _pe = srcPe;
39     
40     register CkHashCode ret = (_seq << 16) + _pe;
41     return ret;
42 }
43
44 inline int PipelineHashKey::compare(const PipelineHashKey &k2) const
45 {
46     if(seq == k2.seq && srcPe == k2.srcPe)
47         return 1;
48     
49     return 0;
50 }
51
52 class PipelineHashObj{
53  public:
54   char *message;
55   int dimension;
56   int remaining;
57   PipelineHashObj (int dim, int rem, char *msg) :dimension(dim),remaining(rem),message(msg) {};
58
59 };
60
61 typedef const UInt  constUInt;
62 typedef void (*setFunction)(char*, constUInt);
63
64 class PipelineStrategy : public Strategy {
65  protected:
66
67   int pipeSize; // this is the size of the splitted messages, including the converse header
68   //double log_of_2_inv;
69   int seqNumber;
70   CkQ <MessageHolder*> *messageBuf;
71   CkHashtableT<PipelineHashKey, PipelineHashObj *> fragments;
72   int deliverHandle;
73
74  public:
75   PipelineStrategy(int size=DEFAULT_PIPE, Strategy* st=NULL);
76   PipelineStrategy(CkMigrateMessage *) {};
77   int getPipeSize() { return pipeSize; };
78   void commonInit();
79   void deliverer(char *msg, int dim);
80   void storing(char *msg);
81
82   void conversePipeline(char *env, int size, int destination);
83   void insertMessage(MessageHolder *msg);
84   void doneInserting();
85
86   virtual void pup(PUP::er &p);
87   PUPable_decl(PipelineStrategy);
88 };
89
90 #endif