Rename the majority of remaining C files in the RTS to C++
[charm.git] / src / conv-ldb / cldb.spray.C
1 #include "converse.h"
2 #include "queueing.h"
3 #include "cldb.h"
4 #include <time.h>
5 #include <stdlib.h>
6 #include <math.h>
7
8 void LoadNotifyFn(int l)
9 {
10 }
11
12 const char *CldGetStrategy(void)
13 {
14   return "spray";
15 }
16
17 #define CYCLE_MILLISECONDS 500
18 #define DEBUGGING_OUTPUT 0
19
20 typedef struct 
21 {
22   int mype;
23   int EnqueueHandler;
24   int ReduceHandler;
25   int AverageHandler;
26   int HopHandler;
27   double load_reported;
28   double load_total;
29   int    load_count;
30   int    spantree_parent;
31   int    spantree_children;
32   int    spantree_root;
33   int    rebalance;
34 }
35 peinfo;
36
37 CpvStaticDeclare(peinfo, peinf);
38
39 struct loadmsg {
40   char core[CmiMsgHeaderSizeBytes];
41   double load_total;
42 };
43
44 struct reqmsg {
45   char core[CmiMsgHeaderSizeBytes];
46 };
47
48 void CldPropagateLoad(double load);
49
50 int CldEstimate(void)
51 {
52   return CldLoad();
53 }
54
55 void CldInitiateReduction(void)
56 {
57   double load = CldEstimate();
58   peinfo *pinf = &(CpvAccess(peinf));
59   pinf->load_reported = load;
60   CldPropagateLoad(load);
61 }
62
63 void CldPropagateLoad(double load)
64 {
65   struct loadmsg msg;
66   peinfo *pinf = &(CpvAccess(peinf));
67   pinf->load_total += load;
68   pinf->load_count ++;
69   if (pinf->load_count == pinf->spantree_children + 1) {
70     msg.load_total   = pinf->load_total;
71     if (pinf->mype == pinf->spantree_root) {
72       if (DEBUGGING_OUTPUT) CmiPrintf("---\n");
73       CmiSetHandler(&msg, pinf->AverageHandler);
74       CmiSyncBroadcastAll(sizeof(msg), &msg);
75     } else {
76       CmiSetHandler(&msg, pinf->ReduceHandler);
77       CmiSyncSend(pinf->spantree_parent, sizeof(msg), &msg);
78     }
79     pinf->load_total = 0;
80     pinf->load_count = 0;
81   }
82 }
83
84 void CldReduceHandler(struct loadmsg *msg)
85 {
86   CldPropagateLoad(msg->load_total);
87   CmiFree(msg);
88 }
89
90 void CldAverageHandler(struct loadmsg *msg)
91 {
92   peinfo *pinf = &(CpvAccess(peinf));
93   double load = CldEstimate();
94   double average = (msg->load_total / CmiNumPes());
95   int rebalance;
96   if (load < (average+10) * 1.2) rebalance=0;
97   else rebalance = (int)(load - average);
98   if (DEBUGGING_OUTPUT)
99     CmiPrintf("PE %d load=%6d average=%6d rebalance=%d\n", 
100               CmiMyPe(), CldEstimate(), (int)average, rebalance);
101   pinf->rebalance = rebalance;
102   CmiFree(msg);
103   CcdCallFnAfter((CcdVoidFn)CldInitiateReduction, 0, CYCLE_MILLISECONDS);
104 }
105
106 void CldEnqueueHandler(char *msg)
107 {
108   int len, queueing, priobits; unsigned int *prioptr;
109   CldInfoFn ifn; CldPackFn pfn;
110   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
111   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
112   CmiSetHandler(msg, CmiGetXHandler(msg));
113   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
114 }
115
116 void CldHopHandler(char *msg)
117 {
118   peinfo *pinf = &(CpvAccess(peinf));
119   int len, queueing, priobits; unsigned int *prioptr;
120   CldInfoFn ifn; CldPackFn pfn; int pe;
121
122   if (pinf->rebalance) {
123     /* do pe = ((lrand48()&0x7FFFFFFF)%CmiNumPes()); */
124     do pe = ((CrnRand()&0x7FFFFFFF)%CmiNumPes());
125     while (pe == pinf->mype);
126     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
127     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
128     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
129       pfn(&msg);
130       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
131     }
132     CmiSyncSendAndFree(pe, len, msg);
133     pinf->rebalance--;
134   } else {
135     CmiSetHandler(msg, CmiGetXHandler(msg));
136     CmiHandleMessage(msg);
137   }
138 }
139
140 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
141 {
142   int npes, *pes;
143   int len, queueing, priobits,i; unsigned int *prioptr;
144   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
145   peinfo *pinf = &(CpvAccess(peinf));
146   CldPackFn pfn;
147   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
148   if (pfn) {
149     pfn(&msg);
150     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
151   }
152   CmiSetInfo(msg,infofn);
153   CmiSetXHandler(msg, CmiGetHandler(msg));
154   CmiSetHandler(msg, pinf->EnqueueHandler);
155   CmiLookupGroup(grp, &npes, &pes);
156   for(i=0;i<npes;i++) {
157     CmiSyncSend(pes[i], len, msg);
158   }
159   CmiFree(msg);
160 }
161
162 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
163 {
164   int len, queueing, priobits,i; unsigned int *prioptr;
165   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
166   peinfo *pinf = &(CpvAccess(peinf));
167   CldPackFn pfn;
168   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
169   if (pfn) {
170     pfn(&msg);
171     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
172   }
173   CmiSetInfo(msg,infofn);
174   CmiSetXHandler(msg, CmiGetHandler(msg));
175   CmiSetHandler(msg, pinf->EnqueueHandler);
176   for(i=0;i<npes;i++) {
177     CmiSyncSend(pes[i], len, msg);
178   }
179   CmiFree(msg);
180 }
181
182 void CldEnqueue(int pe, void *msg, int infofn)
183 {
184   int len, queueing, priobits; unsigned int *prioptr;
185   CldInfoFn ifn; CldPackFn pfn;
186   peinfo *pinf = &(CpvAccess(peinf));
187   ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
188   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
189   if (pe != CLD_ANYWHERE) {
190     if (pfn && (CmiNodeOf(pe) != CmiMyNode())) {
191       pfn(&msg);
192       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
193     }
194     CmiSetInfo(msg, infofn);
195     CmiSetXHandler(msg, CmiGetHandler(msg));
196     CmiSetHandler(msg, pinf->EnqueueHandler);
197     if (pe==CLD_BROADCAST) CmiSyncBroadcastAndFree(len, msg);
198     else if (pe==CLD_BROADCAST_ALL) CmiSyncBroadcastAllAndFree(len, msg);
199     else CmiSyncSendAndFree(pe, len, msg);
200   } else {
201     CmiSetInfo(msg, infofn);
202     CmiSetXHandler(msg, CmiGetHandler(msg));
203     CmiSetHandler(msg, pinf->HopHandler);
204     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
205   }
206 }
207
208 void CldNodeEnqueue(int node, void *msg, int infofn)
209 {
210   int len, queueing, priobits; unsigned int *prioptr;
211   CldInfoFn ifn; CldPackFn pfn;
212   peinfo *pinf = &(CpvAccess(peinf));
213   ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
214   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
215   if (node != CLD_ANYWHERE) {
216     if (pfn && (node != CmiMyNode())) {
217       pfn(&msg);
218       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
219     }
220     CmiSetInfo(msg, infofn);
221     CmiSetXHandler(msg, CmiGetHandler(msg));
222     CmiSetHandler(msg, pinf->EnqueueHandler);
223     if (node==CLD_BROADCAST) CmiSyncNodeBroadcastAndFree(len, msg);
224     else if (node==CLD_BROADCAST_ALL) CmiSyncNodeBroadcastAllAndFree(len, msg);
225     else CmiSyncNodeSendAndFree(node, len, msg);
226   } else {
227     CmiSetInfo(msg, infofn);
228     CmiSetXHandler(msg, CmiGetHandler(msg));
229     CmiSetHandler(msg, pinf->HopHandler);
230     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
231   }
232 }
233
234 void CldModuleInit(char **argv)
235 {
236   peinfo *pinf;
237   CpvInitialize(peinfo, peinf);
238   /* srand48(time(0)+CmiMyPe()); */
239   CrnSrand((int) (time(0)+CmiMyPe()));
240   pinf = &CpvAccess(peinf);
241   pinf->mype = CmiMyPe();
242   pinf->EnqueueHandler = CmiRegisterHandler((CmiHandler)CldEnqueueHandler);
243   pinf->ReduceHandler  = CmiRegisterHandler((CmiHandler)CldReduceHandler);
244   pinf->AverageHandler = CmiRegisterHandler((CmiHandler)CldAverageHandler);
245   pinf->HopHandler     = CmiRegisterHandler((CmiHandler)CldHopHandler);
246   pinf->load_total = 0.0;
247   pinf->load_count = 0;
248   pinf->spantree_children = CmiNumSpanTreeChildren(CmiMyPe());
249   pinf->spantree_parent = CmiSpanTreeParent(CmiMyPe());
250   pinf->spantree_root = 0;
251   pinf->rebalance = 0;
252   CldModuleGeneralInit(argv);
253   CldInitiateReduction();
254 }
255 void CldCallback(void)
256 {}