Sample of a very simple load balancer (referred to by LB HowTo).
[charm.git] / src / conv-ldb / cldb.test.c
1 #include <stdio.h>
2 #include "converse.h"
3 #define PERIOD 10
4
5 typedef struct requestmsg_s {
6   char header[CmiMsgHeaderSizeBytes];
7   int pe;
8 } requestmsg;
9
10 CpvDeclare(int, CldHandlerIndex);
11 CpvDeclare(int, CldBalanceHandlerIndex);
12 CpvDeclare(int, CldRelocatedMessages);
13 CpvDeclare(int, CldLoadBalanceMessages);
14 CpvDeclare(int, CldMessageChunks);
15 CpvDeclare(int, CldRequestExists);
16 CpvDeclare(int, CldNoneSentHandlerIndex);
17 CpvDeclare(int, CldRequestResponseHandlerIndex);
18
19 void CldNoneSent(int pe);
20
21 void CldMultipleSend(int pe, int numToSend)
22 {
23   void **msgs;
24   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
25   unsigned int *prioptr;
26   CldInfoFn ifn;
27   CldPackFn pfn;
28
29   if (numToSend == 0)
30     return;
31
32   msgs = (void **)calloc(numToSend, sizeof(void *));
33   msgSizes = (int *)calloc(numToSend, sizeof(int));
34
35   while (!done) {
36     numSent = 0;
37     parcelSize = 0;
38     for (i=0; i<numToSend; i++) {
39       CldGetToken(&msgs[i]);
40       if (msgs[i] != 0) {
41         done = 1;
42         numSent++;
43         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
44         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
45         msgSizes[i] = len;
46         parcelSize += len;
47         CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
48       }
49       else {
50         done = 1;
51         break;
52       }
53       if (parcelSize > MAXMSGBFRSIZE) {
54         if (i<numToSend-1)
55           done = 0;
56         numToSend -= numSent;
57         break;
58       }
59     }
60     if (numSent > 1) {
61       CmiMultipleSend(pe, numSent, msgSizes, msgs);
62       for (i=0; i<numSent; i++)
63         CmiFree(msgs[i]);
64       CpvAccess(CldRelocatedMessages) += numSent;
65       CpvAccess(CldMessageChunks)++;
66     }
67     else if (numSent == 1) {
68       CmiSyncSend(pe, msgSizes[0], msgs[0]);
69       CmiFree(msgs[0]);
70       CpvAccess(CldRelocatedMessages)++;
71       CpvAccess(CldMessageChunks)++;
72     }
73   }
74   free(msgs);
75   free(msgSizes);
76 }
77
78 void CldRequestTokens()
79 {
80   int destPe = (CmiMyPe()+1)%CmiNumPes();
81   requestmsg msg;
82
83   msg.pe = CmiMyPe();
84   CmiSetHandler(&msg, CpvAccess(CldRequestResponseHandlerIndex));
85   CmiSyncSend(destPe, sizeof(requestmsg), &msg);
86   CpvAccess(CldLoadBalanceMessages)++;
87   CpvAccess(CldRequestExists) = 0;
88 }
89
90 void CldRequestResponseHandler(requestmsg *msg)
91 {
92   int numToSend;
93
94   numToSend = CsdLength() / 2;
95   if (numToSend > CldCountTokens())
96     numToSend = CldCountTokens();
97
98   if (numToSend > 0)
99     CldMultipleSend(msg->pe, numToSend);
100   else 
101     CldNoneSent(msg->pe);
102   free(msg);
103 }       
104   
105 void CldNoneSent(int pe)
106 {
107   requestmsg msg;
108
109   msg.pe = CmiMyPe();
110   CmiSetHandler(&msg, CpvAccess(CldNoneSentHandlerIndex));
111   CmiSyncSend(pe, sizeof(requestmsg), &msg);
112 }
113
114 void CldNoneSentHandler(requestmsg *msg)
115 {
116   free(msg);
117   if (!CpvAccess(CldRequestExists)) {
118     CpvAccess(CldRequestExists) = 1;
119     CcdCallFnAfter((CcdVoidFn)CldRequestTokens, NULL, PERIOD);
120   }
121 }
122
123 void CldBalanceHandler(void *msg)
124 {
125   CmiGrabBuffer((void **)&msg);
126   CldRestoreHandler(msg);
127   CldPutToken(msg);
128 }
129
130 void CldHandler(void *msg)
131 {
132   CldInfoFn ifn; CldPackFn pfn;
133   int len, queueing, priobits; unsigned int *prioptr;
134   
135   CmiGrabBuffer((void **)&msg);
136   CldRestoreHandler(msg);
137   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
138   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
139   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
140 }
141
142 void CldEnqueue(int pe, void *msg, int infofn)
143 {
144   int len, queueing, priobits; unsigned int *prioptr;
145   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
146   CldPackFn pfn;
147
148   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
149     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
150     CmiSetInfo(msg,infofn);
151     CldPutToken(msg); 
152   } 
153   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
154     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
155     CmiSetInfo(msg,infofn);
156     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
157   }
158   else {
159     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
160     if (pfn) {
161       pfn(&msg);
162       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
163     }
164     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
165     CmiSetInfo(msg,infofn);
166     if (pe==CLD_BROADCAST) 
167       CmiSyncBroadcastAndFree(len, msg);
168     else if (pe==CLD_BROADCAST_ALL)
169       CmiSyncBroadcastAllAndFree(len, msg);
170     else CmiSyncSendAndFree(pe, len, msg);
171   }
172 }
173
174 void CldNotifyIdle()
175 {
176   if (!CpvAccess(CldRequestExists))
177     CldRequestTokens();
178 }
179
180 void CldNotifyBusy()
181 {
182 }
183
184 void CldHelpModuleInit()
185 {
186   CpvInitialize(int, CldRequestExists);
187   CpvInitialize(int, CldBalanceHandlerIndex);
188   CpvInitialize(int, CldNoneSentHandlerIndex);
189   CpvInitialize(int, CldRequestResponseHandlerIndex);
190
191   CpvAccess(CldBalanceHandlerIndex) = 
192     CmiRegisterHandler(CldBalanceHandler);
193   CpvAccess(CldNoneSentHandlerIndex) = 
194     CmiRegisterHandler(CldNoneSentHandler);
195   CpvAccess(CldRequestResponseHandlerIndex) = 
196     CmiRegisterHandler(CldRequestResponseHandler);
197   CpvAccess(CldRequestExists) = 0;
198
199   if (CmiNumPes() > 1) {
200     CldRequestTokens();
201     CsdSetNotifyIdle(CldNotifyIdle, CldNotifyBusy);
202     CsdStartNotifyIdle();
203   }
204 }
205
206 void CldModuleInit()
207 {
208   CpvInitialize(int, CldHandlerIndex);
209   CpvInitialize(int, CldRelocatedMessages);
210   CpvInitialize(int, CldLoadBalanceMessages);
211   CpvInitialize(int, CldMessageChunks);
212   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
213   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
214     CpvAccess(CldMessageChunks) = 0;
215   CldModuleGeneralInit();
216   CldHelpModuleInit();
217 }