3d44eda53ff7d50b9bc4ab2600bf3ae0d0c8dec2
[charm.git] / src / ck-ldb / CentralLB.C
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <charm++.h>
9 #include <LBDatabase.h>
10 #include "CentralLB.h"
11 #include "CentralLB.def.h"
12
13 #define  DEBUGF(x)    // CmiPrintf x;
14
15 CkGroupID loadbalancer;
16 char ** avail_vector_address;
17 int * lb_ptr;
18 int load_balancer_created;
19
20 #if CMK_LBDB_ON
21
22 void CreateCentralLB()
23 {
24   loadbalancer = CProxy_CentralLB::ckNew();
25 }
26
27 void set_avail_vector(char * bitmap){
28     int count;
29     int assigned = 0;
30     for(count = 0; count < CkNumPes(); count++){
31         (*avail_vector_address)[count] = bitmap[count];
32         if((bitmap[count] == 1) && !assigned){
33             *lb_ptr = count;
34             assigned = 1;
35         }
36     }   
37 }
38
39 void CentralLB::staticMigrated(void* data, LDObjHandle h)
40 {
41   CentralLB *me = (CentralLB*)(data);
42
43   me->Migrated(h);
44 }
45
46 void CentralLB::staticAtSync(void* data)
47 {
48   CentralLB *me = (CentralLB*)(data);
49
50   me->AtSync();
51 }
52
53 CentralLB::CentralLB()
54   :thisproxy(thisgroup)
55 {
56   mystep = 0;
57   //  CkPrintf("Construct in %d\n",CkMyPe());
58   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
59   theLbdb->
60     AddLocalBarrierReceiver((LDBarrierFn)(staticAtSync),(void*)(this));
61   theLbdb->
62     NotifyMigrated((LDMigratedFn)(staticMigrated),(void*)(this));
63
64   stats_msg_count = 0;
65   statsMsgsList = new CLBStatsMsg*[CkNumPes()];
66   for(int i=0; i < CkNumPes(); i++)
67     statsMsgsList[i] = 0;
68
69   statsDataList = new LDStats[CkNumPes()];
70   myspeed = theLbdb->ProcessorSpeed();
71   theLbdb->CollectStatsOn();
72   migrates_completed = 0;
73   migrates_expected = -1;
74
75   cur_ld_balancer = 0;
76   new_ld_balancer = 0;
77   int num_proc = CkNumPes();
78   avail_vector = new char[num_proc];
79   for(int proc = 0; proc < num_proc; proc++)
80       avail_vector[proc] = 1;
81   avail_vector_address = &(avail_vector);
82   lb_ptr = &new_ld_balancer;
83
84   load_balancer_created = 1;
85 }
86
87 CentralLB::~CentralLB()
88 {
89   CkPrintf("Going away\n");
90 }
91
92 void CentralLB::AtSync()
93 {
94   DEBUGF(("[%d] CentralLB At Sync step %d!!!!\n",CkMyPe(),mystep));
95
96   if (!QueryBalanceNow(step())) {
97     MigrationDone();
98     return;
99   }
100   thisproxy [CkMyPe()].ProcessAtSync();
101 }
102
103 void CentralLB::ProcessAtSync()
104 {
105   if (CkMyPe() == cur_ld_balancer) {
106     start_lb_time = CmiWallTimer();
107     CmiPrintf("Load balancing step %d starting at %f in %d\n",
108     step(),start_lb_time, cur_ld_balancer);
109   }
110   // Send stats
111   const int osz = theLbdb->GetObjDataSz();
112   const int csz = theLbdb->GetCommDataSz();
113   
114   int npes = CkNumPes();
115   CLBStatsMsg* msg = new(osz, csz, npes, 0) CLBStatsMsg;
116   msg->from_pe = CkMyPe();
117   // msg->serial = rand();
118   msg->serial = CrnRand();
119
120   theLbdb->TotalTime(&msg->total_walltime,&msg->total_cputime);
121   theLbdb->IdleTime(&msg->idletime);
122   theLbdb->BackgroundLoad(&msg->bg_walltime,&msg->bg_cputime);
123   msg->pe_speed = myspeed;
124   //  CkPrintf(
125   //    "Processors %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
126   //    CkMyPe(),msg->total_walltime,msg->total_cputime,
127   //    msg->idletime,msg->bg_walltime,msg->bg_cputime);
128
129   msg->n_objs = osz;
130   theLbdb->GetObjData(msg->objData);
131   msg->n_comm = csz;
132   theLbdb->GetCommData(msg->commData);
133   theLbdb->ClearLoads();
134   DEBUGF(("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
135            CkMyPe(),msg->serial,msg->n_objs,msg->n_comm));
136
137   // Scheduler PART.
138
139   if(CkMyPe() == 0){
140       int num_proc = CkNumPes();
141       for(int proc = 0; proc < num_proc; proc++){
142           msg->avail_vector[proc] = avail_vector[proc];
143       } 
144       msg->next_lb = new_ld_balancer;
145   }
146
147   thisproxy [cur_ld_balancer].ReceiveStats(msg);
148 }
149
150 void CentralLB::Migrated(LDObjHandle h)
151 {
152   migrates_completed++;
153   //  CkPrintf("[%d] An object migrated! %d %d\n",
154   //       CkMyPe(),migrates_completed,migrates_expected);
155   if (migrates_completed == migrates_expected) {
156     MigrationDone();
157   }
158 }
159
160 void CentralLB::ReceiveStats(CLBStatsMsg *m)
161 {
162   int proc;
163   const int pe = m->from_pe;
164 //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
165 //         pe,stats_msg_count,m->n_objs,m->serial,m);
166
167   if((pe == 0) && (CkMyPe() != 0)){
168       new_ld_balancer = m->next_lb;
169       int num_proc = CkNumPes();
170       for(int proc = 0; proc < num_proc; proc++)
171           avail_vector[proc] = m->avail_vector[proc]; 
172   }
173
174   DEBUGF(("ReceiveStats from %d step: %d\n", pe, mystep));
175   if (statsMsgsList[pe] != 0) {
176     CkPrintf("*** Unexpected CLBStatsMsg in ReceiveStats from PE %d ***\n",
177              pe);
178   } else {
179     statsMsgsList[pe] = m;
180     statsDataList[pe].total_walltime = m->total_walltime;
181     statsDataList[pe].total_cputime = m->total_cputime;
182     statsDataList[pe].idletime = m->idletime;
183     statsDataList[pe].bg_walltime = m->bg_walltime;
184     statsDataList[pe].bg_cputime = m->bg_cputime;
185     statsDataList[pe].pe_speed = m->pe_speed;
186     statsDataList[pe].utilization = 1.0;
187     statsDataList[pe].available = CmiTrue;
188
189     statsDataList[pe].n_objs = m->n_objs;
190     statsDataList[pe].objData = m->objData;
191     statsDataList[pe].n_comm = m->n_comm;
192     statsDataList[pe].commData = m->commData;
193     stats_msg_count++;
194   }
195
196   const int clients = CkNumPes();
197   if (stats_msg_count == clients) {
198     double strat_start_time = CmiWallTimer();
199
200 //    CkPrintf("Before setting bitmap\n");
201     for(proc = 0; proc < clients; proc++)
202       statsDataList[proc].available = (CmiBool)avail_vector[proc];
203     
204 //    CkPrintf("Before Calling Strategy\n");
205
206     CLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
207
208 //    CkPrintf("returned successfully\n");
209     int num_proc = CkNumPes();
210
211     for(proc = 0; proc < num_proc; proc++)
212         migrateMsg->avail_vector[proc] = avail_vector[proc];
213     migrateMsg->next_lb = new_ld_balancer;
214
215 //    CkPrintf("calling recv migration\n");
216     thisproxy.ReceiveMigration(migrateMsg);
217
218     // Zero out data structures for next cycle
219     for(int i=0; i < clients; i++) {
220       delete statsMsgsList[i];
221       statsMsgsList[i]=0;
222     }
223     stats_msg_count=0;
224     double strat_end_time = CmiWallTimer();
225     //    CkPrintf("Strat elapsed time %f\n",strat_end_time-strat_start_time);
226   }
227   
228 }
229
230 void CentralLB::ReceiveMigration(CLBMigrateMsg *m)
231 {
232   DEBUGF(("[%d] in ReceiveMigration %d moves\n",CkMyPe(),m->n_moves));
233   migrates_expected = 0;
234   for(int i=0; i < m->n_moves; i++) {
235     MigrateInfo& move = m->moves[i];
236     const int me = CkMyPe();
237     if (move.from_pe == me && move.to_pe != me) {
238       DEBUGF(("[%d] migrating object to %d\n",move.from_pe,move.to_pe));
239       theLbdb->Migrate(move.obj,move.to_pe);
240     } else if (move.from_pe != me && move.to_pe == me) {
241         //  CkPrintf("[%d] expecting object from %d\n",move.to_pe,move.from_pe);
242       migrates_expected++;
243     }
244   }
245   
246   cur_ld_balancer = m->next_lb;
247   if((CkMyPe() == cur_ld_balancer) && (cur_ld_balancer != 0)){
248       int num_proc = CkNumPes();
249       for(int proc = 0; proc < num_proc; proc++)
250           avail_vector[proc] = m->avail_vector[proc];
251   }
252
253   if (migrates_expected == 0 || migrates_completed == migrates_expected)
254     MigrationDone();
255   delete m;
256 }
257
258
259 void CentralLB::MigrationDone()
260 {
261   if (CkMyPe() == cur_ld_balancer) {
262     double end_lb_time = CmiWallTimer();
263     // CkPrintf("Load balancing step %d finished at %f duration %f\n",
264              // step(),end_lb_time,end_lb_time - start_lb_time);
265   }
266   migrates_completed = 0;
267   migrates_expected = -1;
268   // Increment to next step
269   mystep++;
270   thisproxy [CkMyPe()].ResumeClients();
271 }
272
273 void CentralLB::ResumeClients()
274 {
275   DEBUGF(("Resuming clients on PE %d\n",CkMyPe()));
276   theLbdb->ResumeClients();
277 }
278
279 CLBMigrateMsg* CentralLB::Strategy(LDStats* stats,int count)
280 {
281   for(int j=0; j < count; j++) {
282     int i;
283     LDObjData *odata = stats[j].objData;
284     const int osz = stats[j].n_objs;
285
286     CkPrintf(
287       "Proc %d Sp %d Total time (wall,cpu) = %f %f Idle = %f Bg = %f %f\n",
288       j,stats[j].pe_speed,stats[j].total_walltime,stats[j].total_cputime,
289       stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime);
290     CkPrintf("------------- Object Data: PE %d: %d objects -------------\n",
291              j,osz);
292     for(i=0; i < osz; i++) {
293       CkPrintf("Object %d\n",i);
294       CkPrintf("     id = %d\n",odata[i].id().id[0]);
295       CkPrintf("  OM id = %d\n",odata[i].omID().id);
296       CkPrintf("   Mig. = %d\n",odata[i].migratable);
297       CkPrintf("    CPU = %f\n",odata[i].cpuTime);
298       CkPrintf("   Wall = %f\n",odata[i].wallTime);
299     }
300
301     LDCommData *cdata = stats[j].commData;
302     const int csz = stats[j].n_comm;
303
304     CkPrintf("------------- Comm Data: PE %d: %d records -------------\n",
305              j,csz);
306     for(i=0; i < csz; i++) {
307       CkPrintf("Link %d\n",i);
308       
309       if (cdata[i].from_proc())
310         CkPrintf("    sender PE = %d\n",cdata[i].src_proc);
311       else
312         CkPrintf("    sender id = %d:%d\n",
313                  cdata[i].senderOM.id,cdata[i].sender.id[0]);
314
315       if (cdata[i].to_proc())
316         CkPrintf("  receiver PE = %d\n",cdata[i].dest_proc);
317       else      
318         CkPrintf("  receiver id = %d:%d\n",
319                  cdata[i].receiverOM.id,cdata[i].receiver.id[0]);
320       
321       CkPrintf("     messages = %d\n",cdata[i].messages);
322       CkPrintf("        bytes = %d\n",cdata[i].bytes);
323     }
324   }
325
326   int sizes=0;
327   CLBMigrateMsg* msg = new(&sizes,1) CLBMigrateMsg;
328   msg->n_moves = 0;
329
330   return msg;
331 }
332
333
334 void* CLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
335 {
336   int totalsize = size + array[0] * sizeof(CentralLB::MigrateInfo) 
337     + CkNumPes() * sizeof(char);
338
339   CLBMigrateMsg* ret =
340     (CLBMigrateMsg*)(CkAllocMsg(msgnum,totalsize,priobits));
341
342   ret->moves = (CentralLB::MigrateInfo*) ((char*)(ret)+ size);
343
344   ret->avail_vector = (char *)(ret->moves + array[0]);
345   return (void*)(ret);
346 }
347
348 void* CLBMigrateMsg::pack(CLBMigrateMsg* m)
349 {
350   m->moves = (CentralLB::MigrateInfo*)
351     ((char*)(m->moves) - (char*)(&m->moves));
352
353   m->avail_vector =(char*)(m->avail_vector
354       - (char*)(&m->avail_vector));
355
356   return (void*)(m);
357 }
358
359 CLBMigrateMsg* CLBMigrateMsg::unpack(void *m)
360 {
361   CLBMigrateMsg* ret_val = (CLBMigrateMsg*)(m);
362
363   ret_val->moves = (CentralLB::MigrateInfo*)
364     ((char*)(&ret_val->moves) 
365      + (size_t)(ret_val->moves));
366
367   ret_val->avail_vector =
368     (char*)((char*)(&ret_val->avail_vector)
369                             +(size_t)(ret_val->avail_vector));
370
371   return ret_val;
372 }
373
374 #endif