Several fixes to CldGeneralModuleInit and CldGetToken made.
[charm.git] / src / conv-ldb / cldb.c
1 #include "converse.h"
2 /*
3  * How to write a load-balancer:
4  *
5  * 1. Every load-balancer must contain a definition of struct CldField.
6  *    This structure describes what kind of data will be piggybacked on
7  *    the messages that go through the load balancer.  The structure
8  *    must include certain predefined fields.  Put the word
9  *    CLD_STANDARD_FIELD_STUFF at the front of the struct definition
10  *    to include these predefined fields.
11  *
12  * 2. Every load-balancer must contain a definition of the global variable
13  *    Cld_fieldsize.  You must initialize this to sizeof(struct CldField).
14  *    This is not a CPV or CSV variable, it's a plain old C global.
15  *
16  * 3. When you send a message, you'll probably want to temporarily
17  *    switch the handler.  The following function will switch the handler
18  *    while saving the old one:
19  *
20  *       CldSwitchHandler(msg, field, newhandler);
21  *
22  *    Field must be a pointer to the gap in the message where the CldField
23  *    is to be stored.  The switch routine will use this region to store
24  *    the old handler, as well as some other stuff.  When the message
25  *    gets handled, you can switch the handler back like this:
26  *    
27  *       CldRestoreHandler(msg, &field);
28  *
29  *    This will not only restore the handler, it will also tell you
30  *    where in the message the CldField was stored.
31  *
32  * 4. Don't forget that CldEnqueue must support directed transmission of
33  *    messages as well as undirected, and broadcasts too.
34  *
35  */
36
37 int CldRegisterInfoFn(CldInfoFn fn)
38 {
39   return CmiRegisterHandler((CmiHandler)fn);
40 }
41
42 int CldRegisterPackFn(CldPackFn fn)
43 {
44   return CmiRegisterHandler((CmiHandler)fn);
45 }
46
47 /*
48  * CldSwitchHandler takes a message, a pointer to its CLD field, and
49  * a new handler number.  It changes the handler number and records the
50  * location of the CLD field.  When the message gets handled, the handler
51  * should call CldRestoreHandler to put the old handler back, and get
52  * a pointer to the CLD field which was recorded.
53  *
54  * These next subroutines are balanced on a thin wire.  They're
55  * correct, but the slightest disturbance in the offsets could break them.
56  *
57  */
58
59 void CldSwitchHandler(char *cmsg, int *field, int handler)
60 {
61   int *data = (int*)(cmsg+CmiMsgHeaderSizeBytes);
62   field[1] = CmiGetHandler(cmsg);
63   field[0] = data[0];
64   data[0] = ((char*)field)-cmsg;
65   CmiSetHandler(cmsg, handler);
66 }
67
68 void CldRestoreHandler(char *cmsg, void *hfield)
69 {
70   int *data = (int*)(cmsg+CmiMsgHeaderSizeBytes);
71   int offs = data[0];
72   int *field = (int*)(cmsg+offs);
73   data[0] = field[0];
74   CmiSetHandler(cmsg, field[1]);
75   *(int**)hfield = field;
76 }
77
78 /* CldPutToken puts a message in the scheduler queue in such a way
79  * that it can be retreived from the queue.  Once the message gets
80  * handled, it can no longer be retreived.  CldGetToken removes a
81  * message that was placed in the scheduler queue in this way.
82  * CldCountTokens tells you how many tokens are currently retreivable.
83  *
84  * Caution: these functions are using the function "CmiReference"
85  * which I just added to the Cmi memory allocator (it increases the
86  * reference count field, making it possible to free the memory
87  * twice.)  I'm not sure how well this is going to work.  I need this
88  * because the message should not be freed until it's out of the
89  * scheduler queue AND out of the user's hands.  It needs to stay
90  * around while it's in the scheduler queue because it may contain
91  * a priority.
92  *
93  */
94
95 void Cldhandler(void *);
96  
97 typedef struct CldToken_s {
98   char msg_header[CmiMsgHeaderSizeBytes];
99   void *msg;  /* if null, message already removed */
100   int infofn;
101   int packfn;
102   struct CldToken_s *pred;
103   struct CldToken_s *succ;
104 } *CldToken;
105
106 typedef struct CldProcInfo_s {
107   int tokenhandleridx;
108   int load; /* number of items in doubly-linked circle besides sentinel */
109   CldToken sentinel;
110 } *CldProcInfo;
111
112 CpvDeclare(CldProcInfo, CldProc);
113
114 static void CldTokenHandler(CldToken tok)
115 {
116   CldProcInfo proc = CpvAccess(CldProc);
117   CldToken pred, succ;
118   if (tok->pred) {
119     tok->pred->succ = tok->succ;
120     tok->succ->pred = tok->pred;
121     proc->load --;
122     CmiHandleMessage(tok->msg);
123   } else {
124     /* CmiFree(tok->msg); */
125   }
126 }
127
128 int CldCountTokens()
129 {
130   CldProcInfo proc = CpvAccess(CldProc);
131   return proc->load;
132 }
133
134 void CldPutToken(void *msg, int infofn, int packfn)
135 {
136   CldProcInfo proc = CpvAccess(CldProc);
137   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
138   CldToken tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
139   int len, queueing, priobits; unsigned int *prioptr; void *ldbfield;
140   
141   tok->msg = msg;
142   tok->infofn = infofn;
143   tok->packfn = packfn;
144
145   /* add token to the doubly-linked circle */
146   tok->pred = proc->sentinel->pred;
147   tok->succ = proc->sentinel;
148   tok->pred->succ = tok;
149   tok->succ->pred = tok;
150   proc->load ++;
151   
152   /* add token to the scheduler */
153   CmiSetHandler(tok, proc->tokenhandleridx);
154   ifn(msg, &len, &ldbfield, &queueing, &priobits, &prioptr);
155   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
156 }
157
158 void CldGetToken(void **msg, int *infofn, int *packfn)
159 {
160   CldProcInfo proc = CpvAccess(CldProc);
161   CldToken tok;
162   tok = proc->sentinel->succ;
163   if (tok == proc->sentinel) {
164     *infofn = 0; *packfn = 0;
165     *msg = 0; return;
166   }
167   tok->pred->succ = tok->succ;
168   tok->succ->pred = tok->pred;
169   tok->succ = 0;
170   tok->pred = 0;
171   proc->load --;
172   *msg = tok->msg;
173   CmiReference(*msg);
174   *infofn = tok->infofn;
175   *packfn = tok->packfn;
176 }
177
178 void CldModuleGeneralInit()
179 {
180   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
181   CldProcInfo proc;
182
183   CpvInitialize(CldProcInfo, CldProc);
184   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
185   proc = CpvAccess(CldProc);
186   proc->load = 0;
187   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
188   proc->sentinel = sentinel;
189   sentinel->succ = sentinel;
190   sentinel->pred = sentinel;
191 }