Added communication tracking for load balancing
[charm.git] / src / conv-ldb / cldb.test.c
1 #include <stdio.h>
2 #include "converse.h"
3 #define PERIOD 100
4 #define MAXMSGBFRSIZE 100000
5
6 void LoadNotifyFn(int l)
7 {
8 }
9
10 char *CldGetStrategy(void)
11 {
12   return "test";
13 }
14
15 CpvDeclare(int, CldHandlerIndex);
16 CpvDeclare(int, CldBalanceHandlerIndex);
17 CpvDeclare(int, CldRelocatedMessages);
18 CpvDeclare(int, CldLoadBalanceMessages);
19 CpvDeclare(int, CldMessageChunks);
20
21 void CldMultipleSend(int pe, int numToSend)
22 {
23   void **msgs;
24   int len, queueing, priobits, *msgSizes, i, numSent, done=0, parcelSize;
25   unsigned int *prioptr;
26   CldInfoFn ifn;
27   CldPackFn pfn;
28
29   if (numToSend == 0)
30     return;
31
32   msgs = (void **)calloc(numToSend, sizeof(void *));
33   msgSizes = (int *)calloc(numToSend, sizeof(int));
34
35   while (!done) {
36     numSent = 0;
37     parcelSize = 0;
38     for (i=0; i<numToSend; i++) {
39       CldGetToken(&msgs[i]);
40       if (msgs[i] != 0) {
41         done = 1;
42         numSent++;
43         ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msgs[i]));
44         ifn(msgs[i], &pfn, &len, &queueing, &priobits, &prioptr);
45         msgSizes[i] = len;
46         parcelSize += len;
47         CldSwitchHandler(msgs[i], CpvAccess(CldBalanceHandlerIndex));
48       }
49       else {
50         done = 1;
51         break;
52       }
53       if (parcelSize > MAXMSGBFRSIZE) {
54         if (i<numToSend-1)
55           done = 0;
56         numToSend -= numSent;
57         break;
58       }
59     }
60     if (numSent > 1) {
61       CmiMultipleSend(pe, numSent, msgSizes, (char**) msgs);
62       for (i=0; i<numSent; i++)
63         CmiFree(msgs[i]);
64       CpvAccess(CldRelocatedMessages) += numSent;
65       CpvAccess(CldMessageChunks)++;
66     }
67     else if (numSent == 1) {
68       CmiSyncSend(pe, msgSizes[0], msgs[0]);
69       CmiFree(msgs[0]);
70       CpvAccess(CldRelocatedMessages)++;
71       CpvAccess(CldMessageChunks)++;
72     }
73   }
74   free(msgs);
75   free(msgSizes);
76 }
77
78 void CldDistributeTokens()
79 {
80   int destPe = (CmiMyPe()+1)%CmiNumPes();
81   int numToSend;
82
83   numToSend = CldEstimate() / 2;
84   if (numToSend > CldCountTokens())
85     numToSend = CldCountTokens() / 2;
86
87   if (numToSend > 0)
88     CldMultipleSend(destPe, numToSend);
89
90   CcdCallFnAfter((CcdVoidFn)CldDistributeTokens, NULL, PERIOD);
91 }
92
93 void CldBalanceHandler(void *msg)
94 {
95   CmiGrabBuffer((void **)&msg);
96   CldRestoreHandler(msg);
97   CldPutToken(msg);
98 }
99
100 void CldHandler(void *msg)
101 {
102   CldInfoFn ifn; CldPackFn pfn;
103   int len, queueing, priobits; unsigned int *prioptr;
104   
105   CmiGrabBuffer((void **)&msg);
106   CldRestoreHandler(msg);
107   ifn = (CldInfoFn)CmiHandlerToFunction(CmiGetInfo(msg));
108   ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
109   CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
110 }
111
112 void CldEnqueue(int pe, void *msg, int infofn)
113 {
114   int len, queueing, priobits; unsigned int *prioptr;
115   CldInfoFn ifn = (CldInfoFn)CmiHandlerToFunction(infofn);
116   CldPackFn pfn;
117
118   if ((pe == CLD_ANYWHERE) && (CmiNumPes() > 1)) {
119     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
120     CmiSetInfo(msg,infofn);
121     CldPutToken(msg); 
122   } 
123   else if ((pe == CmiMyPe()) || (CmiNumPes() == 1)) {
124     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
125     CmiSetInfo(msg,infofn);
126     CsdEnqueueGeneral(msg, queueing, priobits, prioptr);
127   }
128   else {
129     ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
130     if (pfn) {
131       pfn(&msg);
132       ifn(msg, &pfn, &len, &queueing, &priobits, &prioptr);
133     }
134     CldSwitchHandler(msg, CpvAccess(CldHandlerIndex));
135     CmiSetInfo(msg,infofn);
136     if (pe==CLD_BROADCAST) 
137       CmiSyncBroadcastAndFree(len, msg);
138     else if (pe==CLD_BROADCAST_ALL)
139       CmiSyncBroadcastAllAndFree(len, msg);
140     else CmiSyncSendAndFree(pe, len, msg);
141   }
142 }
143
144 void CldHelpModuleInit()
145 {
146   CpvInitialize(int, CldBalanceHandlerIndex);
147
148   CpvAccess(CldBalanceHandlerIndex) = 
149     CmiRegisterHandler(CldBalanceHandler);
150
151   if (CmiNumPes() > 1)
152     CldDistributeTokens();
153 }
154
155 void CldModuleInit()
156 {
157   CpvInitialize(int, CldHandlerIndex);
158   CpvInitialize(int, CldRelocatedMessages);
159   CpvInitialize(int, CldLoadBalanceMessages);
160   CpvInitialize(int, CldMessageChunks);
161   CpvAccess(CldHandlerIndex) = CmiRegisterHandler(CldHandler);
162   CpvAccess(CldRelocatedMessages) = CpvAccess(CldLoadBalanceMessages) = 
163     CpvAccess(CldMessageChunks) = 0;
164   CldModuleGeneralInit();
165   CldHelpModuleInit();
166 }