5cc283f68be1d56cee5b7ff46bab078fb13fd2d7
[charm.git] / src / conv-ldb / cldb.c
1 #include "cldb.h"
2
3 CpvDeclare(int, CldHandlerIndex);
4 CpvDeclare(int, CldBalanceHandlerIndex);
5
6 CpvDeclare(int, CldRelocatedMessages);
7 CpvDeclare(int, CldLoadBalanceMessages);
8 CpvDeclare(int, CldMessageChunks);
9
10 /* Estimator stuff.  Of any use? */
11 /*
12 CpvStaticDeclare(CldEstimatorTable, _estfns);
13 */
14 void CldRegisterEstimator(CldEstimator fn)
15 {
16   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
17 }
18
19 /* 
20 int CldEstimate(void)
21 {
22   CldEstimatorTable *estab = &(CpvAccess(_estfns));
23   int i, load=0;
24   for(i=0; i<estab->count; i++)
25     load += (*(estab->fns[i]))();
26   return load;
27 }
28
29 static int CsdEstimator(void)
30 {
31   return CsdLength();
32 }
33 */
34
35 CpvDeclare(int, CldLoadOffset);
36
37 int CldRegisterInfoFn(CldInfoFn fn)
38 {
39   return CmiRegisterHandler((CmiHandler)fn);
40 }
41
42 int CldRegisterPackFn(CldPackFn fn)
43 {
44   return CmiRegisterHandler((CmiHandler)fn);
45 }
46
47 /* CldSwitchHandler takes a message and a new handler number.  It
48  * changes the handler number to the new handler number and move the
49  * old to the Xhandler part of the header.  When the message gets
50  * handled, the handler should call CldRestoreHandler to put the old
51  * handler back.
52  *
53  * CldPutToken puts a message in the scheduler queue in such a way
54  * that it can be retreived from the queue.  Once the message gets
55  * handled, it can no longer be retreived.  CldGetToken removes a
56  * message that was placed in the scheduler queue in this way.
57  * CldCountTokens tells you how many tokens are currently retreivable.  
58 */
59
60 void CldSwitchHandler(char *cmsg, int handler)
61 {
62   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
63   CmiSetHandler(cmsg, handler);
64 }
65
66 void CldRestoreHandler(char *cmsg)
67 {
68   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
69 }
70
71 void Cldhandler(char *);
72  
73 typedef struct CldToken_s {
74   char msg_header[CmiMsgHeaderSizeBytes];
75   char *msg;  /* if null, message already removed */
76   struct CldToken_s *pred;
77   struct CldToken_s *succ;
78 } *CldToken;
79
80 typedef struct CldProcInfo_s {
81   int tokenhandleridx;
82   int load; /* number of items in doubly-linked circle besides sentinel */
83   CldToken sentinel;
84 } *CldProcInfo;
85
86 CpvDeclare(CldProcInfo, CldProc);
87
88 static void CldTokenHandler(CldToken tok)
89 {
90   CldProcInfo proc = CpvAccess(CldProc);
91   if (tok->msg) {
92     tok->pred->succ = tok->succ;
93     tok->succ->pred = tok->pred;
94     proc->load --;
95     CmiHandleMessage(tok->msg);
96   }
97   else 
98     CpvAccess(CldLoadOffset)--;
99 }
100
101 int CldCountTokens()
102 {
103   return (CpvAccess(CldProc)->load);
104 }
105
106 int CldLoad()
107 {
108   return (CsdLength() - CpvAccess(CldLoadOffset));
109 }
110
111 void CldPutToken(char *msg)
112 {
113   CldProcInfo proc = CpvAccess(CldProc);
114   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
115   CldToken tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
116   int len, queueing, priobits; unsigned int *prioptr;
117   CldPackFn pfn;
118
119   tok->msg = msg;
120
121   /* add token to the doubly-linked circle */
122   tok->pred = proc->sentinel->pred;
123   tok->succ = proc->sentinel;
124   tok->pred->succ = tok;
125   tok->succ->pred = tok;
126   proc->load ++;
127   
128   /* add token to the scheduler */
129   CmiSetHandler(tok, proc->tokenhandleridx);
130   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
131
132   queueing = CQS_QUEUEING_LIFO; 
133   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
134 }
135
136 void CldGetToken(char **msg)
137 {
138   CldProcInfo proc = CpvAccess(CldProc);
139   CldToken tok;
140   
141   tok = proc->sentinel->succ;
142   if (tok == proc->sentinel) {
143     *msg = 0; return;
144   }
145   tok->pred->succ = tok->succ;
146   tok->succ->pred = tok->pred;
147   proc->load --;
148   *msg = tok->msg;
149   tok->msg = 0;
150   CpvAccess(CldLoadOffset)++;
151 }
152
153 void CldModuleGeneralInit()
154 {
155   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
156   CldProcInfo proc;
157
158   CpvInitialize(CldProcInfo, CldProc);
159   CpvInitialize(int, CldLoadOffset);
160   CpvAccess(CldLoadOffset) = 0;
161   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
162   proc = CpvAccess(CldProc);
163   proc->load = 0;
164   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
165   proc->sentinel = sentinel;
166   sentinel->succ = sentinel;
167   sentinel->pred = sentinel;
168 }
169
170 void CldMultipleSend(int pe, int numToSend)
171 {
172   char **msgs;
173   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
174   unsigned int *prioptr;
175   CldInfoFn ifn;
176   CldPackFn pfn;
177
178   if (numToSend == 0)
179     return;
180   msgs = (char **)calloc(numToSend, sizeof(char *));
181   msgSizes = (int *)calloc(numToSend, sizeof(int));
182
183   while (!done) {
184     numSent = 0;
185     parcelSize = 0;
186     for (i=0; i<numToSend; i++) {
187       CldGetToken(&msgs[i]);
188       if (msgs[i] != 0) {
189         done = 1;
190         numSent++;
191         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
192         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
193         msgSizes[i] = len;
194         parcelSize += len;
195         CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
196       }
197       else {
198         done = 1;
199         break;
200       }
201       if (parcelSize > MAXMSGBFRSIZE) {
202         if(i<numToSend-1)
203           done = 0;
204         numToSend -= numSent;
205         break;
206       }
207     }
208     if (numSent > 1) {
209       CmiMultipleSend(pe, numSent, msgSizes, msgs);
210       for (i=0; i<numSent; i++)
211         CmiFree(msgs[i]);
212       CpvAccess(CldRelocatedMessages) += numSent;
213       CpvAccess(CldMessageChunks)++;
214     }
215     else if (numSent == 1) {
216       CmiSyncSend(pe, msgSizes[0], msgs[0]);
217       CmiFree(msgs[0]);
218       CpvAccess(CldRelocatedMessages)++;
219       CpvAccess(CldMessageChunks)++;
220     }
221   }
222   free(msgs);
223   free(msgSizes);
224 }