changed for immediate msg
[charm.git] / src / conv-ldb / cldb.c
1
2 #include <stdlib.h>
3 #include "queueing.h"
4 #include "cldb.h"
5 #include <math.h>
6
7 typedef char *BitVector;
8
9 CpvDeclare(int, CldHandlerIndex);
10 CpvDeclare(BitVector, CldPEBitVector);
11 CpvDeclare(int, CldBalanceHandlerIndex);
12
13 CpvDeclare(int, CldRelocatedMessages);
14 CpvDeclare(int, CldLoadBalanceMessages);
15 CpvDeclare(int, CldMessageChunks);
16 CpvDeclare(int, CldLoadNotify);
17
18 CpvDeclare(CmiNodeLock, cldLock);
19
20 extern void LoadNotifyFn(int);
21
22 char* _lbtopo = "torus_nd_5";
23
24 /* Estimator stuff.  Of any use? */
25 /*
26 CpvStaticDeclare(CldEstimatorTable, _estfns);
27 */
28 void CldRegisterEstimator(CldEstimator fn)
29 {
30   /*CpvAccess(_estfns).fns[CpvAccess(_estfns).count++] = fn;*/
31 }
32
33 /* 
34 int CldEstimate(void)
35 {
36   CldEstimatorTable *estab = &(CpvAccess(_estfns));
37   int i, load=0;
38   for(i=0; i<estab->count; i++)
39     load += (*(estab->fns[i]))();
40   return load;
41 }
42
43 static int CsdEstimator(void)
44 {
45   return CsdLength();
46 }
47 */
48
49 CpvDeclare(int, CldLoadOffset);
50
51
52 int CldRegisterInfoFn(CldInfoFn fn)
53 {
54   return CmiRegisterHandler((CmiHandler)fn);
55 }
56
57 int CldRegisterPackFn(CldPackFn fn)
58 {
59   return CmiRegisterHandler((CmiHandler)fn);
60 }
61
62 /* CldSwitchHandler takes a message and a new handler number.  It
63  * changes the handler number to the new handler number and move the
64  * old to the Xhandler part of the header.  When the message gets
65  * handled, the handler should call CldRestoreHandler to put the old
66  * handler back.
67  *
68  * CldPutToken puts a message in the scheduler queue in such a way
69  * that it can be retreived from the queue.  Once the message gets
70  * handled, it can no longer be retreived.  CldGetToken removes a
71  * message that was placed in the scheduler queue in this way.
72  * CldCountTokens tells you how many tokens are currently retreivable.  
73 */
74
75 void CldSwitchHandler(char *cmsg, int handler)
76 {
77 #if CMK_MEM_CHECKPOINT
78   int old_phase = CmiGetRestartPhase(cmsg);
79 #endif
80   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
81   CmiSetHandler(cmsg, handler);
82 #if CMK_MEM_CHECKPOINT
83   CmiGetRestartPhase(cmsg) = old_phase;
84 #endif
85 }
86
87 void CldRestoreHandler(char *cmsg)
88 {
89 #if CMK_MEM_CHECKPOINT
90   int old_phase = CmiGetRestartPhase(cmsg);
91 #endif
92   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
93 #if CMK_MEM_CHECKPOINT
94   CmiGetRestartPhase(cmsg) = old_phase;
95 #endif
96 }
97
98 void Cldhandler(char *);
99  
100 typedef struct CldToken_s {
101   char msg_header[CmiMsgHeaderSizeBytes];
102   char *msg;  /* if null, message already removed */
103   struct CldToken_s *pred;
104   struct CldToken_s *succ;
105 } *CldToken;
106
107 typedef struct CldProcInfo_s {
108   int tokenhandleridx;
109   int load; /* number of items in doubly-linked circle besides sentinel */
110   CldToken sentinel;
111 } *CldProcInfo;
112
113 CpvDeclare(CldProcInfo, CldProc);
114
115 static void CldTokenHandler(CldToken tok)
116 {
117   CldProcInfo proc = CpvAccess(CldProc);
118   if (tok->msg) {
119     tok->pred->succ = tok->succ;
120     tok->succ->pred = tok->pred;
121     proc->load --;
122     CmiHandleMessage(tok->msg);
123   }
124   else 
125     CpvAccess(CldLoadOffset)--;
126   if (CpvAccess(CldLoadNotify))
127     LoadNotifyFn(CpvAccess(CldProc)->load);
128   CmiFree(tok);
129 }
130
131 int CldCountTokensRank(int rank)
132 {
133   return CpvAccessOther(CldProc, rank)->load;
134 }
135
136 int CldCountTokens(void)
137 {
138   return (CpvAccess(CldProc)->load);
139 }
140
141 int CldLoad(void)
142 {
143   return (CsdLength() - CpvAccess(CldLoadOffset));
144 }
145
146 int CldLoadRank(int rank)
147 {
148   int len, offset;
149   /* CmiLock(CpvAccessOther(cldLock, rank));  */
150   len = CqsLength(CpvAccessOther(CsdSchedQueue, rank));
151      /* CldLoadOffset is the empty token counter */
152   offset = CpvAccessOther(CldLoadOffset, rank);
153   /* CmiUnlock(CpvAccessOther(cldLock, rank)); */
154   return len - offset;
155 }
156
157 void CldPutToken(char *msg)
158 {
159   CldProcInfo proc = CpvAccess(CldProc);
160   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
161   CldToken tok;
162   int len, queueing, priobits; unsigned int *prioptr;
163   CldPackFn pfn;
164
165   CmiLock(CpvAccess(cldLock));
166   tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
167   tok->msg = msg;
168
169   /* add token to the doubly-linked circle */
170   tok->pred = proc->sentinel->pred;
171   tok->succ = proc->sentinel;
172   tok->pred->succ = tok;
173   tok->succ->pred = tok;
174   proc->load ++;
175   /* add token to the scheduler */
176   CmiSetHandler(tok, proc->tokenhandleridx);
177   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
178   /* not sigio or thread safe */
179   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
180   CmiUnlock(CpvAccess(cldLock));
181 }
182
183
184 static 
185 #if CMK_C_INLINE
186 inline 
187 #endif
188 void * _CldGetTokenMsg(CldProcInfo proc)
189 {
190   CldToken tok;
191   void *msg;
192   
193   tok = proc->sentinel->succ;
194   if (tok == proc->sentinel) {
195     return NULL;
196   }
197   tok->pred->succ = tok->succ;
198   tok->succ->pred = tok->pred;
199   proc->load --;
200   msg = tok->msg;
201   tok->msg = 0;
202   return msg;
203 }
204
205 void CldGetToken(char **msg)
206 {
207   CldProcInfo proc = CpvAccess(CldProc);
208   CmiNodeLock cldlock = CpvAccess(cldLock);
209   CmiLock(cldlock);
210   *msg = _CldGetTokenMsg(proc);
211   if (*msg) CpvAccess(CldLoadOffset)++;
212   CmiUnlock(cldlock);
213 }
214
215 /* called at node level */
216 /* get token from processor of rank pe */
217 static 
218 #if CMK_C_INLINE
219 inline 
220 #endif
221 void CldGetTokenFromRank(char **msg, int rank)
222 {
223   CldProcInfo proc = CpvAccessOther(CldProc, rank);
224   CmiNodeLock cldlock = CpvAccessOther(cldLock, rank);
225   CmiLock(cldlock);
226   *msg = _CldGetTokenMsg(proc);
227   if (*msg) CpvAccessOther(CldLoadOffset, rank)++;
228   CmiUnlock(cldlock);
229 }
230
231 /* Bit Vector Stuff */
232
233 int CldPresentPE(int pe)
234 {
235   return CpvAccess(CldPEBitVector)[pe];
236 }
237
238 void CldMoveAllSeedsAway()
239 {
240   char *msg;
241   int len, queueing, priobits, pe;
242   unsigned int *prioptr;
243   CldInfoFn ifn;  CldPackFn pfn;
244
245   CldGetToken(&msg);
246   while (msg != 0) {
247     ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
248     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
249     CldSwitchHandler(msg, CpvAccess(CldBalanceHandlerIndex));
250     pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
251     while (!CldPresentPE(pe))
252       pe = (((CrnRand()+CmiMyPe())&0x7FFFFFFF)%CmiNumPes());
253     CmiSyncSendAndFree(pe, len, msg);
254     CldGetToken(&msg);
255   }
256 }
257
258 void CldSetPEBitVector(const char *newBV)
259 {
260   int i;
261   
262   for (i=0; i<CmiNumPes(); i++)
263     CpvAccess(CldPEBitVector)[i] = newBV[i];
264   if (!CldPresentPE(CmiMyPe()))
265     CldMoveAllSeedsAway();
266 }
267
268 /* End Bit Vector Stuff */
269
270 void CldModuleGeneralInit(char **argv)
271 {
272   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
273   CldProcInfo proc;
274   int i;
275
276   CpvInitialize(CldProcInfo, CldProc);
277   CpvInitialize(int, CldLoadOffset);
278   CpvAccess(CldLoadOffset) = 0;
279   CpvInitialize(int, CldLoadNotify);
280   CpvInitialize(BitVector, CldPEBitVector);
281   CpvAccess(CldPEBitVector) = (char *)malloc(CmiNumPes()*sizeof(char));
282   for (i=0; i<CmiNumPes(); i++)
283     CpvAccess(CldPEBitVector)[i] = 1;
284   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
285   proc = CpvAccess(CldProc);
286   proc->load = 0;
287   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
288   proc->sentinel = sentinel;
289   sentinel->succ = sentinel;
290   sentinel->pred = sentinel;
291
292   /* lock to protect token queue for immediate message and smp */
293   CpvInitialize(CmiNodeLock, cldLock);
294   CpvAccess(cldLock) = CmiCreateLock();
295
296   
297   if (CmiMyPe() == 0) {
298     char *stra = CldGetStrategy();
299     if (strcmp(stra, "rand") != 0) {
300       CmiPrintf("Charm++> %s seed load balancer.\n", stra);
301     }
302   } 
303 }
304
305 /* function can be called in an immediate handler at node level
306    rank specify the rank of processor for the node to represent
307    This function can also send as immeidate messages
308 */
309 void CldMultipleSend(int pe, int numToSend, int rank, int immed)
310 {
311   char **msgs;
312   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
313   unsigned int *prioptr;
314   CldInfoFn ifn;
315   CldPackFn pfn;
316
317   msgs = (char **)calloc(numToSend, sizeof(char *));
318   msgSizes = (int *)calloc(numToSend, sizeof(int));
319
320   while (!done) {
321     numSent = 0;
322     parcelSize = 0;
323     for (i=0; i<numToSend; i++) {
324       CldGetTokenFromRank(&msgs[i], rank);
325       if (msgs[i] != 0) {
326         done = 1;
327         numSent++;
328         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
329         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
330         msgSizes[i] = len;
331         parcelSize += len;
332         CldSwitchHandler(msgs[i], CpvAccessOther(CldBalanceHandlerIndex, rank));
333         if (immed) CmiBecomeImmediate(msgs[i]);
334       }
335       else {
336         done = 1;
337         break;
338       }
339       if (parcelSize > MAXMSGBFRSIZE) {
340         if(i<numToSend-1)
341           done = 0;
342         numToSend -= numSent;
343         break;
344       }
345     }
346     if (numSent > 1) {
347       if (immed)
348         CmiMultipleIsend(pe, numSent, msgSizes, msgs);
349       else
350         CmiMultipleSend(pe, numSent, msgSizes, msgs);
351       for (i=0; i<numSent; i++)
352         CmiFree(msgs[i]);
353       CpvAccessOther(CldRelocatedMessages, rank) += numSent;
354       CpvAccessOther(CldMessageChunks, rank)++;
355     }
356     else if (numSent == 1) {
357       if (immed) CmiBecomeImmediate(msgs[0]);
358       CmiSyncSendAndFree(pe, msgSizes[0], msgs[0]);
359       CpvAccessOther(CldRelocatedMessages, rank)++;
360       CpvAccessOther(CldMessageChunks, rank)++;
361     }
362   }
363   free(msgs);
364   free(msgSizes);
365 }
366
367 /* simple scheme - just send one by one. useful for multicore */
368 void CldSimpleMultipleSend(int pe, int numToSend)
369 {
370   char *msg;
371   int len, queueing, priobits, *msgSizes, i, numSent, done=0;
372   unsigned int *prioptr;
373   CldInfoFn ifn;
374   CldPackFn pfn;
375
376   if (numToSend == 0)
377     return;
378
379   numSent = 0;
380   while (!done) {
381     for (i=0; i<numToSend; i++) {
382       CldGetToken(&msg);
383       if (msg != 0) {
384         done = 1;
385         numToSend--;
386         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
387         ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
388         CldSwitchHandler(msg, CpvAccessOther(CldBalanceHandlerIndex, pe));
389         CmiSyncSendAndFree(pe, len, msg);
390         if (numToSend == 0) done = 1;
391       }
392       else {
393         done = 1;
394         break;
395       }
396     }
397   }
398 }