small change in the previous commit
[charm.git] / src / conv-ldb / cldb.c
1
2 #include <stdlib.h>
3 #include "queueing.h"
4 #include "cldb.h"
5 #include <math.h>
6
7 typedef char *BitVector;
8
9 CpvDeclare(int, CldHandlerIndex);
10 CpvDeclare(int, CldNodeHandlerIndex);
11 CpvDeclare(BitVector, CldPEBitVector);
12 CpvDeclare(int, CldBalanceHandlerIndex);
13
14 CpvDeclare(int, CldRelocatedMessages);
15 CpvDeclare(int, CldLoadBalanceMessages);
16 CpvDeclare(int, CldMessageChunks);
17 CpvDeclare(int, CldLoadNotify);
18
19 CpvDeclare(CmiNodeLock, cldLock);
20
21 extern void LoadNotifyFn(int);
22
23 char* _lbtopo = "torus_nd_5";
24
25 /* Estimator stuff.  Of any use? */
26 /*
27 CpvStaticDeclare(CldEstimatorTable, _estfns);
28 */
29 void CldRegisterEstimator(CldEstimator fn)
30 {
31   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
32 }
33
34 /* 
35 int CldEstimate(void)
36 {
37   CldEstimatorTable *estab = &(CpvAccess(_estfns));
38   int i, load=0;
39   for(i=0; i<estab->count; i++)
40     load += (*(estab->fns[i]))();
41   return load;
42 }
43
44 static int CsdEstimator(void)
45 {
46   return CsdLength();
47 }
48 */
49
50 CpvDeclare(int, CldLoadOffset);
51
52
53 int CldRegisterInfoFn(CldInfoFn fn)
54 {
55   return CmiRegisterHandler((CmiHandler)fn);
56 }
57
58 int CldRegisterPackFn(CldPackFn fn)
59 {
60   return CmiRegisterHandler((CmiHandler)fn);
61 }
62
63 /* CldSwitchHandler takes a message and a new handler number.  It
64  * changes the handler number to the new handler number and move the
65  * old to the Xhandler part of the header.  When the message gets
66  * handled, the handler should call CldRestoreHandler to put the old
67  * handler back.
68  *
69  * CldPutToken puts a message in the scheduler queue in such a way
70  * that it can be retreived from the queue.  Once the message gets
71  * handled, it can no longer be retreived.  CldGetToken removes a
72  * message that was placed in the scheduler queue in this way.
73  * CldCountTokens tells you how many tokens are currently retreivable.  
74 */
75
76 void CldSwitchHandler(char *cmsg, int handler)
77 {
78 #if CMK_MEM_CHECKPOINT
79   int old_phase = CmiGetRestartPhase(cmsg);
80 #endif
81   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
82   CmiSetHandler(cmsg, handler);
83 #if CMK_MEM_CHECKPOINT
84   CmiGetRestartPhase(cmsg) = old_phase;
85 #endif
86 }
87
88 void CldRestoreHandler(char *cmsg)
89 {
90 #if CMK_MEM_CHECKPOINT
91   int old_phase = CmiGetRestartPhase(cmsg);
92 #endif
93   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
94 #if CMK_MEM_CHECKPOINT
95   CmiGetRestartPhase(cmsg) = old_phase;
96 #endif
97 }
98
99 void Cldhandler(char *);
100  
101 typedef struct CldToken_s {
102   char msg_header[CmiMsgHeaderSizeBytes];
103   char *msg;  /* if null, message already removed */
104   struct CldToken_s *pred;
105   struct CldToken_s *succ;
106 } *CldToken;
107
108 typedef struct CldProcInfo_s {
109   int tokenhandleridx;
110   int load; /* number of items in doubly-linked circle besides sentinel */
111   CldToken sentinel;
112 } *CldProcInfo;
113
114 CpvDeclare(CldProcInfo, CldProc);
115
116 static void CldTokenHandler(CldToken tok)
117 {
118   CldProcInfo proc = CpvAccess(CldProc);
119   if (tok->msg) {
120     tok->pred->succ = tok->succ;
121     tok->succ->pred = tok->pred;
122     proc->load --;
123     CmiHandleMessage(tok->msg);
124   }
125   else 
126     CpvAccess(CldLoadOffset)--;
127   if (CpvAccess(CldLoadNotify))
128     LoadNotifyFn(CpvAccess(CldProc)->load);
129   CmiFree(tok);
130 }
131
132 int CldCountTokensRank(int rank)
133 {
134   return CpvAccessOther(CldProc, rank)->load;
135 }
136
137 int CldCountTokens(void)
138 {
139   return (CpvAccess(CldProc)->load);
140 }
141
142 int CldLoad(void)
143 {
144   return (CsdLength() - CpvAccess(CldLoadOffset));
145 }
146
147 int CldLoadRank(int rank)
148 {
149   int len, offset;
150   /* CmiLock(CpvAccessOther(cldLock, rank));  */
151   len = CqsLength(CpvAccessOther(CsdSchedQueue, rank));
152      /* CldLoadOffset is the empty token counter */
153   offset = CpvAccessOther(CldLoadOffset, rank);
154   /* CmiUnlock(CpvAccessOther(cldLock, rank)); */
155   return len - offset;
156 }
157
158 void CldPutToken(char *msg)
159 {
160   CldProcInfo proc = CpvAccess(CldProc);
161   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
162   CldToken tok;
163   int len, queueing, priobits; unsigned int *prioptr;
164   CldPackFn pfn;
165
166   CmiLock(CpvAccess(cldLock));
167   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
168   tok->msg = msg;
169
170   /* add token to the doubly-linked circle */
171   tok->pred = proc->sentinel->pred;
172   tok->succ = proc->sentinel;
173   tok->pred->succ = tok;
174   tok->succ->pred = tok;
175   proc->load ++;
176   /* add token to the scheduler */
177   CmiSetHandler(tok, proc->tokenhandleridx);
178   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
179   /* not sigio or thread safe */
180   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
181   CmiUnlock(CpvAccess(cldLock));
182 }
183
184 void CldPutTokenPrio(char *msg)
185 {
186   CldProcInfo proc = CpvAccess(CldProc);
187   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
188   CldToken tok, ptr;
189   int len, queueing, priobits; unsigned int *prioptr, ints;
190   CldPackFn pfn;
191
192   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
193   ints = (priobits+CINTBITS-1)/CINTBITS;
194
195   CmiLock(CpvAccess(cldLock));
196   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
197   tok->msg = msg;
198
199   /* find the right place */
200   ptr = proc->sentinel->succ;
201   while (ptr!=proc->sentinel) {
202     int len1, queueing1, priobits1; unsigned int *prioptr1, ints1;
203     CldPackFn pfn1;
204     ifn(ptr->msg, &pfn1, &len1, &queueing1, &priobits1, &prioptr1);
205     ints1 = (priobits1+CINTBITS-1)/CINTBITS;
206
207     if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { break;}
208     ptr = ptr->succ;
209   }
210
211   /* add token to the doubly-linked circle */
212   tok->succ = ptr;
213   tok->pred = ptr->pred;
214   tok->pred->succ = tok;
215   tok->succ->pred = tok;
216   proc->load ++;
217   /* add token to the scheduler */
218   CmiSetHandler(tok, proc->tokenhandleridx);
219   /* not sigio or thread safe */
220   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
221   CmiUnlock(CpvAccess(cldLock));
222 }
223
224
225 static 
226 #if CMK_C_INLINE
227 inline 
228 #endif
229 void * _CldGetTokenMsg(CldProcInfo proc)
230 {
231   CldToken tok;
232   void *msg;
233   
234   tok = proc->sentinel->succ;
235   if (tok == proc->sentinel) {
236     return NULL;
237   }
238   tok->pred->succ = tok->succ;
239   tok->succ->pred = tok->pred;
240   proc->load --;
241   msg = tok->msg;
242   tok->msg = 0;
243   return msg;
244 }
245
246 void CldGetToken(char **msg)
247 {
248   CldProcInfo proc = CpvAccess(CldProc);
249   CmiNodeLock cldlock = CpvAccess(cldLock);
250   CmiLock(cldlock);
251   *msg = _CldGetTokenMsg(proc);
252   if (*msg) CpvAccess(CldLoadOffset)++;
253   CmiUnlock(cldlock);
254 }
255
256 /* called at node level */
257 /* get token from processor of rank pe */
258 static 
259 #if CMK_C_INLINE
260 inline 
261 #endif
262 void CldGetTokenFromRank(char **msg, int rank)
263 {
264   CldProcInfo proc = CpvAccessOther(CldProc, rank);
265   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
266   CmiLock(cldlock);
267   *msg = _CldGetTokenMsg(proc);
268   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
269   CmiUnlock(cldlock);
270 }
271
272 static 
273 #if CMK_C_INLINE
274 inline 
275 #endif
276 void * _CldGetTokenMsgAt(CldProcInfo proc, CldToken tok)
277 {
278   void *msg;
279   
280   if (tok == proc->sentinel) return NULL;
281   tok->pred->succ = tok->succ;
282   tok->succ->pred = tok->pred;
283   proc->load --;
284   msg = tok->msg;
285   tok->msg = 0;
286   return msg;
287 }
288
289 /* called at node level */
290 /* get token from processor of rank pe */
291 static 
292 #if CMK_C_INLINE
293 inline 
294 #endif
295 void CldGetTokenFromRankAt(char **msg, int rank, CldToken tok)
296 {
297   CldProcInfo proc = CpvAccessOther(CldProc, rank);
298   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
299   CmiLock(cldlock);
300   *msg = _CldGetTokenMsgAt(proc, tok);
301   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
302   CmiUnlock(cldlock);
303 }
304
305 /* Bit Vector Stuff */
306
307 int CldPresentPE(int pe)
308 {
309   return CpvAccess(CldPEBitVector)[pe];
310 }
311
312 void CldMoveAllSeedsAway()
313 {
314   char *msg;
315   int len, queueing, priobits, pe;
316   unsigned int *prioptr;
317   CldInfoFn ifn;  CldPackFn pfn;
318
319   CldGetToken(&msg);
320   while (msg != 0) {
321     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
322     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
323     CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
324     pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
325     while (!CldPresentPE(pe))
326       pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
327     CmiSyncSendAndFree(pe, len, msg);
328     CldGetToken(&msg);
329   }
330 }
331
332 void CldSetPEBitVector(const char *newBV)
333 {
334   int i;
335   
336   for (i=0; i<CmiNumPes(); i++)
337     CpvAccess(CldPEBitVector)[i] = newBV[i];
338   if (!CldPresentPE(CmiMyPe()))
339     CldMoveAllSeedsAway();
340 }
341
342 /* End Bit Vector Stuff */
343
344 static int _cldb_cs = 0;
345
346 void CldModuleGeneralInit(char **argv)
347 {
348   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
349   CldProcInfo proc;
350   int i;
351
352   CpvInitialize(CldProcInfo, CldProc);
353   CpvInitialize(int, CldLoadOffset);
354   CpvAccess(CldLoadOffset) = 0;
355   CpvInitialize(int, CldLoadNotify);
356   CpvInitialize(BitVector, CldPEBitVector);
357   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
358   for (i=0; i<CmiNumPes(); i++)
359     CpvAccess(CldPEBitVector)[i] = 1;
360   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
361   proc = CpvAccess(CldProc);
362   proc->load = 0;
363   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
364   proc->sentinel = sentinel;
365   sentinel->succ = sentinel;
366   sentinel->pred = sentinel;
367
368   /* lock to protect token queue for immediate message and smp */
369   CpvInitialize(CmiNodeLock, cldLock);
370   CpvAccess(cldLock) = CmiCreateLock();
371
372   _cldb_cs = CmiGetArgFlagDesc(argv, "+cldb_cs", "Converse> Print seed load balancing statistics.");
373   
374   if (CmiMyPe() == 0) {
375     char *stra = CldGetStrategy();
376     if (strcmp(stra, "rand") != 0) {
377       CmiPrintf("Charm++> %s seed load balancer.\n", stra);
378     }
379   } 
380 }
381
382 /* function can be called in an immediate handler at node level
383    rank specify the rank of processor for the node to represent
384    This function can also send as immeidate messages
385 */
386 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
387 {
388   char **msgs;
389   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
390   unsigned int *prioptr;
391   CldInfoFn ifn;
392   CldPackFn pfn;
393
394   msgs = (char **)calloc(numToSend, sizeof(char *));
395   msgSizes = (int *)calloc(numToSend, sizeof(int));
396
397   while (!done) {
398     numSent = 0;
399     parcelSize = 0;
400     for (i=0; i<numToSend; i++) {
401       CldGetTokenFromRank(&msgs[i], rank);
402       if (msgs[i] != 0) {
403         done = 1;
404         numSent++;
405         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
406         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
407         msgSizes[i] = len;
408         parcelSize += len;
409         CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
410         if (immed) CmiBecomeImmediate(msgs[i]);
411       }
412       else {
413         done = 1;
414         break;
415       }
416       if (parcelSize > MAXMSGBFRSIZE) {
417         if(i<numToSend-1)
418           done = 0;
419         numToSend -= numSent;
420         break;
421       }
422     }
423     if (numSent > 1) {
424       if (immed)
425         CmiMultipleIsend(pe, numSent, msgSizes, msgs);
426       else
427         CmiMultipleSend(pe, numSent, msgSizes, msgs);
428       for (i=0; i<numSent; i++)
429         CmiFree(msgs[i]);
430       CpvAccessOther(CldRelocatedMessages, rank) += numSent;
431       CpvAccessOther(CldMessageChunks, rank)++;
432     }
433     else if (numSent == 1) {
434       if (immed) CmiBecomeImmediate(msgs[0]);
435       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
436       CpvAccessOther(CldRelocatedMessages, rank)++;
437       CpvAccessOther(CldMessageChunks, rank)++;
438     }
439   }
440   free(msgs);
441   free(msgSizes);
442 }
443
444 /* function can be called in an immediate handler at node level
445    rank specify the rank of processor for the node to represent
446    This function can also send as immeidate messages
447 */
448 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed)
449 {
450   char **msgs;
451   int len, queueing, priobits, *msgSizes, i;
452   unsigned int *prioptr;
453   CldInfoFn ifn;
454   CldPackFn pfn;
455   CldToken tok;
456   CldProcInfo proc = CpvAccess(CldProc);
457   int count = 0;
458
459   if (numToSend ==0) return;
460   msgs = (char **)calloc(numToSend, sizeof(char *));
461   msgSizes = (int *)calloc(numToSend, sizeof(int));
462
463   tok = proc->sentinel->succ;
464   if (tok == proc->sentinel) return;
465   tok = tok->succ;
466   while (tok!=proc->sentinel) {
467     tok = tok->succ;
468     if (tok == proc->sentinel) break;
469     CldGetTokenFromRankAt(&msgs[count], rank, tok->pred);
470     if (msgs[i] != 0) {
471         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
472         ifn(msgs[count], &pfn, &len, &queueing, &priobits, &prioptr);
473         msgSizes[count] = len;
474         CldSwitchHandler(msgs[count], CpvAccessOther(CldBalanceHandlerIndex, rank));
475         if (immed) CmiBecomeImmediate(msgs[count]);
476         count ++;
477     }
478     tok = tok->succ;
479   }
480   if (count > 1) {
481       if (immed)
482         CmiMultipleIsend(pe, count, msgSizes, msgs);
483       else
484         CmiMultipleSend(pe, count, msgSizes, msgs);
485       for (i=0; i<count; i++)
486         CmiFree(msgs[i]);
487       CpvAccessOther(CldRelocatedMessages, rank) += count;
488       CpvAccessOther(CldMessageChunks, rank)++;
489   }
490   else if (count == 1) {
491       if (immed) CmiBecomeImmediate(msgs[0]);
492       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
493       CpvAccessOther(CldRelocatedMessages, rank)++;
494       CpvAccessOther(CldMessageChunks, rank)++;
495   }
496   free(msgs);
497   free(msgSizes);
498 }
499
500 /* simple scheme - just send one by one. useful for multicore */
501 void CldSimpleMultipleSend(int pe, int numToSend, int rank)
502 {
503   char *msg;
504   int len, queueing, priobits, *msgSizes, i, numSent, done=0;
505   unsigned int *prioptr;
506   CldInfoFn ifn;
507   CldPackFn pfn;
508
509   if (numToSend == 0)
510     return;
511
512   numSent = 0;
513   while (!done) {
514     for (i=0; i<numToSend; i++) {
515       CldGetTokenFromRank(&msg, rank);
516       if (msg != 0) {
517         done = 1;
518         numToSend--;
519         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
520         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
521         CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, rank));
522         CmiSyncSendAndFree(pe, len, msg);
523         if (numToSend == 0) done = 1;
524       }
525       else {
526         done = 1;
527         break;
528       }
529     }
530   }
531 }
532
533 void seedBalancerExit()
534 {
535   if (_cldb_cs)
536     CmiPrintf("[%d] Relocate message number is %d\n", CmiMyPe(), CpvAccess(CldRelocatedMessages));
537 }