all load balancers inherit from BaseLB now.
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <charm++.h>
9 #include "CentralLB.h"
10 #include "CentralLB.def.h"
11
12 #define  DEBUGF(x)    // CmiPrintf x;
13
14 CkGroupID loadbalancer;
15 char ** avail_vector_address;
16 int * lb_ptr;
17 int load_balancer_created;
18
19 #if CMK_LBDB_ON
20
21 void CreateCentralLB()
22 {
23   loadbalancer = CProxy_CentralLB::ckNew();
24 }
25
26 void set_avail_vector(char * bitmap){
27     int count;
28     int assigned = 0;
29     for(count = 0; count < CkNumPes(); count++){
30         (*avail_vector_address)[count] = bitmap[count];
31         if((bitmap[count] == 1) && !assigned){
32             *lb_ptr = count;
33             assigned = 1;
34         }
35     }   
36 }
37
38 void CentralLB::staticMigrated(void* data, LDObjHandle h)
39 {
40   CentralLB *me = (CentralLB*)(data);
41
42   me->Migrated(h);
43 }
44
45 void CentralLB::staticAtSync(void* data)
46 {
47   CentralLB *me = (CentralLB*)(data);
48
49   me->AtSync();
50 }
51
52 CentralLB::CentralLB()
53 {
54   mystep = 0;
55   //  CkPrintf("Construct in %d\n",CkMyPe());
56   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
57   theLbdb->
58     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
59   theLbdb->
60     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
61
62   stats_msg_count = 0;
63   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
64   for(int i=0; i < CkNumPes(); i++)
65     statsMsgsList[i] = 0;
66
67   statsDataList = new LDStats[CkNumPes()];
68   myspeed = theLbdb->ProcessorSpeed();
69   theLbdb->CollectStatsOn();
70   migrates_completed = 0;
71   migrates_expected = -1;
72
73   cur_ld_balancer = 0;
74   new_ld_balancer = 0;
75   int num_proc = CkNumPes();
76   avail_vector = new char[num_proc];
77   for(int proc = 0; proc < num_proc; proc++)
78       avail_vector[proc] = 1;
79   avail_vector_address = &(avail_vector);
80   lb_ptr = &new_ld_balancer;
81
82   load_balancer_created = 1;
83 }
84
85 CentralLB::~CentralLB()
86 {
87   CkPrintf("Going away\n");
88 }
89
90 void CentralLB::AtSync()
91 {
92   DEBUGF(("[%d] CentralLB At Sync step %d!!!!\n",CkMyPe(),mystep));
93
94   if (!QueryBalanceNow(step())) {
95     MigrationDone();
96     return;
97   }
98   thisProxy [CkMyPe()].ProcessAtSync();
99 }
100
101 void CentralLB::ProcessAtSync()
102 {
103   if (CkMyPe() == cur_ld_balancer) {
104     start_lb_time = CmiWallTimer();
105     CmiPrintf("Load balancing step %d starting at %f in %d\n",
106     step(),start_lb_time, cur_ld_balancer);
107   }
108   // Send stats
109   const int osz = theLbdb->GetObjDataSz();
110   const int csz = theLbdb->GetCommDataSz();
111   
112   int npes = CkNumPes();
113   CLBStatsMsg* msg = new(osz, csz, npes, 0) CLBStatsMsg;
114   msg->from_pe = CkMyPe();
115   // msg->serial = rand();
116   msg->serial = CrnRand();
117
118   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
119   theLbdb->IdleTime(&msg->idletime);
120   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
121   msg->pe_speed = myspeed;
122   //  CkPrintf(
123   //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
124   //    CkMyPe(),msg->total_walltime,msg->total_cputime,
125   //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
126
127   msg->n_objs = osz;
128   theLbdb->GetObjData(msg->objData);
129   msg->n_comm = csz;
130   theLbdb->GetCommData(msg->commData);
131   theLbdb->ClearLoads();
132   DEBUGF(("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
133            CkMyPe(),msg->serial,msg->n_objs,msg->n_comm));
134
135   // Scheduler PART.
136
137   if(CkMyPe() == 0){
138       int num_proc = CkNumPes();
139       for(int proc = 0; proc < num_proc; proc++){
140           msg->avail_vector[proc] = avail_vector[proc];
141       } 
142       msg->next_lb = new_ld_balancer;
143   }
144
145   thisProxy [cur_ld_balancer].ReceiveStats(msg);
146 }
147
148 void CentralLB::Migrated(LDObjHandle h)
149 {
150   migrates_completed++;
151   //  CkPrintf("[%d] An object migrated! %d %d\n",
152   //       CkMyPe(),migrates_completed,migrates_expected);
153   if (migrates_completed == migrates_expected) {
154     MigrationDone();
155   }
156 }
157
158 void CentralLB::ReceiveStats(CLBStatsMsg *m)
159 {
160   int proc;
161   const int pe = m->from_pe;
162 //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
163 //         pe,stats_msg_count,m->n_objs,m->serial,m);
164
165   if((pe == 0) && (CkMyPe() != 0)){
166       new_ld_balancer = m->next_lb;
167       int num_proc = CkNumPes();
168       for(int proc = 0; proc < num_proc; proc++)
169           avail_vector[proc] = m->avail_vector[proc]; 
170   }
171
172   DEBUGF(("ReceiveStats from %d step: %d\n", pe, mystep));
173   if (statsMsgsList[pe] != 0) {
174     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
175              pe);
176   } else {
177     statsMsgsList[pe] = m;
178     statsDataList[pe].total_walltime = m->total_walltime;
179     statsDataList[pe].total_cputime = m->total_cputime;
180     statsDataList[pe].idletime = m->idletime;
181     statsDataList[pe].bg_walltime = m->bg_walltime;
182     statsDataList[pe].bg_cputime = m->bg_cputime;
183     statsDataList[pe].pe_speed = m->pe_speed;
184     statsDataList[pe].utilization = 1.0;
185     statsDataList[pe].available = CmiTrue;
186
187     statsDataList[pe].n_objs = m->n_objs;
188     statsDataList[pe].objData = m->objData;
189     statsDataList[pe].n_comm = m->n_comm;
190     statsDataList[pe].commData = m->commData;
191     stats_msg_count++;
192   }
193
194   const int clients = CkNumPes();
195   if (stats_msg_count == clients) {
196     double strat_start_time = CmiWallTimer();
197
198 //    CkPrintf("Before setting bitmap\n");
199     for(proc = 0; proc < clients; proc++)
200       statsDataList[proc].available = (CmiBool)avail_vector[proc];
201     
202 //    CkPrintf("Before Calling Strategy\n");
203
204     CLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
205
206 //    CkPrintf("returned successfully\n");
207     int num_proc = CkNumPes();
208
209     for(proc = 0; proc < num_proc; proc++)
210         migrateMsg->avail_vector[proc] = avail_vector[proc];
211     migrateMsg->next_lb = new_ld_balancer;
212
213 //    CkPrintf("calling recv migration\n");
214     thisProxy.ReceiveMigration(migrateMsg);
215
216     // Zero out data structures for next cycle
217     for(int i=0; i < clients; i++) {
218       delete statsMsgsList[i];
219       statsMsgsList[i]=0;
220     }
221     stats_msg_count=0;
222     double strat_end_time = CmiWallTimer();
223     //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
224   }
225   
226 }
227
228 void CentralLB::ReceiveMigration(CLBMigrateMsg *m)
229 {
230   DEBUGF(("[%d] in ReceiveMigration %d moves\n",CkMyPe(),m->n_moves));
231   migrates_expected = 0;
232   for(int i=0; i < m->n_moves; i++) {
233     MigrateInfo& move = m->moves[i];
234     const int me = CkMyPe();
235     if (move.from_pe == me && move.to_pe != me) {
236       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
237       theLbdb->Migrate(move.obj,move.to_pe);
238     } else if (move.from_pe != me && move.to_pe == me) {
239         //  CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
240       migrates_expected++;
241     }
242   }
243   
244   cur_ld_balancer = m->next_lb;
245   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
246       int num_proc = CkNumPes();
247       for(int proc = 0; proc < num_proc; proc++)
248           avail_vector[proc] = m->avail_vector[proc];
249   }
250
251   if (migrates_expected == 0 || migrates_completed == migrates_expected)
252     MigrationDone();
253   delete m;
254 }
255
256
257 void CentralLB::MigrationDone()
258 {
259   if (CkMyPe() == cur_ld_balancer) {
260     double end_lb_time = CmiWallTimer();
261     // CkPrintf("Load balancing step %d finished at %f duration %f\n",
262              // step(),end_lb_time,end_lb_time - start_lb_time);
263   }
264   migrates_completed = 0;
265   migrates_expected = -1;
266   // Increment to next step
267   mystep++;
268   thisProxy [CkMyPe()].ResumeClients();
269 }
270
271 void CentralLB::ResumeClients()
272 {
273   DEBUGF(("Resuming clients on PE %d\n",CkMyPe()));
274   theLbdb->ResumeClients();
275 }
276
277 CLBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
278 {
279   for(int j=0; j < count; j++) {
280     int i;
281     LDObjData *odata = stats[j].objData;
282     const int osz = stats[j].n_objs;
283
284     CkPrintf(
285       "Proc %d Sp %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
286       j,stats[j].pe_speed,stats[j].total_walltime,stats[j].total_cputime,
287       stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime);
288     CkPrintf("------------- Object Data: PE %d: %d objects -------------\n",
289              j,osz);
290     for(i=0; i < osz; i++) {
291       CkPrintf("Object %d\n",i);
292       CkPrintf("     id = %d\n",odata[i].id().id[0]);
293       CkPrintf("  OM id = %d\n",odata[i].omID().id);
294       CkPrintf("   Mig. = %d\n",odata[i].migratable);
295       CkPrintf("    CPU = %f\n",odata[i].cpuTime);
296       CkPrintf("   Wall = %f\n",odata[i].wallTime);
297     }
298
299     LDCommData *cdata = stats[j].commData;
300     const int csz = stats[j].n_comm;
301
302     CkPrintf("------------- Comm Data: PE %d: %d records -------------\n",
303              j,csz);
304     for(i=0; i < csz; i++) {
305       CkPrintf("Link %d\n",i);
306       
307       if (cdata[i].from_proc())
308         CkPrintf("    sender PE = %d\n",cdata[i].src_proc);
309       else
310         CkPrintf("    sender id = %d:%d\n",
311                  cdata[i].senderOM.id,cdata[i].sender.id[0]);
312
313       if (cdata[i].to_proc())
314         CkPrintf("  receiver PE = %d\n",cdata[i].dest_proc);
315       else      
316         CkPrintf("  receiver id = %d:%d\n",
317                  cdata[i].receiverOM.id,cdata[i].receiver.id[0]);
318       
319       CkPrintf("     messages = %d\n",cdata[i].messages);
320       CkPrintf("        bytes = %d\n",cdata[i].bytes);
321     }
322   }
323
324   int sizes=0;
325   CLBMigrateMsg* msg = new(&sizes,1) CLBMigrateMsg;
326   msg->n_moves = 0;
327
328   return msg;
329 }
330
331
332 void* CLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
333 {
334   int totalsize = size + array[0] * sizeof(CentralLB::MigrateInfo) 
335     + CkNumPes() * sizeof(char);
336
337   CLBMigrateMsg* ret =
338     (CLBMigrateMsg*)(CkAllocMsg(msgnum,totalsize,priobits));
339
340   ret->moves = (CentralLB::MigrateInfo*) ((char*)(ret)+ size);
341
342   ret->avail_vector = (char *)(ret->moves + array[0]);
343   return (void*)(ret);
344 }
345
346 void* CLBMigrateMsg::pack(CLBMigrateMsg* m)
347 {
348   m->moves = (CentralLB::MigrateInfo*)
349     ((char*)(m->moves) - (char*)(&m->moves));
350
351   m->avail_vector =(char*)(m->avail_vector
352       - (char*)(&m->avail_vector));
353
354   return (void*)(m);
355 }
356
357 CLBMigrateMsg* CLBMigrateMsg::unpack(void *m)
358 {
359   CLBMigrateMsg* ret_val = (CLBMigrateMsg*)(m);
360
361   ret_val->moves = (CentralLB::MigrateInfo*)
362     ((char*)(&ret_val->moves) 
363      + (size_t)(ret_val->moves));
364
365   ret_val->avail_vector =
366     (char*)((char*)(&ret_val->avail_vector)
367                             +(size_t)(ret_val->avail_vector));
368
369   return ret_val;
370 }
371
372 #endif