Changed CldEnqueue to three parameters ( no pack function )
[charm.git] / src / conv-ldb / cldb.c
1 #include "converse.h"
2
3 /*
4  * CldSwitchHandler takes a message and a new handler number.  It
5  * changes the handler number to the new handler number and move the
6  * old to the Xhandler part of the header.  When the message gets
7  * handled, the handler should call CldRestoreHandler to put the old
8  * handler back.
9  *
10  * CldPutToken puts a message in the scheduler queue in such a way
11  * that it can be retreived from the queue.  Once the message gets
12  * handled, it can no longer be retreived.  CldGetToken removes a
13  * message that was placed in the scheduler queue in this way.
14  * CldCountTokens tells you how many tokens are currently retreivable.
15  *
16  * Caution: these functions are using the function "CmiReference"
17  * which I just added to the Cmi memory allocator (it increases the
18  * reference count field, making it possible to free the memory
19  * twice.)  I'm not sure how well this is going to work.  I need this
20  * because the message should not be freed until it's out of the
21  * scheduler queue AND out of the user's hands.  It needs to stay
22  * around while it's in the scheduler queue because it may contain
23  * a priority.  I should probably rewrite these subroutines so that
24  * they simply copy the priority, I would feel safer that way.
25  *
26  */
27
28 int CldRegisterInfoFn(CldInfoFn fn)
29 {
30   return CmiRegisterHandler((CmiHandler)fn);
31 }
32
33 int CldRegisterPackFn(CldPackFn fn)
34 {
35   return CmiRegisterHandler((CmiHandler)fn);
36 }
37
38 void CldSwitchHandler(char *cmsg, int handler)
39 {
40   CmiSetXHandler(cmsg, CmiGetHandler(cmsg));
41   CmiSetHandler(cmsg, handler);
42 }
43
44 void CldRestoreHandler(char *cmsg)
45 {
46   CmiSetHandler(cmsg, CmiGetXHandler(cmsg));
47 }
48
49 void Cldhandler(void *);
50  
51 typedef struct CldToken_s {
52   char msg_header[CmiMsgHeaderSizeBytes];
53   void *msg;  /* if null, message already removed */
54   struct CldToken_s *pred;
55   struct CldToken_s *succ;
56 } *CldToken;
57
58 typedef struct CldProcInfo_s {
59   int tokenhandleridx;
60   int load; /* number of items in doubly-linked circle besides sentinel */
61   CldToken sentinel;
62 } *CldProcInfo;
63
64 CpvDeclare(CldProcInfo, CldProc);
65
66 static void CldTokenHandler(CldToken tok)
67 {
68   CldProcInfo proc = CpvAccess(CldProc);
69   CldToken pred, succ;
70   if (tok->pred) {
71     tok->pred->succ = tok->succ;
72     tok->succ->pred = tok->pred;
73     proc->load --;
74     CmiHandleMessage(tok->msg);
75   } else {
76     /* CmiFree(tok->msg); */
77   }
78 }
79
80 int CldCountTokens()
81 {
82   CldProcInfo proc = CpvAccess(CldProc);
83   return proc->load;
84 }
85
86 void CldPutToken(void *msg)
87 {
88   CldProcInfo proc = CpvAccess(CldProc);
89   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
90   CldToken tok = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
91   int len, queueing, priobits; unsigned int *prioptr;
92   CldPackFn pfn;
93
94   tok->msg = msg;
95
96   /* add token to the doubly-linked circle */
97   tok->pred = proc->sentinel->pred;
98   tok->succ = proc->sentinel;
99   tok->pred->succ = tok;
100   tok->succ->pred = tok;
101   proc->load ++;
102   
103   /* add token to the scheduler */
104   CmiSetHandler(tok, proc->tokenhandleridx);
105   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
106   CsdEnqueueGeneral(tok, queueing, priobits, prioptr);
107 }
108
109 void CldGetToken(void **msg)
110 {
111   CldProcInfo proc = CpvAccess(CldProc);
112   CldToken tok;
113   tok = proc->sentinel->succ;
114   if (tok == proc->sentinel) {
115     *msg = 0; return;
116   }
117   tok->pred->succ = tok->succ;
118   tok->succ->pred = tok->pred;
119   tok->succ = 0;
120   tok->pred = 0;
121   proc->load --;
122   *msg = tok->msg;
123   CmiReference(*msg);
124 }
125
126 void CldModuleGeneralInit()
127 {
128   CldToken sentinel = (CldToken)CmiAlloc(sizeof(struct CldToken_s));
129   CldProcInfo proc;
130
131   CpvInitialize(CldProcInfo, CldProc);
132   CpvAccess(CldProc) = (CldProcInfo)CmiAlloc(sizeof(struct CldProcInfo_s));
133   proc = CpvAccess(CldProc);
134   proc->load = 0;
135   proc->tokenhandleridx = CmiRegisterHandler((CmiHandler)CldTokenHandler);
136   proc->sentinel = sentinel;
137   sentinel->succ = sentinel;
138   sentinel->pred = sentinel;
139 }