Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-com / pipelinestrategy.h
1 /**
2    @addtogroup ComlibConverseStrategy
3    @{
4    @file 
5    @brief A pipeline router strategy. 
6 */
7
8
9 #ifndef PIPELINE_CONVERSE
10 #define PIPELINE_CONVERSE
11 #include "ckhashtable.h"
12 #include "charm++.h"
13 #include "convcomlibstrategy.h"
14 #include "convcomlibmanager.h"
15
16 #define DEFAULT_PIPE   8196
17
18 struct PipelineInfo {
19   short bcastPe;     // pe who is doing the broadcast, used for the hash key
20   short seqNumber;
21   int chunkSize;   // it is the size of the data part of the message (without the converse header)
22   int chunkNumber;
23   int messageSize;   // the entire message size, all included
24   short srcPe;       // pe from whom I'm receiving the message
25 };
26
27 class PipelineHashKey{
28  public:
29
30     int srcPe;
31     int seq;
32     PipelineHashKey(int _pe, int _seq):srcPe(_pe), seq(_seq){};
33
34     //These routines allow PipelineHashKey to be used in
35     //  a CkHashtableT
36     CkHashCode hash(void) const;
37     static CkHashCode staticHash(const void *a,size_t);
38     int compare(const PipelineHashKey &ind) const;
39     static int staticCompare(const void *a,const void *b,size_t);
40 };
41
42 // sequential numbers must be below 2^16, so the number of processors must
43 inline CkHashCode PipelineHashKey::hash(void) const
44 {
45     register int _seq = seq;
46     register int _pe = srcPe;
47     
48     register CkHashCode ret = (_seq << 16) + _pe;
49     return ret;
50 }
51
52 inline int PipelineHashKey::compare(const PipelineHashKey &k2) const
53 {
54     if(seq == k2.seq && srcPe == k2.srcPe)
55         return 1;
56     
57     return 0;
58 }
59
60 class PipelineHashObj{
61  public:
62   char *message;
63   int dimension;
64   int remaining;
65   PipelineHashObj (int dim, int rem, char *msg) :dimension(dim),remaining(rem),message(msg) {};
66
67 };
68
69 typedef const UInt  constUInt;
70 typedef void (*setFunction)(char*, constUInt);
71
72 class PipelineStrategy : public Strategy {
73  protected:
74
75   int pipeSize; // this is the size of the splitted messages, including the converse header
76   //double log_of_2_inv;
77   int seqNumber;
78   CkQ <MessageHolder*> *messageBuf;
79   CkHashtableT<PipelineHashKey, PipelineHashObj *> fragments;
80   int deliverHandle;
81
82  public:
83   PipelineStrategy(int size=DEFAULT_PIPE, Strategy* st=NULL);
84   PipelineStrategy(CkMigrateMessage *) {};
85   int getPipeSize() { return pipeSize; };
86   void commonInit();
87   void deliverer(char *msg, int dim);
88   void storing(char *msg);
89
90   void conversePipeline(char *env, int size, int destination);
91   void insertMessage(MessageHolder *msg);
92   void doneInserting();
93
94   virtual void pup(PUP::er &p);
95   PUPable_decl(PipelineStrategy);
96 };
97
98 #endif
99 /*@}*/