minor change to make charm output uniform.
[charm.git] / src / conv-ldb / cldb.c
1
2 #include <stdlib.h>
3 #include "queueing.h"
4 #include "cldb.h"
5 #include <math.h>
6
7 typedef char *BitVector;
8
9 CpvDeclare(int, CldHandlerIndex);
10 CpvDeclare(BitVector, CldPEBitVector);
11 CpvDeclare(int, CldBalanceHandlerIndex);
12
13 CpvDeclare(int, CldRelocatedMessages);
14 CpvDeclare(int, CldLoadBalanceMessages);
15 CpvDeclare(int, CldMessageChunks);
16 CpvDeclare(int, CldLoadNotify);
17
18 CpvDeclare(CmiNodeLock, cldLock);
19
20 extern void LoadNotifyFn(int);
21
22 char* _lbtopo = "torus_nd_5";
23
24 /* Estimator stuff.  Of any use? */
25 /*
26 CpvStaticDeclare(CldEstimatorTable, _estfns);
27 */
28 void CldRegisterEstimator(CldEstimator fn)
29 {
30   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
31 }
32
33 /* 
34 int CldEstimate(void)
35 {
36   CldEstimatorTable *estab = &(CpvAccess(_estfns));
37   int i, load=0;
38   for(i=0; i<estab->count; i++)
39     load += (*(estab->fns[i]))();
40   return load;
41 }
42
43 static int CsdEstimator(void)
44 {
45   return CsdLength();
46 }
47 */
48
49 CpvDeclare(int, CldLoadOffset);
50
51
52 int CldRegisterInfoFn(CldInfoFn fn)
53 {
54   return CmiRegisterHandler((CmiHandler)fn);
55 }
56
57 int CldRegisterPackFn(CldPackFn fn)
58 {
59   return CmiRegisterHandler((CmiHandler)fn);
60 }
61
62 /* CldSwitchHandler takes a message and a new handler number.  It
63  * changes the handler number to the new handler number and move the
64  * old to the Xhandler part of the header.  When the message gets
65  * handled, the handler should call CldRestoreHandler to put the old
66  * handler back.
67  *
68  * CldPutToken puts a message in the scheduler queue in such a way
69  * that it can be retreived from the queue.  Once the message gets
70  * handled, it can no longer be retreived.  CldGetToken removes a
71  * message that was placed in the scheduler queue in this way.
72  * CldCountTokens tells you how many tokens are currently retreivable.  
73 */
74
75 void CldSwitchHandler(char *cmsg, int handler)
76 {
77 #if CMK_MEM_CHECKPOINT
78   int old_phase = CmiGetRestartPhase(cmsg);
79 #endif
80   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
81   CmiSetHandler(cmsg, handler);
82 #if CMK_MEM_CHECKPOINT
83   CmiGetRestartPhase(cmsg) = old_phase;
84 #endif
85 }
86
87 void CldRestoreHandler(char *cmsg)
88 {
89 #if CMK_MEM_CHECKPOINT
90   int old_phase = CmiGetRestartPhase(cmsg);
91 #endif
92   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
93 #if CMK_MEM_CHECKPOINT
94   CmiGetRestartPhase(cmsg) = old_phase;
95 #endif
96 }
97
98 void Cldhandler(char *);
99  
100 typedef struct CldToken_s {
101   char msg_header[CmiMsgHeaderSizeBytes];
102   char *msg;  /* if null, message already removed */
103   struct CldToken_s *pred;
104   struct CldToken_s *succ;
105 } *CldToken;
106
107 typedef struct CldProcInfo_s {
108   int tokenhandleridx;
109   int load; /* number of items in doubly-linked circle besides sentinel */
110   CldToken sentinel;
111 } *CldProcInfo;
112
113 CpvDeclare(CldProcInfo, CldProc);
114
115 static void CldTokenHandler(CldToken tok)
116 {
117   CldProcInfo proc = CpvAccess(CldProc);
118   if (tok->msg) {
119     tok->pred->succ = tok->succ;
120     tok->succ->pred = tok->pred;
121     proc->load --;
122     CmiHandleMessage(tok->msg);
123   }
124   else 
125     CpvAccess(CldLoadOffset)--;
126   if (CpvAccess(CldLoadNotify))
127     LoadNotifyFn(CpvAccess(CldProc)->load);
128   CmiFree(tok);
129 }
130
131 int CldCountTokens(void)
132 {
133   return (CpvAccess(CldProc)->load);
134 }
135
136 int CldLoad(void)
137 {
138   return (CsdLength() - CpvAccess(CldLoadOffset));
139 }
140
141 int CldLoadRank(int rank)
142 {
143   int len, offset;
144   /* CmiLock(CpvAccessOther(cldLock, rank));  */
145   len = CqsLength(CpvAccessOther(CsdSchedQueue, rank));
146      /* CldLoadOffset is the empty token counter */
147   offset = CpvAccessOther(CldLoadOffset, rank);
148   /* CmiUnlock(CpvAccessOther(cldLock, rank)); */
149   return len - offset;
150 }
151
152 void CldPutToken(char *msg)
153 {
154   CldProcInfo proc = CpvAccess(CldProc);
155   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
156   CldToken tok;
157   int len, queueing, priobits; unsigned int *prioptr;
158   CldPackFn pfn;
159
160   CmiLock(CpvAccess(cldLock));
161   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
162   tok->msg = msg;
163
164   /* add token to the doubly-linked circle */
165   tok->pred = proc->sentinel->pred;
166   tok->succ = proc->sentinel;
167   tok->pred->succ = tok;
168   tok->succ->pred = tok;
169   proc->load ++;
170   /* add token to the scheduler */
171   CmiSetHandler(tok, proc->tokenhandleridx);
172   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
173   /* not sigio or thread safe */
174   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
175   CmiUnlock(CpvAccess(cldLock));
176 }
177
178
179 static 
180 #if CMK_C_INLINE
181 inline 
182 #endif
183 void * _CldGetTokenMsg(CldProcInfo proc)
184 {
185   CldToken tok;
186   void *msg;
187   
188   tok = proc->sentinel->succ;
189   if (tok == proc->sentinel) {
190     return NULL;
191   }
192   tok->pred->succ = tok->succ;
193   tok->succ->pred = tok->pred;
194   proc->load --;
195   msg = tok->msg;
196   tok->msg = 0;
197   return msg;
198 }
199
200 void CldGetToken(char **msg)
201 {
202   CldProcInfo proc = CpvAccess(CldProc);
203   CmiNodeLock cldlock = CpvAccess(cldLock);
204   CmiLock(cldlock);
205   *msg = _CldGetTokenMsg(proc);
206   if (*msg) CpvAccess(CldLoadOffset)++;
207   CmiUnlock(cldlock);
208 }
209
210 /* called at node level */
211 /* get token from processor of rank pe */
212 static 
213 #if CMK_C_INLINE
214 inline 
215 #endif
216 void CldGetTokenFromRank(char **msg, int rank)
217 {
218   CldProcInfo proc = CpvAccessOther(CldProc, rank);
219   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
220   CmiLock(cldlock);
221   *msg = _CldGetTokenMsg(proc);
222   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
223   CmiUnlock(cldlock);
224 }
225
226 /* Bit Vector Stuff */
227
228 int CldPresentPE(int pe)
229 {
230   return CpvAccess(CldPEBitVector)[pe];
231 }
232
233 void CldMoveAllSeedsAway()
234 {
235   char *msg;
236   int len, queueing, priobits, pe;
237   unsigned int *prioptr;
238   CldInfoFn ifn;  CldPackFn pfn;
239
240   CldGetToken(&msg);
241   while (msg != 0) {
242     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
243     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
244     CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
245     pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
246     while (!CldPresentPE(pe))
247       pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
248     CmiSyncSendAndFree(pe, len, msg);
249     CldGetToken(&msg);
250   }
251 }
252
253 void CldSetPEBitVector(const char *newBV)
254 {
255   int i;
256   
257   for (i=0; i<CmiNumPes(); i++)
258     CpvAccess(CldPEBitVector)[i] = newBV[i];
259   if (!CldPresentPE(CmiMyPe()))
260     CldMoveAllSeedsAway();
261 }
262
263 /* End Bit Vector Stuff */
264
265 void CldModuleGeneralInit(char **argv)
266 {
267   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
268   CldProcInfo proc;
269   int i;
270
271   CpvInitialize(CldProcInfo, CldProc);
272   CpvInitialize(int, CldLoadOffset);
273   CpvAccess(CldLoadOffset) = 0;
274   CpvInitialize(int, CldLoadNotify);
275   CpvInitialize(BitVector, CldPEBitVector);
276   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
277   for (i=0; i<CmiNumPes(); i++)
278     CpvAccess(CldPEBitVector)[i] = 1;
279   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
280   proc = CpvAccess(CldProc);
281   proc->load = 0;
282   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
283   proc->sentinel = sentinel;
284   sentinel->succ = sentinel;
285   sentinel->pred = sentinel;
286
287   /* lock to protect token queue for immediate message and smp */
288   CpvInitialize(CmiNodeLock, cldLock);
289   CpvAccess(cldLock) = CmiCreateLock();
290
291   
292   if (CmiMyPe() == 0) {
293     char *stra = CldGetStrategy();
294     if (strcmp(stra, "rand") != 0) {
295       CmiPrintf("Charm++> %s seed load balancer.\n", stra);
296     }
297   } 
298 }
299
300 /* function can be called in an immediate handler at node level
301    rank specify the rank of processor for the node to represent
302    This function can also send as immeidate messages
303 */
304 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
305 {
306   char **msgs;
307   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
308   unsigned int *prioptr;
309   CldInfoFn ifn;
310   CldPackFn pfn;
311
312   msgs = (char **)calloc(numToSend, sizeof(char *));
313   msgSizes = (int *)calloc(numToSend, sizeof(int));
314
315   while (!done) {
316     numSent = 0;
317     parcelSize = 0;
318     for (i=0; i<numToSend; i++) {
319       CldGetTokenFromRank(&msgs[i], rank);
320       if (msgs[i] != 0) {
321         done = 1;
322         numSent++;
323         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
324         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
325         msgSizes[i] = len;
326         parcelSize += len;
327         CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
328         if (immed) CmiBecomeImmediate(msgs[i]);
329       }
330       else {
331         done = 1;
332         break;
333       }
334       if (parcelSize > MAXMSGBFRSIZE) {
335         if(i<numToSend-1)
336           done = 0;
337         numToSend -= numSent;
338         break;
339       }
340     }
341     if (numSent > 1) {
342       if (immed)
343         CmiMultipleIsend(pe, numSent, msgSizes, msgs);
344       else
345         CmiMultipleSend(pe, numSent, msgSizes, msgs);
346       for (i=0; i<numSent; i++)
347         CmiFree(msgs[i]);
348       CpvAccessOther(CldRelocatedMessages, rank) += numSent;
349       CpvAccessOther(CldMessageChunks, rank)++;
350     }
351     else if (numSent == 1) {
352       if (immed) CmiBecomeImmediate(msgs[0]);
353       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
354       CpvAccessOther(CldRelocatedMessages, rank)++;
355       CpvAccessOther(CldMessageChunks, rank)++;
356     }
357   }
358   free(msgs);
359   free(msgSizes);
360 }
361
362 /* simple scheme - just send one by one. useful for multicore */
363 void CldSimpleMultipleSend(int pe, int numToSend)
364 {
365   char *msg;
366   int len, queueing, priobits, *msgSizes, i, numSent, done=0;
367   unsigned int *prioptr;
368   CldInfoFn ifn;
369   CldPackFn pfn;
370
371   if (numToSend == 0)
372     return;
373
374   numSent = 0;
375   while (!done) {
376     for (i=0; i<numToSend; i++) {
377       CldGetToken(&msg);
378       if (msg != 0) {
379         done = 1;
380         numToSend--;
381         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
382         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
383         CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, pe));
384         CmiSyncSendAndFree(pe, len, msg);
385         if (numToSend == 0) done = 1;
386       }
387       else {
388         done = 1;
389         break;
390       }
391     }
392   }
393 }