Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / conv-ldb / cldb.c
1
2 #include <stdlib.h>
3 #include "queueing.h"
4 #include "cldb.h"
5 #include <math.h>
6
7 typedef char *BitVector;
8
9 CpvDeclare(int, CldHandlerIndex);
10 CpvDeclare(BitVector, CldPEBitVector);
11 CpvDeclare(int, CldBalanceHandlerIndex);
12
13 CpvDeclare(int, CldRelocatedMessages);
14 CpvDeclare(int, CldLoadBalanceMessages);
15 CpvDeclare(int, CldMessageChunks);
16 CpvDeclare(int, CldLoadNotify);
17
18 CpvDeclare(CmiNodeLock, cldLock);
19
20 extern void LoadNotifyFn(int);
21
22 char* _lbtopo = "torus_nd_5";
23
24 /* Estimator stuff.  Of any use? */
25 /*
26 CpvStaticDeclare(CldEstimatorTable, _estfns);
27 */
28 void CldRegisterEstimator(CldEstimator fn)
29 {
30   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
31 }
32
33 /* 
34 int CldEstimate(void)
35 {
36   CldEstimatorTable *estab = &(CpvAccess(_estfns));
37   int i, load=0;
38   for(i=0; i<estab->count; i++)
39     load += (*(estab->fns[i]))();
40   return load;
41 }
42
43 static int CsdEstimator(void)
44 {
45   return CsdLength();
46 }
47 */
48
49 CpvDeclare(int, CldLoadOffset);
50
51
52 int CldRegisterInfoFn(CldInfoFn fn)
53 {
54   return CmiRegisterHandler((CmiHandler)fn);
55 }
56
57 int CldRegisterPackFn(CldPackFn fn)
58 {
59   return CmiRegisterHandler((CmiHandler)fn);
60 }
61
62 /* CldSwitchHandler takes a message and a new handler number.  It
63  * changes the handler number to the new handler number and move the
64  * old to the Xhandler part of the header.  When the message gets
65  * handled, the handler should call CldRestoreHandler to put the old
66  * handler back.
67  *
68  * CldPutToken puts a message in the scheduler queue in such a way
69  * that it can be retreived from the queue.  Once the message gets
70  * handled, it can no longer be retreived.  CldGetToken removes a
71  * message that was placed in the scheduler queue in this way.
72  * CldCountTokens tells you how many tokens are currently retreivable.  
73 */
74
75 void CldSwitchHandler(char *cmsg, int handler)
76 {
77 #if CMK_MEM_CHECKPOINT
78   int old_phase = CmiGetRestartPhase(cmsg);
79 #endif
80   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
81   CmiSetHandler(cmsg, handler);
82 #if CMK_MEM_CHECKPOINT
83   CmiGetRestartPhase(cmsg) = old_phase;
84 #endif
85 }
86
87 void CldRestoreHandler(char *cmsg)
88 {
89 #if CMK_MEM_CHECKPOINT
90   int old_phase = CmiGetRestartPhase(cmsg);
91 #endif
92   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
93 #if CMK_MEM_CHECKPOINT
94   CmiGetRestartPhase(cmsg) = old_phase;
95 #endif
96 }
97
98 void Cldhandler(char *);
99  
100 typedef struct CldToken_s {
101   char msg_header[CmiMsgHeaderSizeBytes];
102   char *msg;  /* if null, message already removed */
103   struct CldToken_s *pred;
104   struct CldToken_s *succ;
105 } *CldToken;
106
107 typedef struct CldProcInfo_s {
108   int tokenhandleridx;
109   int load; /* number of items in doubly-linked circle besides sentinel */
110   CldToken sentinel;
111 } *CldProcInfo;
112
113 CpvDeclare(CldProcInfo, CldProc);
114
115 static void CldTokenHandler(CldToken tok)
116 {
117   CldProcInfo proc = CpvAccess(CldProc);
118   if (tok->msg) {
119     tok->pred->succ = tok->succ;
120     tok->succ->pred = tok->pred;
121     proc->load --;
122     CmiHandleMessage(tok->msg);
123   }
124   else 
125     CpvAccess(CldLoadOffset)--;
126   if (CpvAccess(CldLoadNotify))
127     LoadNotifyFn(CpvAccess(CldProc)->load);
128   CmiFree(tok);
129 }
130
131 int CldCountTokens(void)
132 {
133   return (CpvAccess(CldProc)->load);
134 }
135
136 int CldLoad(void)
137 {
138   return (CsdLength() - CpvAccess(CldLoadOffset));
139 }
140
141 int CldLoadRank(int rank)
142 {
143   int len, offset;
144   /* CmiLock(CpvAccessOther(cldLock, rank));  */
145   len = CqsLength(CpvAccessOther(CsdSchedQueue, rank));
146      /* CldLoadOffset is the empty token counter */
147   offset = CpvAccessOther(CldLoadOffset, rank);
148   /* CmiUnlock(CpvAccessOther(cldLock, rank)); */
149   return len - offset;
150 }
151
152 void CldPutToken(char *msg)
153 {
154   CldProcInfo proc = CpvAccess(CldProc);
155   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
156   CldToken tok;
157   int len, queueing, priobits; unsigned int *prioptr;
158   CldPackFn pfn;
159
160   CmiLock(CpvAccess(cldLock));
161   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
162   tok->msg = msg;
163
164   /* add token to the doubly-linked circle */
165   tok->pred = proc->sentinel->pred;
166   tok->succ = proc->sentinel;
167   tok->pred->succ = tok;
168   tok->succ->pred = tok;
169   proc->load ++;
170   /* add token to the scheduler */
171   CmiSetHandler(tok, proc->tokenhandleridx);
172   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
173   /* not sigio or thread safe */
174   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
175   CmiUnlock(CpvAccess(cldLock));
176 }
177
178
179 static void * _CldGetTokenMsg(CldProcInfo proc)
180 {
181   CldToken tok;
182   void *msg;
183   
184   tok = proc->sentinel->succ;
185   if (tok == proc->sentinel) {
186     return NULL;
187   }
188   tok->pred->succ = tok->succ;
189   tok->succ->pred = tok->pred;
190   proc->load --;
191   msg = tok->msg;
192   tok->msg = 0;
193   return msg;
194 }
195
196 void CldGetToken(char **msg)
197 {
198   CldProcInfo proc = CpvAccess(CldProc);
199   CmiNodeLock cldlock = CpvAccess(cldLock);
200   CmiLock(cldlock);
201   *msg = _CldGetTokenMsg(proc);
202   if (*msg) CpvAccess(CldLoadOffset)++;
203   CmiUnlock(cldlock);
204 }
205
206 /* called at node level */
207 /* get token from processor of rank pe */
208 static void CldGetTokenFromRank(char **msg, int rank)
209 {
210   CldProcInfo proc = CpvAccessOther(CldProc, rank);
211   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
212   CmiLock(cldlock);
213   *msg = _CldGetTokenMsg(proc);
214   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
215   CmiUnlock(cldlock);
216 }
217
218 /* Bit Vector Stuff */
219
220 int CldPresentPE(int pe)
221 {
222   return CpvAccess(CldPEBitVector)[pe];
223 }
224
225 void CldMoveAllSeedsAway()
226 {
227   char *msg;
228   int len, queueing, priobits, pe;
229   unsigned int *prioptr;
230   CldInfoFn ifn;  CldPackFn pfn;
231
232   CldGetToken(&msg);
233   while (msg != 0) {
234     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
235     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
236     CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
237     pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
238     while (!CldPresentPE(pe))
239       pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
240     CmiSyncSendAndFree(pe, len, msg);
241     CldGetToken(&msg);
242   }
243 }
244
245 void CldSetPEBitVector(const char *newBV)
246 {
247   int i;
248   
249   for (i=0; i<CmiNumPes(); i++)
250     CpvAccess(CldPEBitVector)[i] = newBV[i];
251   if (!CldPresentPE(CmiMyPe()))
252     CldMoveAllSeedsAway();
253 }
254
255 /* End Bit Vector Stuff */
256
257 void CldModuleGeneralInit(char **argv)
258 {
259   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
260   CldProcInfo proc;
261   int i;
262
263   CpvInitialize(CldProcInfo, CldProc);
264   CpvInitialize(int, CldLoadOffset);
265   CpvAccess(CldLoadOffset) = 0;
266   CpvInitialize(int, CldLoadNotify);
267   CpvInitialize(BitVector, CldPEBitVector);
268   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
269   for (i=0; i<CmiNumPes(); i++)
270     CpvAccess(CldPEBitVector)[i] = 1;
271   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
272   proc = CpvAccess(CldProc);
273   proc->load = 0;
274   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
275   proc->sentinel = sentinel;
276   sentinel->succ = sentinel;
277   sentinel->pred = sentinel;
278
279   /* lock to protect token queue for immediate message and smp */
280   CpvInitialize(CmiNodeLock, cldLock);
281   CpvAccess(cldLock) = CmiCreateLock();
282
283   
284   if (CmiMyPe() == 0) {
285     char *stra = CldGetStrategy();
286     if (strcmp(stra, "rand") != 0) {
287       CmiPrintf("Charm++: %s seed load balancer.\n", stra);
288     }
289   } 
290 }
291
292 /* function can be called in an immediate handler at node level
293    rank specify the rank of processor for the node to represent
294    This function can also send as immeidate messages
295 */
296 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
297 {
298   char **msgs;
299   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
300   unsigned int *prioptr;
301   CldInfoFn ifn;
302   CldPackFn pfn;
303
304   if (numToSend == 0)
305     return;
306
307   msgs = (char **)calloc(numToSend, sizeof(char *));
308   msgSizes = (int *)calloc(numToSend, sizeof(int));
309
310   while (!done) {
311     numSent = 0;
312     parcelSize = 0;
313     for (i=0; i<numToSend; i++) {
314       CldGetTokenFromRank(&msgs[i], rank);
315       if (msgs[i] != 0) {
316         done = 1;
317         numSent++;
318         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
319         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
320         msgSizes[i] = len;
321         parcelSize += len;
322         CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
323         if (immed) CmiBecomeImmediate(msgs[i]);
324       }
325       else {
326         done = 1;
327         break;
328       }
329       if (parcelSize > MAXMSGBFRSIZE) {
330         if(i<numToSend-1)
331           done = 0;
332         numToSend -= numSent;
333         break;
334       }
335     }
336     if (numSent > 1) {
337       if (immed)
338         CmiMultipleIsend(pe, numSent, msgSizes, msgs);
339       else
340         CmiMultipleSend(pe, numSent, msgSizes, msgs);
341       for (i=0; i<numSent; i++)
342         CmiFree(msgs[i]);
343       CpvAccessOther(CldRelocatedMessages, rank) += numSent;
344       CpvAccessOther(CldMessageChunks, rank)++;
345     }
346     else if (numSent == 1) {
347       if (immed) CmiBecomeImmediate(msgs[0]);
348       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
349       CpvAccessOther(CldRelocatedMessages, rank)++;
350       CpvAccessOther(CldMessageChunks, rank)++;
351     }
352   }
353   free(msgs);
354   free(msgSizes);
355 }
356
357 /* simple scheme - just send one by one. useful for multicore */
358 void CldSimpleMultipleSend(int pe, int numToSend)
359 {
360   char *msg;
361   int len, queueing, priobits, *msgSizes, i, numSent, done=0;
362   unsigned int *prioptr;
363   CldInfoFn ifn;
364   CldPackFn pfn;
365
366   if (numToSend == 0)
367     return;
368
369   numSent = 0;
370   while (!done) {
371     for (i=0; i<numToSend; i++) {
372       CldGetToken(&msg);
373       if (msg != 0) {
374         done = 1;
375         numToSend--;
376         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
377         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
378         CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, pe));
379         CmiSyncSendAndFree(pe, len, msg);
380         if (numToSend == 0) done = 1;
381       }
382       else {
383         done = 1;
384         break;
385       }
386     }
387   }
388 }