Removed all STL template code from load balancer. Too bad STL doesn't work
[charm.git] / src / ck-ldb / CentralLB.C
1 #include <charm++.h>
2 #include <LBDatabase.h>
3 #include "CentralLB.h"
4 #include "CentralLB.def.h"
5
6 CkGroupID loadbalancer;
7
8 #if CMK_LBDB_ON
9
10 void CreateCentralLB()
11 {
12   loadbalancer = CProxy_CentralLB::ckNew();
13 }
14
15 void CentralLB::staticMigrated(void* data, LDObjHandle h)
16 {
17   CentralLB *me = static_cast<CentralLB*>(data);
18
19   me->Migrated(h);
20 }
21
22 void CentralLB::staticAtSync(void* data)
23 {
24   CentralLB *me = static_cast<CentralLB*>(data);
25
26   me->AtSync();
27 }
28
29 CentralLB::CentralLB()
30 {
31   mystep = 0;
32   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
33   theLbdb->
34     AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
35                             static_cast<void*>(this));
36   theLbdb->
37     NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
38                    static_cast<void*>(this));
39
40   stats_msg_count = 0;
41   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
42   for(int i=0; i < CkNumPes(); i++)
43     statsMsgsList[i] = 0;
44
45   statsDataList = new LDStats[CkNumPes()];
46   theLbdb->CollectStatsOn();
47   migrates_completed = 0;
48   migrates_expected = -1;
49 }
50
51 CentralLB::~CentralLB()
52 {
53   CkPrintf("Going away\n");
54 }
55
56 void CentralLB::AtSync()
57 {
58   //  CkPrintf("[%d] CentralLB At Sync step %d!!!!\n",CkMyPe(),mystep);
59
60   if (!QueryBalanceNow(step())) {
61     MigrationDone();
62     return;
63   }
64
65   // Send stats
66   int sizes[2];
67   const int osz = sizes[0] = theLbdb->GetObjDataSz();
68   const int csz = sizes[1] = theLbdb->GetCommDataSz();
69   
70   CLBStatsMsg* msg = new(sizes,2) CLBStatsMsg;
71   msg->from_pe = CkMyPe();
72   msg->serial = rand();
73
74   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
75   theLbdb->IdleTime(&msg->idletime);
76   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
77   //  CkPrintf(
78   //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
79   //    CkMyPe(),msg->total_walltime,msg->total_cputime,
80   //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
81
82   msg->n_objs = osz;
83   theLbdb->GetObjData(msg->objData);
84   msg->n_comm = csz;
85   theLbdb->GetCommData(msg->commData);
86   theLbdb->ClearLoads();
87   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
88   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
89   CProxy_CentralLB(thisgroup).ReceiveStats(msg,0);
90 }
91
92 void CentralLB::Migrated(LDObjHandle h)
93 {
94   migrates_completed++;
95   //  CkPrintf("[%d] An object migrated! %d %d\n",
96   //       CkMyPe(),migrates_completed,migrates_expected);
97   if (migrates_completed == migrates_expected) {
98     MigrationDone();
99   }
100 }
101
102 void CentralLB::ReceiveStats(CLBStatsMsg *m)
103 {
104   const int pe = m->from_pe;
105   //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
106   //       pe,stats_msg_count,m->n_objs,m->serial,m);
107   if (statsMsgsList[pe] != 0) {
108     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
109              pe);
110   } else {
111     statsMsgsList[pe] = m;
112     statsDataList[pe].total_walltime = m->total_walltime;
113     statsDataList[pe].total_cputime = m->total_cputime;
114     statsDataList[pe].idletime = m->idletime;
115     statsDataList[pe].bg_walltime = m->bg_walltime;
116     statsDataList[pe].bg_cputime = m->bg_cputime;
117     statsDataList[pe].n_objs = m->n_objs;
118     statsDataList[pe].n_objs = m->n_objs;
119     statsDataList[pe].objData = m->objData;
120     statsDataList[pe].n_comm = m->n_comm;
121     statsDataList[pe].commData = m->commData;
122     stats_msg_count++;
123   }
124
125   const int clients = CkNumPes();
126   if (stats_msg_count == clients) {
127
128     CLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
129     CProxy_CentralLB(thisgroup).ReceiveMigration(migrateMsg);
130
131     // Zero out data structures for next cycle
132     for(int i=0; i < clients; i++) {
133       delete statsMsgsList[i];
134       statsMsgsList[i]=0;
135     }
136     stats_msg_count=0;
137   }
138   
139 }
140
141 void CentralLB::ReceiveMigration(CLBMigrateMsg *m)
142 {
143   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),m->n_moves);
144   migrates_expected = 0;
145   for(int i=0; i < m->n_moves; i++) {
146     MigrateInfo& move = m->moves[i];
147     const int me = CkMyPe();
148     if (move.from_pe == me && move.to_pe != me) {
149       CkPrintf("[%d] migrating object to %d\n",move.from_pe,move.to_pe);
150       theLbdb->Migrate(move.obj,move.to_pe);
151     } else if (move.from_pe != me && move.to_pe == me) {
152       //      CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
153       migrates_expected++;
154     }
155   }
156   if (migrates_expected == 0 || migrates_completed == migrates_expected)
157     MigrationDone();
158   delete m;
159 }
160
161
162 void CentralLB::MigrationDone()
163 {
164   migrates_completed = 0;
165   migrates_expected = -1;
166   // Increment to next step
167   mystep++;
168   CProxy_CentralLB(thisgroup).ResumeClients(CkMyPe());
169 }
170
171 void CentralLB::ResumeClients()
172 {
173   //  CkPrintf("Resuming clients on PE %d\n",CkMyPe());
174   theLbdb->ResumeClients();
175 }
176
177 CLBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
178 {
179   for(int j=0; j < count; j++) {
180     int i;
181     LDObjData *odata = stats[j].objData;
182     const int osz = stats[j].n_objs;
183
184     CkPrintf(
185       "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
186       j,stats[j].total_walltime,stats[j].total_cputime,
187       stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime);
188     CkPrintf("------------- Object Data: PE %d: %d objects -------------\n",
189              j,osz);
190     for(i=0; i < osz; i++) {
191       CkPrintf("Object %d\n",i);
192       CkPrintf("     id = %d\n",odata[i].id.id[0]);
193       CkPrintf("  OM id = %d\n",odata[i].omID.id);
194       CkPrintf("    CPU = %f\n",odata[i].cpuTime);
195       CkPrintf("   Wall = %f\n",odata[i].wallTime);
196     }
197
198     LDCommData *cdata = stats[j].commData;
199     const int csz = stats[j].n_comm;
200
201     CkPrintf("------------- Comm Data: PE %d: %d records -------------\n",
202              j,csz);
203     for(i=0; i < csz; i++) {
204       CkPrintf("Link %d\n",i);
205       
206       if (cdata[i].from_proc)
207         CkPrintf("    sender PE = %d\n",cdata[i].src_proc);
208       else
209         CkPrintf("    sender id = %d:%d\n",
210                  cdata[i].senderOM.id,cdata[i].sender.id[0]);
211
212       if (cdata[i].to_proc)
213         CkPrintf("  receiver PE = %d\n",cdata[i].dest_proc);
214       else      
215         CkPrintf("  receiver id = %d:%d\n",
216                  cdata[i].receiverOM.id,cdata[i].receiver.id[0]);
217       
218       CkPrintf("     messages = %d\n",cdata[i].messages);
219       CkPrintf("        bytes = %d\n",cdata[i].bytes);
220     }
221   }
222
223   int sizes=0;
224   CLBMigrateMsg* msg = new(&sizes,1) CLBMigrateMsg;
225   msg->n_moves = 0;
226
227   return msg;
228 }
229
230 void* CLBStatsMsg::alloc(int msgnum, size_t size, int* array, int priobits)
231 {
232   int totalsize = size + array[0] * sizeof(LDObjData) 
233     + array[1] * sizeof(LDCommData);
234
235   CLBStatsMsg* ret =
236     static_cast<CLBStatsMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
237
238   ret->objData = reinterpret_cast<LDObjData*>((reinterpret_cast<char*>(ret) 
239                                                + size));
240   ret->commData = reinterpret_cast<LDCommData*>(ret->objData + array[0]);
241
242   return static_cast<void*>(ret);
243 }
244
245 void* CLBStatsMsg::pack(CLBStatsMsg* m)
246 {
247   m->objData = 
248     reinterpret_cast<LDObjData*>(reinterpret_cast<char*>(m->objData)
249       - reinterpret_cast<char*>(&m->objData));
250   m->commData = 
251     reinterpret_cast<LDCommData*>(reinterpret_cast<char*>(m->commData)
252       - reinterpret_cast<char*>(&m->commData));
253   return static_cast<void*>(m);
254 }
255
256 CLBStatsMsg* CLBStatsMsg::unpack(void *m)
257 {
258   CLBStatsMsg* ret_val = static_cast<CLBStatsMsg*>(m);
259
260   ret_val->objData = 
261     reinterpret_cast<LDObjData*>(reinterpret_cast<char*>(&ret_val->objData)
262       + reinterpret_cast<size_t>(ret_val->objData));
263   ret_val->commData = 
264     reinterpret_cast<LDCommData*>(reinterpret_cast<char*>(&ret_val->commData)
265       + reinterpret_cast<size_t>(ret_val->commData));
266   return ret_val;
267 }
268
269 void* CLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
270 {
271   int totalsize = size + array[0] * sizeof(CentralLB::MigrateInfo);
272
273   CLBMigrateMsg* ret =
274     static_cast<CLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
275
276   ret->moves = reinterpret_cast<CentralLB::MigrateInfo*>
277     (reinterpret_cast<char*>(ret)+ size);
278
279   return static_cast<void*>(ret);
280 }
281
282 void* CLBMigrateMsg::pack(CLBMigrateMsg* m)
283 {
284   m->moves = reinterpret_cast<CentralLB::MigrateInfo*>
285     (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
286
287   return static_cast<void*>(m);
288 }
289
290 CLBMigrateMsg* CLBMigrateMsg::unpack(void *m)
291 {
292   CLBMigrateMsg* ret_val = static_cast<CLBMigrateMsg*>(m);
293
294   ret_val->moves = reinterpret_cast<CentralLB::MigrateInfo*>
295     (reinterpret_cast<char*>(&ret_val->moves) 
296      + reinterpret_cast<size_t>(ret_val->moves));
297
298   return ret_val;
299 }
300
301 #endif