Made changes to account for new extended headers.
[charm.git] / src / conv-ldb / cldb.c
1 #include "converse.h"
2 /* ITEMS 1-3 below are no longer true.
3  * How to write a load-balancer:
4  *
5  * 1. Every load-balancer must contain a definition of struct CldField.
6  *    This structure describes what kind of data will be piggybacked on
7  *    the messages that go through the load balancer.  The structure
8  *    must include certain predefined fields.  Put the word
9  *    CLD_STANDARD_FIELD_STUFF at the front of the struct definition
10  *    to include these predefined fields.
11  *
12  * 2. Every load-balancer must contain a definition of the global variable
13  *    Cld_fieldsize.  You must initialize this to sizeof(struct CldField).
14  *    This is not a CPV or CSV variable, it's a plain old C global.
15  *
16  * 3. When you send a message, you'll probably want to temporarily
17  *    switch the handler.  The following function will switch the handler
18  *    while saving the old one:
19  *
20  *       CldSwitchHandler(msg, field, newhandler);
21  *
22  *    Field must be a pointer to the gap in the message where the CldField
23  *    is to be stored.  The switch routine will use this region to store
24  *    the old handler, as well as some other stuff.  When the message
25  *    gets handled, you can switch the handler back like this:
26  *    
27  *       CldRestoreHandler(msg, &field);
28  *
29  *    This will not only restore the handler, it will also tell you
30  *    where in the message the CldField was stored.
31  *
32  * 4. Don't forget that CldEnqueue must support directed transmission of
33  *    messages as well as undirected, and broadcasts too.
34  *
35  */
36
37 int CldRegisterInfoFn(CldInfoFn fn)
38 {
39   return CmiRegisterHandler((CmiHandler)fn);
40 }
41
42 int CldRegisterPackFn(CldPackFn fn)
43 {
44   return CmiRegisterHandler((CmiHandler)fn);
45 }
46
47 /* CldSwitchHandler takes a message and a new handler number.  It
48  * changes the handler number to the new handler number and move the
49  * old to the Xhandler part of the header.  When the message gets
50  * handled, the handler should call CldRestoreHandler to put the old
51  * handler back.
52  *
53  * These next subroutines are balanced on a thin wire.  They're
54  * correct, but the slightest disturbance in the offsets could break them.
55  * */
56
57 void CldSwitchHandler(char *cmsg, int handler)
58 {
59   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
60   CmiSetHandler(cmsg, handler);
61 }
62
63 void CldRestoreHandler(char *cmsg)
64 {
65   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
66 }
67
68 /* CldPutToken puts a message in the scheduler queue in such a way
69  * that it can be retreived from the queue.  Once the message gets
70  * handled, it can no longer be retreived.  CldGetToken removes a
71  * message that was placed in the scheduler queue in this way.
72  * CldCountTokens tells you how many tokens are currently retreivable.
73  *
74  * Caution: these functions are using the function "CmiReference"
75  * which I just added to the Cmi memory allocator (it increases the
76  * reference count field, making it possible to free the memory
77  * twice.)  I'm not sure how well this is going to work.  I need this
78  * because the message should not be freed until it's out of the
79  * scheduler queue AND out of the user's hands.  It needs to stay
80  * around while it's in the scheduler queue because it may contain
81  * a priority.
82  *
83  */
84
85 void Cldhandler(void *);
86  
87 typedef struct CldToken_s {
88   char msg_header[CmiMsgHeaderSizeBytes];
89   void *msg;  /* if null, message already removed */
90   struct CldToken_s *pred;
91   struct CldToken_s *succ;
92 } *CldToken;
93
94 typedef struct CldProcInfo_s {
95   int tokenhandleridx;
96   int load; /* number of items in doubly-linked circle besides sentinel */
97   CldToken sentinel;
98 } *CldProcInfo;
99
100 CpvDeclare(CldProcInfo, CldProc);
101
102 static void CldTokenHandler(CldToken tok)
103 {
104   CldProcInfo proc = CpvAccess(CldProc);
105   CldToken pred, succ;
106   if (tok->pred) {
107     tok->pred->succ = tok->succ;
108     tok->succ->pred = tok->pred;
109     proc->load --;
110     CmiHandleMessage(tok->msg);
111   } else {
112     /* CmiFree(tok->msg); */
113   }
114 }
115
116 int CldCountTokens()
117 {
118   CldProcInfo proc = CpvAccess(CldProc);
119   return proc->load;
120 }
121
122 void CldPutToken(void *msg)
123 {
124   CldProcInfo proc = CpvAccess(CldProc);
125   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
126   CldToken tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
127   int len, queueing, priobits; unsigned int *prioptr;
128   
129   tok->msg = msg;
130
131   /* add token to the doubly-linked circle */
132   tok->pred = proc->sentinel->pred;
133   tok->succ = proc->sentinel;
134   tok->pred->succ = tok;
135   tok->succ->pred = tok;
136   proc->load ++;
137   
138   /* add token to the scheduler */
139   CmiSetHandler(tok, proc->tokenhandleridx);
140   ifn(msg, &len, &queueing, &priobits, &prioptr);
141   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
142 }
143
144 void CldGetToken(void **msg)
145 {
146   CldProcInfo proc = CpvAccess(CldProc);
147   CldToken tok;
148   tok = proc->sentinel->succ;
149   if (tok == proc->sentinel) {
150     *msg = 0; return;
151   }
152   tok->pred->succ = tok->succ;
153   tok->succ->pred = tok->pred;
154   tok->succ = 0;
155   tok->pred = 0;
156   proc->load --;
157   *msg = tok->msg;
158   CmiReference(*msg);
159 }
160
161 void CldModuleGeneralInit()
162 {
163   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
164   CldProcInfo proc;
165
166   CpvInitialize(CldProcInfo, CldProc);
167   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
168   proc = CpvAccess(CldProc);
169   proc->load = 0;
170   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
171   proc->sentinel = sentinel;
172   sentinel->succ = sentinel;
173   sentinel->pred = sentinel;
174 }