build: fix travis MPI/SMP build
[charm.git] / src / conv-ldb / cldb.C
1
2 #include <stdlib.h>
3 #include "queueing.h"
4 #include "cldb.h"
5 #include <math.h>
6
7 typedef char *BitVector;
8
9 CpvDeclare(int, CldHandlerIndex);
10 CpvDeclare(int, CldNodeHandlerIndex);
11 CpvDeclare(BitVector, CldPEBitVector);
12 CpvDeclare(int, CldBalanceHandlerIndex);
13
14 CpvDeclare(int, CldRelocatedMessages);
15 CpvDeclare(int, CldLoadBalanceMessages);
16 CpvDeclare(int, CldMessageChunks);
17 CpvDeclare(int, CldLoadNotify);
18
19 CpvDeclare(CmiNodeLock, cldLock);
20
21 extern void LoadNotifyFn(int);
22
23 static char s_lbtopo_default[] = "torus_nd_5";
24 extern char *_lbtopo;
25 char *_lbtopo = s_lbtopo_default;
26
27 /* Estimator stuff.  Of any use? */
28 /*
29 CpvStaticDeclare(CldEstimatorTable, _estfns);
30 */
31 void CldRegisterEstimator(CldEstimator fn)
32 {
33   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
34 }
35
36 /* 
37 int CldEstimate(void)
38 {
39   CldEstimatorTable *estab = &(CpvAccess(_estfns));
40   int i, load=0;
41   for(i=0; i<estab->count; i++)
42     load += (*(estab->fns[i]))();
43   return load;
44 }
45
46 static int CsdEstimator(void)
47 {
48   return CsdLength();
49 }
50 */
51
52 CpvDeclare(int, CldLoadOffset);
53
54
55 int CldRegisterInfoFn(CldInfoFn fn)
56 {
57   return CmiRegisterHandler((CmiHandler)fn);
58 }
59
60 int CldRegisterPackFn(CldPackFn fn)
61 {
62   return CmiRegisterHandler((CmiHandler)fn);
63 }
64
65 /* CldSwitchHandler takes a message and a new handler number.  It
66  * changes the handler number to the new handler number and move the
67  * old to the Xhandler part of the header.  When the message gets
68  * handled, the handler should call CldRestoreHandler to put the old
69  * handler back.
70  *
71  * CldPutToken puts a message in the scheduler queue in such a way
72  * that it can be retreived from the queue.  Once the message gets
73  * handled, it can no longer be retreived.  CldGetToken removes a
74  * message that was placed in the scheduler queue in this way.
75  * CldCountTokens tells you how many tokens are currently retreivable.  
76 */
77
78 void CldSwitchHandler(char *cmsg, int handler)
79 {
80 #if CMK_MEM_CHECKPOINT
81   int old_phase = CmiGetRestartPhase(cmsg);
82 #endif
83   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
84   CmiSetHandler(cmsg, handler);
85 #if CMK_MEM_CHECKPOINT
86   CmiGetRestartPhase(cmsg) = old_phase;
87 #endif
88 }
89
90 void CldRestoreHandler(char *cmsg)
91 {
92 #if CMK_MEM_CHECKPOINT
93   int old_phase = CmiGetRestartPhase(cmsg);
94 #endif
95   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
96 #if CMK_MEM_CHECKPOINT
97   CmiGetRestartPhase(cmsg) = old_phase;
98 #endif
99 }
100
101 void Cldhandler(char *);
102  
103 typedef struct CldToken_s {
104   char msg_header[CmiMsgHeaderSizeBytes];
105   char *msg;  /* if null, message already removed */
106   struct CldToken_s *pred;
107   struct CldToken_s *succ;
108 } *CldToken;
109
110 typedef struct CldProcInfo_s {
111   int tokenhandleridx;
112   int load; /* number of items in doubly-linked circle besides sentinel */
113   CldToken sentinel;
114 } *CldProcInfo;
115
116 CpvDeclare(CldProcInfo, CldProc);
117
118 static void CldTokenHandler(CldToken tok)
119 {
120   CldProcInfo proc = CpvAccess(CldProc);
121   if (tok->msg) {
122     tok->pred->succ = tok->succ;
123     tok->succ->pred = tok->pred;
124     proc->load --;
125     CmiHandleMessage(tok->msg);
126   }
127   else 
128     CpvAccess(CldLoadOffset)--;
129   if (CpvAccess(CldLoadNotify))
130     LoadNotifyFn(CpvAccess(CldProc)->load);
131   CmiFree(tok);
132 }
133
134 int CldCountTokensRank(int rank)
135 {
136   return CpvAccessOther(CldProc, rank)->load;
137 }
138
139 int CldCountTokens(void)
140 {
141   return (CpvAccess(CldProc)->load);
142 }
143
144 int CldLoad(void)
145 {
146   return (CsdLength() - CpvAccess(CldLoadOffset));
147 }
148
149 int CldLoadRank(int rank)
150 {
151   int len, offset;
152   /* CmiLock(CpvAccessOther(cldLock, rank));  */
153   len = CqsLength((Queue)CpvAccessOther(CsdSchedQueue, rank));
154      /* CldLoadOffset is the empty token counter */
155   offset = CpvAccessOther(CldLoadOffset, rank);
156   /* CmiUnlock(CpvAccessOther(cldLock, rank)); */
157   return len - offset;
158 }
159
160 void CldPutToken(char *msg)
161 {
162   CldProcInfo proc = CpvAccess(CldProc);
163   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
164   CldToken tok;
165   int len, queueing, priobits; unsigned int *prioptr;
166   CldPackFn pfn;
167
168   CmiLock(CpvAccess(cldLock));
169   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
170   tok->msg = msg;
171
172   /* add token to the doubly-linked circle */
173   tok->pred = proc->sentinel->pred;
174   tok->succ = proc->sentinel;
175   tok->pred->succ = tok;
176   tok->succ->pred = tok;
177   proc->load ++;
178   /* add token to the scheduler */
179   CmiSetHandler(tok, proc->tokenhandleridx);
180   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
181   /* not sigio or thread safe */
182   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
183   CmiUnlock(CpvAccess(cldLock));
184 }
185
186 void CldPutTokenPrio(char *msg)
187 {
188   CldProcInfo proc = CpvAccess(CldProc);
189   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
190   CldToken tok, ptr;
191   int len, queueing, priobits; unsigned int *prioptr, ints;
192   CldPackFn pfn;
193
194   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
195   ints = (priobits+CINTBITS-1)/CINTBITS;
196
197   CmiLock(CpvAccess(cldLock));
198   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
199   tok->msg = msg;
200
201   /* find the right place */
202   ptr = proc->sentinel->succ;
203   while (ptr!=proc->sentinel) {
204     int len1, queueing1, priobits1; unsigned int *prioptr1, ints1;
205     CldPackFn pfn1;
206     ifn(ptr->msg, &pfn1, &len1, &queueing1, &priobits1, &prioptr1);
207     ints1 = (priobits1+CINTBITS-1)/CINTBITS;
208
209     if (!CqsPrioGT_(ints, prioptr, ints1, prioptr1)) { break;}
210     ptr = ptr->succ;
211   }
212
213   /* add token to the doubly-linked circle */
214   tok->succ = ptr;
215   tok->pred = ptr->pred;
216   tok->pred->succ = tok;
217   tok->succ->pred = tok;
218   proc->load ++;
219   /* add token to the scheduler */
220   CmiSetHandler(tok, proc->tokenhandleridx);
221   /* not sigio or thread safe */
222   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
223   CmiUnlock(CpvAccess(cldLock));
224 }
225
226
227 static 
228 #if CMK_C_INLINE
229 inline 
230 #endif
231 void * _CldGetTokenMsg(CldProcInfo proc)
232 {
233   CldToken tok;
234   void *msg;
235   
236   tok = proc->sentinel->succ;
237   if (tok == proc->sentinel) {
238     return NULL;
239   }
240   tok->pred->succ = tok->succ;
241   tok->succ->pred = tok->pred;
242   proc->load --;
243   msg = tok->msg;
244   tok->msg = 0;
245   return msg;
246 }
247
248 void CldGetToken(char **msg)
249 {
250   CldProcInfo proc = CpvAccess(CldProc);
251   CmiNodeLock cldlock = CpvAccess(cldLock);
252   CmiLock(cldlock);
253   *msg = (char *)_CldGetTokenMsg(proc);
254   if (*msg) CpvAccess(CldLoadOffset)++;
255   CmiUnlock(cldlock);
256 }
257
258 /* called at node level */
259 /* get token from processor of rank pe */
260 static 
261 #if CMK_C_INLINE
262 inline 
263 #endif
264 void CldGetTokenFromRank(char **msg, int rank)
265 {
266   CldProcInfo proc = CpvAccessOther(CldProc, rank);
267   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
268   CmiLock(cldlock);
269   *msg = (char *)_CldGetTokenMsg(proc);
270   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
271   CmiUnlock(cldlock);
272 }
273
274 static 
275 #if CMK_C_INLINE
276 inline 
277 #endif
278 void * _CldGetTokenMsgAt(CldProcInfo proc, CldToken tok)
279 {
280   void *msg;
281   
282   if (tok == proc->sentinel) return NULL;
283   tok->pred->succ = tok->succ;
284   tok->succ->pred = tok->pred;
285   proc->load --;
286   msg = tok->msg;
287   tok->msg = 0;
288   return msg;
289 }
290
291 /* called at node level */
292 /* get token from processor of rank pe */
293 static 
294 #if CMK_C_INLINE
295 inline 
296 #endif
297 void CldGetTokenFromRankAt(char **msg, int rank, CldToken tok)
298 {
299   CldProcInfo proc = CpvAccessOther(CldProc, rank);
300   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
301   CmiLock(cldlock);
302   *msg = (char *)_CldGetTokenMsgAt(proc, tok);
303   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
304   CmiUnlock(cldlock);
305 }
306
307 /* Bit Vector Stuff */
308
309 int CldPresentPE(int pe)
310 {
311   return CpvAccess(CldPEBitVector)[pe];
312 }
313
314 void CldMoveAllSeedsAway(void)
315 {
316   char *msg;
317   int len, queueing, priobits, pe;
318   unsigned int *prioptr;
319   CldInfoFn ifn;  CldPackFn pfn;
320
321   CldGetToken(&msg);
322   while (msg != 0) {
323     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
324     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
325     CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
326     pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
327     while (!CldPresentPE(pe))
328       pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
329     CmiSyncSendAndFree(pe, len, msg);
330     CldGetToken(&msg);
331   }
332 }
333
334 void CldSetPEBitVector(const char *newBV)
335 {
336   int i;
337   
338   for (i=0; i<CmiNumPes(); i++)
339     CpvAccess(CldPEBitVector)[i] = newBV[i];
340   if (!CldPresentPE(CmiMyPe()))
341     CldMoveAllSeedsAway();
342 }
343
344 /* End Bit Vector Stuff */
345
346 static int _cldb_cs = 0;
347
348 void CldModuleGeneralInit(char **argv)
349 {
350   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
351   CldProcInfo proc;
352   int i;
353
354   CpvInitialize(CldProcInfo, CldProc);
355   CpvInitialize(int, CldLoadOffset);
356   CpvAccess(CldLoadOffset) = 0;
357   CpvInitialize(int, CldLoadNotify);
358   CpvInitialize(BitVector, CldPEBitVector);
359   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
360   for (i=0; i<CmiNumPes(); i++)
361     CpvAccess(CldPEBitVector)[i] = 1;
362   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
363   proc = CpvAccess(CldProc);
364   proc->load = 0;
365   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
366   proc->sentinel = sentinel;
367   sentinel->succ = sentinel;
368   sentinel->pred = sentinel;
369
370   /* lock to protect token queue for immediate message and smp */
371   CpvInitialize(CmiNodeLock, cldLock);
372   CpvAccess(cldLock) = CmiCreateLock();
373
374   _cldb_cs = CmiGetArgFlagDesc(argv, "+cldb_cs", "Converse> Print seed load balancing statistics.");
375   
376   if (CmiMyPe() == 0) {
377     const char *stra = CldGetStrategy();
378     if (strcmp(stra, "rand") != 0) {
379       CmiPrintf("Charm++> %s seed load balancer.\n", stra);
380     }
381   } 
382 }
383
384 /* function can be called in an immediate handler at node level
385    rank specify the rank of processor for the node to represent
386    This function can also send as immeidate messages
387 */
388 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
389 {
390   char **msgs;
391   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
392   unsigned int *prioptr;
393   CldInfoFn ifn;
394   CldPackFn pfn;
395
396   msgs = (char **)calloc(numToSend, sizeof(char *));
397   msgSizes = (int *)calloc(numToSend, sizeof(int));
398
399   while (!done) {
400     numSent = 0;
401     parcelSize = 0;
402     for (i=0; i<numToSend; i++) {
403       CldGetTokenFromRank(&msgs[i], rank);
404       if (msgs[i] != 0) {
405         done = 1;
406         numSent++;
407         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
408         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
409         msgSizes[i] = len;
410         parcelSize += len;
411         CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
412         if (immed) CmiBecomeImmediate(msgs[i]);
413       }
414       else {
415         done = 1;
416         break;
417       }
418       if (parcelSize > MAXMSGBFRSIZE) {
419         if(i<numToSend-1)
420           done = 0;
421         numToSend -= numSent;
422         break;
423       }
424     }
425     if (numSent > 1) {
426       if (immed)
427         CmiMultipleIsend(pe, numSent, msgSizes, msgs);
428       else
429         CmiMultipleSend(pe, numSent, msgSizes, msgs);
430       for (i=0; i<numSent; i++)
431         CmiFree(msgs[i]);
432       CpvAccessOther(CldRelocatedMessages, rank) += numSent;
433       CpvAccessOther(CldMessageChunks, rank)++;
434     }
435     else if (numSent == 1) {
436       if (immed) CmiBecomeImmediate(msgs[0]);
437       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
438       CpvAccessOther(CldRelocatedMessages, rank)++;
439       CpvAccessOther(CldMessageChunks, rank)++;
440     }
441   }
442   free(msgs);
443   free(msgSizes);
444 }
445
446 /* function can be called in an immediate handler at node level
447    rank specify the rank of processor for the node to represent
448    This function can also send as immeidate messages
449 */
450 void CldMultipleSendPrio(int pe, int numToSend, int rank, int immed)
451 {
452   char **msgs;
453   int len, queueing, priobits, *msgSizes, i;
454   unsigned int *prioptr;
455   CldInfoFn ifn;
456   CldPackFn pfn;
457   CldToken tok;
458   CldProcInfo proc = CpvAccess(CldProc);
459   int count = 0;
460
461   if (numToSend ==0) return;
462   msgs = (char **)calloc(numToSend, sizeof(char *));
463   msgSizes = (int *)calloc(numToSend, sizeof(int));
464
465   tok = proc->sentinel->succ;
466   if (tok == proc->sentinel) {
467     free(msgs);
468     free(msgSizes);
469     return;
470   }
471   tok = tok->succ;
472   while (tok!=proc->sentinel) {
473     tok = tok->succ;
474     if (tok == proc->sentinel) break;
475     CldGetTokenFromRankAt(&msgs[count], rank, tok->pred);
476     if (msgs[count] != 0) {
477         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[count]));
478         ifn(msgs[count], &pfn, &len, &queueing, &priobits, &prioptr);
479         msgSizes[count] = len;
480         CldSwitchHandler(msgs[count], CpvAccessOther(CldBalanceHandlerIndex, rank));
481         if (immed) CmiBecomeImmediate(msgs[count]);
482         count ++;
483     }
484     tok = tok->succ;
485   }
486   if (count > 1) {
487       if (immed)
488         CmiMultipleIsend(pe, count, msgSizes, msgs);
489       else
490         CmiMultipleSend(pe, count, msgSizes, msgs);
491       for (i=0; i<count; i++)
492         CmiFree(msgs[i]);
493       CpvAccessOther(CldRelocatedMessages, rank) += count;
494       CpvAccessOther(CldMessageChunks, rank)++;
495   }
496   else if (count == 1) {
497       if (immed) CmiBecomeImmediate(msgs[0]);
498       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
499       CpvAccessOther(CldRelocatedMessages, rank)++;
500       CpvAccessOther(CldMessageChunks, rank)++;
501   }
502   free(msgs);
503   free(msgSizes);
504 }
505
506 /* simple scheme - just send one by one. useful for multicore */
507 void CldSimpleMultipleSend(int pe, int numToSend, int rank)
508 {
509   char *msg;
510   int len, queueing, priobits, *msgSizes, i, numSent, done=0;
511   unsigned int *prioptr;
512   CldInfoFn ifn;
513   CldPackFn pfn;
514
515   if (numToSend == 0)
516     return;
517
518   numSent = 0;
519   while (!done) {
520     for (i=0; i<numToSend; i++) {
521       CldGetTokenFromRank(&msg, rank);
522       if (msg != 0) {
523         done = 1;
524         numToSend--;
525         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
526         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
527         CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, rank));
528         CmiSyncSendAndFree(pe, len, msg);
529         if (numToSend == 0) done = 1;
530       }
531       else {
532         done = 1;
533         break;
534       }
535     }
536   }
537 }
538
539 void seedBalancerExit(void)
540 {
541   if (_cldb_cs)
542     CmiPrintf("[%d] Relocate message number is %d\n", CmiMyPe(), CpvAccess(CldRelocatedMessages));
543 }