Updated cldb with additional functions.
[charm.git] / src / conv-ldb / cldb.c
1 #include "cldb.h"
2
3 /* Estimator stuff.  Of any use? */
4 /*
5 CpvStaticDeclare(CldEstimatorTable, _estfns);
6 */
7 void CldRegisterEstimator(CldEstimator fn)
8 {
9   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
10 }
11
12 /* 
13 int CldEstimate(void)
14 {
15   CldEstimatorTable *estab = &(CpvAccess(_estfns));
16   int i, load=0;
17   for(i=0; i<estab->count; i++)
18     load += (*(estab->fns[i]))();
19   return load;
20 }
21
22 static int CsdEstimator(void)
23 {
24   return CsdLength();
25 }
26 */
27
28 CpvDeclare(int, CldLoadOffset);
29
30 int CldRegisterInfoFn(CldInfoFn fn)
31 {
32   return CmiRegisterHandler((CmiHandler)fn);
33 }
34
35 int CldRegisterPackFn(CldPackFn fn)
36 {
37   return CmiRegisterHandler((CmiHandler)fn);
38 }
39
40 /* CldSwitchHandler takes a message and a new handler number.  It
41  * changes the handler number to the new handler number and move the
42  * old to the Xhandler part of the header.  When the message gets
43  * handled, the handler should call CldRestoreHandler to put the old
44  * handler back.
45  *
46  * CldPutToken puts a message in the scheduler queue in such a way
47  * that it can be retreived from the queue.  Once the message gets
48  * handled, it can no longer be retreived.  CldGetToken removes a
49  * message that was placed in the scheduler queue in this way.
50  * CldCountTokens tells you how many tokens are currently retreivable.  
51 */
52
53 void CldSwitchHandler(char *cmsg, int handler)
54 {
55   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
56   CmiSetHandler(cmsg, handler);
57 }
58
59 void CldRestoreHandler(char *cmsg)
60 {
61   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
62 }
63
64 void Cldhandler(char *);
65  
66 typedef struct CldToken_s {
67   char msg_header[CmiMsgHeaderSizeBytes];
68   char *msg;  /* if null, message already removed */
69   struct CldToken_s *pred;
70   struct CldToken_s *succ;
71 } *CldToken;
72
73 typedef struct CldProcInfo_s {
74   int tokenhandleridx;
75   int load; /* number of items in doubly-linked circle besides sentinel */
76   CldToken sentinel;
77 } *CldProcInfo;
78
79 CpvDeclare(CldProcInfo, CldProc);
80
81 static void CldTokenHandler(CldToken tok)
82 {
83   CldProcInfo proc = CpvAccess(CldProc);
84   CldToken pred, succ;
85   if (tok->msg) {
86     tok->pred->succ = tok->succ;
87     tok->succ->pred = tok->pred;
88     proc->load --;
89     CmiHandleMessage(tok->msg);
90   }
91   else 
92     CpvAccess(CldLoadOffset)--;
93 }
94
95 int CldCountTokens()
96 {
97   return (CpvAccess(CldProc)->load);
98 }
99
100 int CldLoad()
101 {
102   return (CsdLength() - CpvAccess(CldLoadOffset));
103 }
104
105 void CldPutToken(char *msg)
106 {
107   CldProcInfo proc = CpvAccess(CldProc);
108   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
109   CldToken tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
110   int len, queueing, priobits; unsigned int *prioptr;
111   CldPackFn pfn;
112
113   tok->msg = msg;
114
115   /* add token to the doubly-linked circle */
116   tok->pred = proc->sentinel->pred;
117   tok->succ = proc->sentinel;
118   tok->pred->succ = tok;
119   tok->succ->pred = tok;
120   proc->load ++;
121   
122   /* add token to the scheduler */
123   CmiSetHandler(tok, proc->tokenhandleridx);
124   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
125
126   queueing = CQS_QUEUEING_LIFO; 
127   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
128 }
129
130 void CldGetToken(char **msg)
131 {
132   CldProcInfo proc = CpvAccess(CldProc);
133   CldToken tok;
134   
135   tok = proc->sentinel->succ;
136   if (tok == proc->sentinel) {
137     *msg = 0; return;
138   }
139   tok->pred->succ = tok->succ;
140   tok->succ->pred = tok->pred;
141   proc->load --;
142   *msg = tok->msg;
143   tok->msg = 0;
144   CpvAccess(CldLoadOffset)++;
145 }
146
147 void CldModuleGeneralInit()
148 {
149   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
150   CldProcInfo proc;
151
152   CpvInitialize(CldProcInfo, CldProc);
153   CpvInitialize(int, CldLoadOffset);
154   CpvAccess(CldLoadOffset) = 0;
155   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
156   proc = CpvAccess(CldProc);
157   proc->load = 0;
158   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
159   proc->sentinel = sentinel;
160   sentinel->succ = sentinel;
161   sentinel->pred = sentinel;
162 }
163
164 void CldMultipleSend(int pe, int numToSend)
165 {
166   char **msgs;
167   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
168   unsigned int *prioptr;
169   CldInfoFn ifn;
170   CldPackFn pfn;
171
172   if (numToSend == 0)
173     return;
174   msgs = (char **)calloc(numToSend, sizeof(char *));
175   msgSizes = (int *)calloc(numToSend, sizeof(int));
176
177   while (!done) {
178     numSent = 0;
179     parcelSize = 0;
180     for (i=0; i<numToSend; i++) {
181       CldGetToken(&msgs[i]);
182       if (msgs[i] != 0) {
183         done = 1;
184         numSent++;
185         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
186         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
187         msgSizes[i] = len;
188         parcelSize += len;
189         CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
190       }
191       else {
192         done = 1;
193         break;
194       }
195       if (parcelSize > MAXMSGBFRSIZE) {
196         if(i<numToSend-1)
197           done = 0;
198         numToSend -= numSent;
199         break;
200       }
201     }
202     if (numSent > 1) {
203       CmiMultipleSend(pe, numSent, msgSizes, msgs);
204       for (i=0; i<numSent; i++)
205         CmiFree(msgs[i]);
206       CpvAccess(CldRelocatedMessages) += numSent;
207       CpvAccess(CldMessageChunks)++;
208     }
209     else if (numSent == 1) {
210       CmiSyncSend(pe, msgSizes[0], msgs[0]);
211       CmiFree(msgs[0]);
212       CpvAccess(CldRelocatedMessages)++;
213       CpvAccess(CldMessageChunks)++;
214     }
215   }
216   free(msgs);
217   free(msgSizes);
218 }