Remove archaic CVS keyword header comment blocks
[charm.git] / src / conv-ldb / cldb.c
1
2 #include <stdlib.h>
3 #include "queueing.h"
4 #include "cldb.h"
5 #include <math.h>
6
7 typedef char *BitVector;
8
9 CpvDeclare(int, CldHandlerIndex);
10 CpvDeclare(BitVector, CldPEBitVector);
11 CpvDeclare(int, CldBalanceHandlerIndex);
12
13 CpvDeclare(int, CldRelocatedMessages);
14 CpvDeclare(int, CldLoadBalanceMessages);
15 CpvDeclare(int, CldMessageChunks);
16 CpvDeclare(int, CldLoadNotify);
17
18 CpvDeclare(CmiNodeLock, cldLock);
19
20 extern void LoadNotifyFn(int);
21
22 char* _lbtopo = "torus_nd_5";
23
24 /* Estimator stuff.  Of any use? */
25 /*
26 CpvStaticDeclare(CldEstimatorTable, _estfns);
27 */
28 void CldRegisterEstimator(CldEstimator fn)
29 {
30   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
31 }
32
33 /* 
34 int CldEstimate(void)
35 {
36   CldEstimatorTable *estab = &(CpvAccess(_estfns));
37   int i, load=0;
38   for(i=0; i<estab->count; i++)
39     load += (*(estab->fns[i]))();
40   return load;
41 }
42
43 static int CsdEstimator(void)
44 {
45   return CsdLength();
46 }
47 */
48
49 CpvDeclare(int, CldLoadOffset);
50
51
52 int CldRegisterInfoFn(CldInfoFn fn)
53 {
54   return CmiRegisterHandler((CmiHandler)fn);
55 }
56
57 int CldRegisterPackFn(CldPackFn fn)
58 {
59   return CmiRegisterHandler((CmiHandler)fn);
60 }
61
62 /* CldSwitchHandler takes a message and a new handler number.  It
63  * changes the handler number to the new handler number and move the
64  * old to the Xhandler part of the header.  When the message gets
65  * handled, the handler should call CldRestoreHandler to put the old
66  * handler back.
67  *
68  * CldPutToken puts a message in the scheduler queue in such a way
69  * that it can be retreived from the queue.  Once the message gets
70  * handled, it can no longer be retreived.  CldGetToken removes a
71  * message that was placed in the scheduler queue in this way.
72  * CldCountTokens tells you how many tokens are currently retreivable.  
73 */
74
75 void CldSwitchHandler(char *cmsg, int handler)
76 {
77 #if CMK_MEM_CHECKPOINT
78   int old_phase = CmiGetRestartPhase(cmsg);
79 #endif
80   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
81   CmiSetHandler(cmsg, handler);
82 #if CMK_MEM_CHECKPOINT
83   CmiGetRestartPhase(cmsg) = old_phase;
84 #endif
85 }
86
87 void CldRestoreHandler(char *cmsg)
88 {
89 #if CMK_MEM_CHECKPOINT
90   int old_phase = CmiGetRestartPhase(cmsg);
91 #endif
92   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
93 #if CMK_MEM_CHECKPOINT
94   CmiGetRestartPhase(cmsg) = old_phase;
95 #endif
96 }
97
98 void Cldhandler(char *);
99  
100 typedef struct CldToken_s {
101   char msg_header[CmiMsgHeaderSizeBytes];
102   char *msg;  /* if null, message already removed */
103   struct CldToken_s *pred;
104   struct CldToken_s *succ;
105 } *CldToken;
106
107 typedef struct CldProcInfo_s {
108   int tokenhandleridx;
109   int load; /* number of items in doubly-linked circle besides sentinel */
110   CldToken sentinel;
111 } *CldProcInfo;
112
113 CpvDeclare(CldProcInfo, CldProc);
114
115 static void CldTokenHandler(CldToken tok)
116 {
117   CldProcInfo proc = CpvAccess(CldProc);
118   if (tok->msg) {
119     tok->pred->succ = tok->succ;
120     tok->succ->pred = tok->pred;
121     proc->load --;
122     CmiHandleMessage(tok->msg);
123   }
124   else 
125     CpvAccess(CldLoadOffset)--;
126   if (CpvAccess(CldLoadNotify))
127     LoadNotifyFn(CpvAccess(CldProc)->load);
128   CmiFree(tok);
129 }
130
131 int CldCountTokensRank(int rank)
132 {
133   return CpvAccessOther(CldProc, rank)->load;
134 }
135
136 int CldCountTokens(void)
137 {
138   return (CpvAccess(CldProc)->load);
139 }
140
141 int CldLoad(void)
142 {
143   return (CsdLength() - CpvAccess(CldLoadOffset));
144 }
145
146 int CldLoadRank(int rank)
147 {
148   int len, offset;
149   /* CmiLock(CpvAccessOther(cldLock, rank));  */
150   len = CqsLength(CpvAccessOther(CsdSchedQueue, rank));
151      /* CldLoadOffset is the empty token counter */
152   offset = CpvAccessOther(CldLoadOffset, rank);
153   /* CmiUnlock(CpvAccessOther(cldLock, rank)); */
154   return len - offset;
155 }
156
157 void CldPutToken(char *msg)
158 {
159   CldProcInfo proc = CpvAccess(CldProc);
160   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
161   CldToken tok;
162   int len, queueing, priobits; unsigned int *prioptr;
163   CldPackFn pfn;
164
165   CmiLock(CpvAccess(cldLock));
166   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
167   tok->msg = msg;
168
169   /* add token to the doubly-linked circle */
170   tok->pred = proc->sentinel->pred;
171   tok->succ = proc->sentinel;
172   tok->pred->succ = tok;
173   tok->succ->pred = tok;
174   proc->load ++;
175   /* add token to the scheduler */
176   CmiSetHandler(tok, proc->tokenhandleridx);
177   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
178   /* not sigio or thread safe */
179   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
180   CmiUnlock(CpvAccess(cldLock));
181 }
182
183 void CldPutTokenPrio(char *msg)
184 {
185   CldProcInfo proc = CpvAccess(CldProc);
186   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
187   CldToken tok, ptr;
188   int len, queueing, priobits; unsigned int *prioptr, ints;
189   CldPackFn pfn;
190
191   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
192   ints = (priobits+CINTBITS-1)/CINTBITS;
193
194   CmiLock(CpvAccess(cldLock));
195   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
196   tok->msg = msg;
197
198   /* find the right place */
199   ptr = proc->sentinel->succ;
200   while (ptr!=proc->sentinel) {
201     int len1, queueing1, priobits1; unsigned int *prioptr1, ints1;
202     CldPackFn pfn1;
203     ifn(ptr->msg, &pfn1, &len1, &queueing1, &priobits1, &prioptr1);
204     ints1 = (priobits1+CINTBITS-1)/CINTBITS;
205
206     if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { break;}
207     ptr = ptr->succ;
208   }
209
210   /* add token to the doubly-linked circle */
211   tok->succ = ptr;
212   tok->pred = ptr->pred;
213   tok->pred->succ = tok;
214   tok->succ->pred = tok;
215   proc->load ++;
216   /* add token to the scheduler */
217   CmiSetHandler(tok, proc->tokenhandleridx);
218   /* not sigio or thread safe */
219   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
220   CmiUnlock(CpvAccess(cldLock));
221 }
222
223
224 static 
225 #if CMK_C_INLINE
226 inline 
227 #endif
228 void * _CldGetTokenMsg(CldProcInfo proc)
229 {
230   CldToken tok;
231   void *msg;
232   
233   tok = proc->sentinel->succ;
234   if (tok == proc->sentinel) {
235     return NULL;
236   }
237   tok->pred->succ = tok->succ;
238   tok->succ->pred = tok->pred;
239   proc->load --;
240   msg = tok->msg;
241   tok->msg = 0;
242   return msg;
243 }
244
245 void CldGetToken(char **msg)
246 {
247   CldProcInfo proc = CpvAccess(CldProc);
248   CmiNodeLock cldlock = CpvAccess(cldLock);
249   CmiLock(cldlock);
250   *msg = _CldGetTokenMsg(proc);
251   if (*msg) CpvAccess(CldLoadOffset)++;
252   CmiUnlock(cldlock);
253 }
254
255 /* called at node level */
256 /* get token from processor of rank pe */
257 static 
258 #if CMK_C_INLINE
259 inline 
260 #endif
261 void CldGetTokenFromRank(char **msg, int rank)
262 {
263   CldProcInfo proc = CpvAccessOther(CldProc, rank);
264   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
265   CmiLock(cldlock);
266   *msg = _CldGetTokenMsg(proc);
267   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
268   CmiUnlock(cldlock);
269 }
270
271 static 
272 #if CMK_C_INLINE
273 inline 
274 #endif
275 void * _CldGetTokenMsgAt(CldProcInfo proc, CldToken tok)
276 {
277   void *msg;
278   
279   if (tok == proc->sentinel) return NULL;
280   tok->pred->succ = tok->succ;
281   tok->succ->pred = tok->pred;
282   proc->load --;
283   msg = tok->msg;
284   tok->msg = 0;
285   return msg;
286 }
287
288 /* called at node level */
289 /* get token from processor of rank pe */
290 static 
291 #if CMK_C_INLINE
292 inline 
293 #endif
294 void CldGetTokenFromRankAt(char **msg, int rank, CldToken tok)
295 {
296   CldProcInfo proc = CpvAccessOther(CldProc, rank);
297   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
298   CmiLock(cldlock);
299   *msg = _CldGetTokenMsgAt(proc, tok);
300   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
301   CmiUnlock(cldlock);
302 }
303
304 /* Bit Vector Stuff */
305
306 int CldPresentPE(int pe)
307 {
308   return CpvAccess(CldPEBitVector)[pe];
309 }
310
311 void CldMoveAllSeedsAway()
312 {
313   char *msg;
314   int len, queueing, priobits, pe;
315   unsigned int *prioptr;
316   CldInfoFn ifn;  CldPackFn pfn;
317
318   CldGetToken(&msg);
319   while (msg != 0) {
320     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
321     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
322     CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
323     pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
324     while (!CldPresentPE(pe))
325       pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
326     CmiSyncSendAndFree(pe, len, msg);
327     CldGetToken(&msg);
328   }
329 }
330
331 void CldSetPEBitVector(const char *newBV)
332 {
333   int i;
334   
335   for (i=0; i<CmiNumPes(); i++)
336     CpvAccess(CldPEBitVector)[i] = newBV[i];
337   if (!CldPresentPE(CmiMyPe()))
338     CldMoveAllSeedsAway();
339 }
340
341 /* End Bit Vector Stuff */
342
343 static int _cldb_cs = 0;
344
345 void CldModuleGeneralInit(char **argv)
346 {
347   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
348   CldProcInfo proc;
349   int i;
350
351   CpvInitialize(CldProcInfo, CldProc);
352   CpvInitialize(int, CldLoadOffset);
353   CpvAccess(CldLoadOffset) = 0;
354   CpvInitialize(int, CldLoadNotify);
355   CpvInitialize(BitVector, CldPEBitVector);
356   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
357   for (i=0; i<CmiNumPes(); i++)
358     CpvAccess(CldPEBitVector)[i] = 1;
359   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
360   proc = CpvAccess(CldProc);
361   proc->load = 0;
362   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
363   proc->sentinel = sentinel;
364   sentinel->succ = sentinel;
365   sentinel->pred = sentinel;
366
367   /* lock to protect token queue for immediate message and smp */
368   CpvInitialize(CmiNodeLock, cldLock);
369   CpvAccess(cldLock) = CmiCreateLock();
370
371   _cldb_cs = CmiGetArgFlagDesc(argv, "+cldb_cs", "Converse> Print seed load balancing statistics.");
372   
373   if (CmiMyPe() == 0) {
374     char *stra = CldGetStrategy();
375     if (strcmp(stra, "rand") != 0) {
376       CmiPrintf("Charm++> %s seed load balancer.\n", stra);
377     }
378   } 
379 }
380
381 /* function can be called in an immediate handler at node level
382    rank specify the rank of processor for the node to represent
383    This function can also send as immeidate messages
384 */
385 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
386 {
387   char **msgs;
388   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
389   unsigned int *prioptr;
390   CldInfoFn ifn;
391   CldPackFn pfn;
392
393   msgs = (char **)calloc(numToSend, sizeof(char *));
394   msgSizes = (int *)calloc(numToSend, sizeof(int));
395
396   while (!done) {
397     numSent = 0;
398     parcelSize = 0;
399     for (i=0; i<numToSend; i++) {
400       CldGetTokenFromRank(&msgs[i], rank);
401       if (msgs[i] != 0) {
402         done = 1;
403         numSent++;
404         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
405         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
406         msgSizes[i] = len;
407         parcelSize += len;
408         CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
409         if (immed) CmiBecomeImmediate(msgs[i]);
410       }
411       else {
412         done = 1;
413         break;
414       }
415       if (parcelSize > MAXMSGBFRSIZE) {
416         if(i<numToSend-1)
417           done = 0;
418         numToSend -= numSent;
419         break;
420       }
421     }
422     if (numSent > 1) {
423       if (immed)
424         CmiMultipleIsend(pe, numSent, msgSizes, msgs);
425       else
426         CmiMultipleSend(pe, numSent, msgSizes, msgs);
427       for (i=0; i<numSent; i++)
428         CmiFree(msgs[i]);
429       CpvAccessOther(CldRelocatedMessages, rank) += numSent;
430       CpvAccessOther(CldMessageChunks, rank)++;
431     }
432     else if (numSent == 1) {
433       if (immed) CmiBecomeImmediate(msgs[0]);
434       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
435       CpvAccessOther(CldRelocatedMessages, rank)++;
436       CpvAccessOther(CldMessageChunks, rank)++;
437     }
438   }
439   free(msgs);
440   free(msgSizes);
441 }
442
443 /* function can be called in an immediate handler at node level
444    rank specify the rank of processor for the node to represent
445    This function can also send as immeidate messages
446 */
447 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed)
448 {
449   char **msgs;
450   int len, queueing, priobits, *msgSizes, i;
451   unsigned int *prioptr;
452   CldInfoFn ifn;
453   CldPackFn pfn;
454   CldToken tok;
455   CldProcInfo proc = CpvAccess(CldProc);
456   int count = 0;
457
458   if (numToSend ==0) return;
459   msgs = (char **)calloc(numToSend, sizeof(char *));
460   msgSizes = (int *)calloc(numToSend, sizeof(int));
461
462   tok = proc->sentinel->succ;
463   if (tok == proc->sentinel) return;
464   tok = tok->succ;
465   while (tok!=proc->sentinel) {
466     tok = tok->succ;
467     if (tok == proc->sentinel) break;
468     CldGetTokenFromRankAt(&msgs[count], rank, tok->pred);
469     if (msgs[i] != 0) {
470         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
471         ifn(msgs[count], &pfn, &len, &queueing, &priobits, &prioptr);
472         msgSizes[count] = len;
473         CldSwitchHandler(msgs[count], CpvAccessOther(CldBalanceHandlerIndex, rank));
474         if (immed) CmiBecomeImmediate(msgs[count]);
475         count ++;
476     }
477     tok = tok->succ;
478   }
479   if (count > 1) {
480       if (immed)
481         CmiMultipleIsend(pe, count, msgSizes, msgs);
482       else
483         CmiMultipleSend(pe, count, msgSizes, msgs);
484       for (i=0; i<count; i++)
485         CmiFree(msgs[i]);
486       CpvAccessOther(CldRelocatedMessages, rank) += count;
487       CpvAccessOther(CldMessageChunks, rank)++;
488   }
489   else if (count == 1) {
490       if (immed) CmiBecomeImmediate(msgs[0]);
491       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
492       CpvAccessOther(CldRelocatedMessages, rank)++;
493       CpvAccessOther(CldMessageChunks, rank)++;
494   }
495   free(msgs);
496   free(msgSizes);
497 }
498
499 /* simple scheme - just send one by one. useful for multicore */
500 void CldSimpleMultipleSend(int pe, int numToSend, int rank)
501 {
502   char *msg;
503   int len, queueing, priobits, *msgSizes, i, numSent, done=0;
504   unsigned int *prioptr;
505   CldInfoFn ifn;
506   CldPackFn pfn;
507
508   if (numToSend == 0)
509     return;
510
511   numSent = 0;
512   while (!done) {
513     for (i=0; i<numToSend; i++) {
514       CldGetTokenFromRank(&msg, rank);
515       if (msg != 0) {
516         done = 1;
517         numToSend--;
518         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
519         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
520         CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, rank));
521         CmiSyncSendAndFree(pe, len, msg);
522         if (numToSend == 0) done = 1;
523       }
524       else {
525         done = 1;
526         break;
527       }
528     }
529   }
530 }
531
532 void seedBalancerExit()
533 {
534   if (_cldb_cs)
535     CmiPrintf("[%d] Relocate message number is %d\n", CmiMyPe(), CpvAccess(CldRelocatedMessages));
536 }