Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-ldb / cldb.spray.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7 #include "converse.h"
8 #include "queueing.h"
9 #include "cldb.h"
10 #include <time.h>
11 #include <stdlib.h>
12 #include <math.h>
13
14 void LoadNotifyFn(int l)
15 {
16 }
17
18 char *CldGetStrategy(void)
19 {
20   return "spray";
21 }
22
23 #define CYCLE_MILLISECONDS 500
24 #define DEBUGGING_OUTPUT 0
25
26 typedef struct 
27 {
28   int mype;
29   int EnqueueHandler;
30   int ReduceHandler;
31   int AverageHandler;
32   int HopHandler;
33   double load_reported;
34   double load_total;
35   int    load_count;
36   int    spantree_parent;
37   int    spantree_children;
38   int    spantree_root;
39   int    rebalance;
40 }
41 peinfo;
42
43 CpvStaticDeclare(peinfo, peinf);
44
45 struct loadmsg {
46   char core[CmiMsgHeaderSizeBytes];
47   double load_total;
48 };
49
50 struct reqmsg {
51   char core[CmiMsgHeaderSizeBytes];
52 };
53
54 void CldPropagateLoad(double load);
55
56 int CldEstimate(void)
57 {
58   return CldLoad();
59 }
60
61 void CldInitiateReduction()
62 {
63   double load = CldEstimate();
64   peinfo *pinf = &(CpvAccess(peinf));
65   pinf->load_reported = load;
66   CldPropagateLoad(load);
67 }
68
69 void CldPropagateLoad(double load)
70 {
71   struct loadmsg msg;
72   peinfo *pinf = &(CpvAccess(peinf));
73   pinf->load_total += load;
74   pinf->load_count ++;
75   if (pinf->load_count == pinf->spantree_children + 1) {
76     msg.load_total   = pinf->load_total;
77     if (pinf->mype == pinf->spantree_root) {
78       if (DEBUGGING_OUTPUT) CmiPrintf("---\n");
79       CmiSetHandler(&msg, pinf->AverageHandler);
80       CmiSyncBroadcastAll(sizeof(msg), &msg);
81     } else {
82       CmiSetHandler(&msg, pinf->ReduceHandler);
83       CmiSyncSend(pinf->spantree_parent, sizeof(msg), &msg);
84     }
85     pinf->load_total = 0;
86     pinf->load_count = 0;
87   }
88 }
89
90 void CldReduceHandler(struct loadmsg *msg)
91 {
92   CldPropagateLoad(msg->load_total);
93   CmiFree(msg);
94 }
95
96 void CldAverageHandler(struct loadmsg *msg)
97 {
98   peinfo *pinf = &(CpvAccess(peinf));
99   double load = CldEstimate();
100   double average = (msg->load_total / CmiNumPes());
101   int rebalance;
102   if (load < (average+10) * 1.2) rebalance=0;
103   else rebalance = (int)(load - average);
104   if (DEBUGGING_OUTPUT)
105     CmiPrintf("PE %d load=%6d average=%6d rebalance=%d\n", 
106               CmiMyPe(), CldEstimate(), (int)average, rebalance);
107   pinf->rebalance = rebalance;
108   CmiFree(msg);
109   CcdCallFnAfter((CcdVoidFn)CldInitiateReduction, 0, CYCLE_MILLISECONDS);
110 }
111
112 void CldEnqueueHandler(char *msg)
113 {
114   int len, queueing, priobits; unsigned int *prioptr;
115   CldInfoFn ifn; CldPackFn pfn;
116   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
117   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
118   CmiSetHandler(msg, CmiGetXHandler(msg));
119   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
120 }
121
122 void CldHopHandler(char *msg)
123 {
124   peinfo *pinf = &(CpvAccess(peinf));
125   int len, queueing, priobits; unsigned int *prioptr;
126   CldInfoFn ifn; CldPackFn pfn; int pe;
127
128   if (pinf->rebalance) {
129     /* do pe = ((lrand48()&0x7FFFFFFF)%CmiNumPes()); */
130     do pe = ((CrnRand()&0x7FFFFFFF)%CmiNumPes());
131     while (pe == pinf->mype);
132     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
133     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
134     if (pfn && CmiNodeOf(pe) != CmiMyNode()) {
135       pfn(&msg);
136       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
137     }
138     CmiSyncSendAndFree(pe, len, msg);
139     pinf->rebalance--;
140   } else {
141     CmiSetHandler(msg, CmiGetXHandler(msg));
142     CmiHandleMessage(msg);
143   }
144 }
145
146 void CldEnqueueGroup(CmiGroup grp, void *msg, int infofn)
147 {
148   int npes, *pes;
149   int len, queueing, priobits,i; unsigned int *prioptr;
150   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
151   peinfo *pinf = &(CpvAccess(peinf));
152   CldPackFn pfn;
153   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
154   if (pfn) {
155     pfn(&msg);
156     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
157   }
158   CmiSetInfo(msg,infofn);
159   CmiSetXHandler(msg, CmiGetHandler(msg));
160   CmiSetHandler(msg, pinf->EnqueueHandler);
161   CmiLookupGroup(grp, &npes, &pes);
162   for(i=0;i<npes;i++) {
163     CmiSyncSend(pes[i], len, msg);
164   }
165   CmiFree(msg);
166 }
167
168 void CldEnqueueMulti(int npes, int *pes, void *msg, int infofn)
169 {
170   int len, queueing, priobits,i; unsigned int *prioptr;
171   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
172   peinfo *pinf = &(CpvAccess(peinf));
173   CldPackFn pfn;
174   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
175   if (pfn) {
176     pfn(&msg);
177     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
178   }
179   CmiSetInfo(msg,infofn);
180   CmiSetXHandler(msg, CmiGetHandler(msg));
181   CmiSetHandler(msg, pinf->EnqueueHandler);
182   for(i=0;i<npes;i++) {
183     CmiSyncSend(pes[i], len, msg);
184   }
185   CmiFree(msg);
186 }
187
188 void CldEnqueue(int pe, void *msg, int infofn)
189 {
190   int len, queueing, priobits; unsigned int *prioptr;
191   CldInfoFn ifn; CldPackFn pfn;
192   peinfo *pinf = &(CpvAccess(peinf));
193   ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
194   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
195   if (pe != CLD_ANYWHERE) {
196     if (pfn && (CmiNodeOf(pe) != CmiMyNode())) {
197       pfn(&msg);
198       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
199     }
200     CmiSetInfo(msg, infofn);
201     CmiSetXHandler(msg, CmiGetHandler(msg));
202     CmiSetHandler(msg, pinf->EnqueueHandler);
203     if (pe==CLD_BROADCAST) CmiSyncBroadcastAndFree(len, msg);
204     else if (pe==CLD_BROADCAST_ALL) CmiSyncBroadcastAllAndFree(len, msg);
205     else CmiSyncSendAndFree(pe, len, msg);
206   } else {
207     CmiSetInfo(msg, infofn);
208     CmiSetXHandler(msg, CmiGetHandler(msg));
209     CmiSetHandler(msg, pinf->HopHandler);
210     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
211   }
212 }
213
214 void CldNodeEnqueue(int node, void *msg, int infofn)
215 {
216   int len, queueing, priobits; unsigned int *prioptr;
217   CldInfoFn ifn; CldPackFn pfn;
218   peinfo *pinf = &(CpvAccess(peinf));
219   ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
220   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
221   if (node != CLD_ANYWHERE) {
222     if (pfn && (node != CmiMyNode())) {
223       pfn(&msg);
224       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
225     }
226     CmiSetInfo(msg, infofn);
227     CmiSetXHandler(msg, CmiGetHandler(msg));
228     CmiSetHandler(msg, pinf->EnqueueHandler);
229     if (node==CLD_BROADCAST) CmiSyncNodeBroadcastAndFree(len, msg);
230     else if (node==CLD_BROADCAST_ALL) CmiSyncNodeBroadcastAllAndFree(len, msg);
231     else CmiSyncNodeSendAndFree(node, len, msg);
232   } else {
233     CmiSetInfo(msg, infofn);
234     CmiSetXHandler(msg, CmiGetHandler(msg));
235     CmiSetHandler(msg, pinf->HopHandler);
236     CsdNodeEnqueueGeneral(msg, queueing, priobits, prioptr);
237   }
238 }
239
240 void CldModuleInit(char **argv)
241 {
242   peinfo *pinf;
243   CpvInitialize(peinfo, peinf);
244   /* srand48(time(0)+CmiMyPe()); */
245   CrnSrand((int) (time(0)+CmiMyPe()));
246   pinf = &CpvAccess(peinf);
247   pinf->mype = CmiMyPe();
248   pinf->EnqueueHandler = CmiRegisterHandler((CmiHandler)CldEnqueueHandler);
249   pinf->ReduceHandler  = CmiRegisterHandler((CmiHandler)CldReduceHandler);
250   pinf->AverageHandler = CmiRegisterHandler((CmiHandler)CldAverageHandler);
251   pinf->HopHandler     = CmiRegisterHandler((CmiHandler)CldHopHandler);
252   pinf->load_total = 0.0;
253   pinf->load_count = 0;
254   pinf->spantree_children = CmiNumSpanTreeChildren(CmiMyPe());
255   pinf->spantree_parent = CmiSpanTreeParent(CmiMyPe());
256   pinf->spantree_root = 0;
257   pinf->rebalance = 0;
258   CldModuleGeneralInit(argv);
259   CldInitiateReduction();
260 }
261 void CldCallback()
262 {}