Removed all STL template code from load balancer. Too bad STL doesn't work
[charm.git] / src / ck-ldb / NeighborLB.C
1 #include <unistd.h>
2 #include <charm++.h>
3 #include <LBDatabase.h>
4 #include "NeighborLB.h"
5 #include "NeighborLB.def.h"
6
7 CkGroupID loadbalancer;
8
9 #if CMK_LBDB_ON
10
11 void CreateNeighborLB()
12 {
13   loadbalancer = CProxy_NeighborLB::ckNew();
14 }
15
16 void NeighborLB::staticMigrated(void* data, LDObjHandle h)
17 {
18   NeighborLB *me = static_cast<NeighborLB*>(data);
19
20   me->Migrated(h);
21 }
22
23 void NeighborLB::staticAtSync(void* data)
24 {
25   NeighborLB *me = static_cast<NeighborLB*>(data);
26
27   me->AtSync();
28 }
29
30 NeighborLB::NeighborLB()
31 {
32   mystep = 0;
33   theLbdb = CProxy_LBDatabase(lbdb).ckLocalBranch();
34   theLbdb->
35     AddLocalBarrierReceiver(reinterpret_cast<LDBarrierFn>(staticAtSync),
36                             static_cast<void*>(this));
37   theLbdb->
38     NotifyMigrated(reinterpret_cast<LDMigratedFn>(staticMigrated),
39                    static_cast<void*>(this));
40
41
42   // I had to move neighbor initialization outside the constructor
43   // in order to get the virtual functions of any derived classes
44   // so I'll just set them to illegal values here.
45   neighbor_pes = 0;
46   stats_msg_count = 0;
47   statsMsgsList = 0;
48   statsDataList = 0;
49   migrates_completed = 0;
50   migrates_expected = -1;
51   mig_msgs_received = 0;
52   mig_msgs = 0;
53
54   myStats.proc_speed = theLbdb->ProcessorSpeed();
55   char hostname[80];
56   gethostname(hostname,79);
57   CkPrintf("[%d] host %s speed %d\n",CkMyPe(),hostname,myStats.proc_speed);
58   myStats.obj_data_sz = 0;
59   myStats.comm_data_sz = 0;
60   receive_stats_ready = 0;
61
62   theLbdb->CollectStatsOn();
63 }
64
65 NeighborLB::~NeighborLB()
66 {
67   CkPrintf("Going away\n");
68 }
69
70 void NeighborLB::FindNeighbors()
71 {
72   if (neighbor_pes == 0) { // Neighbors never initialized, so init them
73                            // and other things that depend on the number
74                            // of neighbors
75     statsMsgsList = new NLBStatsMsg*[num_neighbors()];
76     for(int i=0; i < num_neighbors(); i++)
77       statsMsgsList[i] = 0;
78     statsDataList = new LDStats[num_neighbors()];
79
80     neighbor_pes = new int[num_neighbors()];
81     neighbors(neighbor_pes);
82     mig_msgs_expected = num_neighbors();
83     mig_msgs = new NLBMigrateMsg*[num_neighbors()];
84   }
85
86 }
87
88 void NeighborLB::AtSync()
89 {
90   //  CkPrintf("[%d] NeighborLB At Sync step %d!!!!\n",CkMyPe(),mystep);
91
92   if (neighbor_pes == 0) FindNeighbors();
93
94   if (!QueryBalanceNow(step()) || num_neighbors() == 0) {
95     MigrationDone();
96     return;
97   }
98
99   NLBStatsMsg* msg = AssembleStats();
100
101   int i;
102   for(i=1; i < num_neighbors(); i++) {
103     NLBStatsMsg* m2 = (NLBStatsMsg*) CkCopyMsg((void**)&msg);
104     CProxy_NeighborLB(thisgroup).ReceiveStats(m2,neighbor_pes[i]);
105   }
106   if (0 < num_neighbors()) {
107     CProxy_NeighborLB(thisgroup).ReceiveStats(msg,neighbor_pes[0]);
108   } else delete msg;
109
110   // Tell our own node that we are ready
111   ReceiveStats((NLBStatsMsg*)0);
112 }
113
114 NLBStatsMsg* NeighborLB::AssembleStats()
115 {
116   // Get stats
117   theLbdb->TotalTime(&myStats.total_walltime,&myStats.total_cputime);
118   theLbdb->IdleTime(&myStats.idletime);
119   theLbdb->BackgroundLoad(&myStats.bg_walltime,&myStats.bg_cputime);
120   myStats.obj_data_sz = theLbdb->GetObjDataSz();
121   myStats.objData = new LDObjData[myStats.obj_data_sz];
122   theLbdb->GetObjData(myStats.objData);
123
124   myStats.comm_data_sz = theLbdb->GetCommDataSz();
125   myStats.commData = new LDCommData[myStats.comm_data_sz];
126   theLbdb->GetCommData(myStats.commData);
127
128   myStats.obj_walltime = myStats.obj_cputime = 0;
129   for(int i=0; i < myStats.obj_data_sz; i++) {
130     myStats.obj_walltime += myStats.objData[i].wallTime;
131     myStats.obj_cputime += myStats.objData[i].cpuTime;
132   }    
133
134   NLBStatsMsg* msg = new NLBStatsMsg;
135
136   msg->from_pe = CkMyPe();
137   msg->serial = rand();
138   msg->proc_speed = myStats.proc_speed;
139   msg->total_walltime = myStats.total_walltime;
140   msg->total_cputime = myStats.total_cputime;
141   msg->idletime = myStats.idletime;
142   msg->bg_walltime = myStats.bg_walltime;
143   msg->bg_cputime = myStats.bg_cputime;
144   msg->obj_walltime += myStats.obj_walltime;
145   msg->obj_cputime += myStats.obj_cputime;
146
147   //  CkPrintf(
148   //    "Proc %d speed=%d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f Obj=%f %f\n",
149   //    CkMyPe(),msg->proc_speed,msg->total_walltime,msg->total_cputime,
150   //    msg->idletime,msg->bg_walltime,msg->bg_cputime,
151   //    msg->obj_walltime,msg->obj_cputime);
152
153   //  CkPrintf("PE %d sending %d to ReceiveStats %d objs, %d comm\n",
154   //       CkMyPe(),msg->serial,msg->n_objs,msg->n_comm);
155   return msg;
156 }
157
158 void NeighborLB::Migrated(LDObjHandle h)
159 {
160   migrates_completed++;
161   //  CkPrintf("[%d] An object migrated! %d %d\n",
162   //       CkMyPe(),migrates_completed,migrates_expected);
163   if (migrates_completed == migrates_expected) {
164     MigrationDone();
165   }
166 }
167
168 void NeighborLB::ReceiveStats(NLBStatsMsg *m)
169 {
170   if (neighbor_pes == 0) FindNeighbors();
171
172   if (m == 0) { // This is from our own node
173     receive_stats_ready = 1;
174   } else {
175     const int pe = m->from_pe;
176     //  CkPrintf("Stats msg received, %d %d %d %d %p\n",
177     //             pe,stats_msg_count,m->n_objs,m->serial,m);
178     int peslot = -1;
179     for(int i=0; i < num_neighbors(); i++) {
180       if (pe == neighbor_pes[i]) {
181         peslot = i;
182         break;
183       }
184     }
185     if (peslot == -1 || statsMsgsList[peslot] != 0) {
186       CkPrintf("*** Unexpected NLBStatsMsg in ReceiveStats from PE %d ***\n",
187                pe);
188     } else {
189       statsMsgsList[peslot] = m;
190       statsDataList[peslot].from_pe = m->from_pe;
191       statsDataList[peslot].total_walltime = m->total_walltime;
192       statsDataList[peslot].total_cputime = m->total_cputime;
193       statsDataList[peslot].idletime = m->idletime;
194       statsDataList[peslot].bg_walltime = m->bg_walltime;
195       statsDataList[peslot].bg_cputime = m->bg_cputime;
196       statsDataList[peslot].proc_speed = m->proc_speed;
197       statsDataList[peslot].obj_walltime = m->obj_walltime;
198       statsDataList[peslot].obj_cputime = m->obj_cputime;
199       stats_msg_count++;
200     }
201   }
202
203   const int clients = num_neighbors();
204   if (stats_msg_count == clients && receive_stats_ready) {
205     receive_stats_ready = 0;
206     NLBMigrateMsg* migrateMsg = Strategy(statsDataList,clients);
207
208     int i;
209
210     // Migrate messages from me to elsewhere
211     for(i=0; i < migrateMsg->n_moves; i++) {
212       MigrateInfo& move = migrateMsg->moves[i];
213       const int me = CkMyPe();
214       if (move.from_pe == me && move.to_pe != me) {
215         theLbdb->Migrate(move.obj,move.to_pe);
216       } else if (move.from_pe != me) {
217         CkPrintf("[%d] error, strategy wants to move from %d to  %d\n",
218                  me,move.from_pe,move.to_pe);
219       }
220     }
221     
222     // Now, send migrate messages to neighbors
223     for(i=1; i < num_neighbors(); i++) {
224       NLBMigrateMsg* m2 = (NLBMigrateMsg*) CkCopyMsg((void**)&migrateMsg);
225       CProxy_NeighborLB(thisgroup).ReceiveMigration(m2,neighbor_pes[i]);
226     }
227     if (0 < num_neighbors())
228       CProxy_NeighborLB(thisgroup).ReceiveMigration(migrateMsg,
229                                                     neighbor_pes[0]);
230     else delete migrateMsg;
231     
232     // Zero out data structures for next cycle
233     for(i=0; i < clients; i++) {
234       delete statsMsgsList[i];
235       statsMsgsList[i]=0;
236     }
237     stats_msg_count=0;
238
239     theLbdb->ClearLoads();
240   }
241   
242 }
243
244 void NeighborLB::ReceiveMigration(NLBMigrateMsg *msg)
245 {
246   if (neighbor_pes == 0) FindNeighbors();
247
248   if (mig_msgs_received == 0) migrates_expected = 0;
249
250   mig_msgs[mig_msgs_received] = msg;
251   mig_msgs_received++;
252   //  CkPrintf("[%d] Received migration msg %d of %d\n",
253   //       CkMyPe(),mig_msgs_received,mig_msgs_expected);
254
255   if (mig_msgs_received > mig_msgs_expected) {
256     CkPrintf("[%d] NeighborLB Error! Too many migration messages received\n",
257              CkMyPe());
258   }
259
260   if (mig_msgs_received != mig_msgs_expected) {
261     return;
262   }
263
264   //  CkPrintf("[%d] in ReceiveMigration %d moves\n",CkMyPe(),msg->n_moves);
265   for(int neigh=0; neigh < mig_msgs_received;neigh++) {
266     NLBMigrateMsg* m = mig_msgs[neigh];
267     for(int i=0; i < m->n_moves; i++) {
268       MigrateInfo& move = m->moves[i];
269       const int me = CkMyPe();
270       if (move.from_pe != me && move.to_pe == me) {
271         migrates_expected++;
272       }
273     }
274     delete m;
275     mig_msgs[neigh]=0;
276   }
277   //  CkPrintf("[%d] in ReceiveMigration %d expected\n",
278   //       CkMyPe(),migrates_expected);
279   mig_msgs_received = 0;
280   if (migrates_expected == 0 || migrates_expected == migrates_completed)
281     MigrationDone();
282 }
283
284
285 void NeighborLB::MigrationDone()
286 {
287   migrates_completed = 0;
288   migrates_expected = -1;
289   // Increment to next step
290   mystep++;
291   CProxy_NeighborLB(thisgroup).ResumeClients(CkMyPe());
292 }
293
294 void NeighborLB::ResumeClients()
295 {
296   theLbdb->ResumeClients();
297 }
298
299 NLBMigrateMsg* NeighborLB::Strategy(LDStats* stats,int count)
300 {
301   for(int j=0; j < count; j++) {
302     CkPrintf(
303     "[%d] Proc %d Speed %d Total(wall,cpu)=%f %f Idle=%f Bg=%f %f obj=%f %f\n",
304     CkMyPe(),stats[j].from_pe,stats[j].proc_speed,
305     stats[j].total_walltime,stats[j].total_cputime,
306     stats[j].idletime,stats[j].bg_walltime,stats[j].bg_cputime,
307     stats[j].obj_walltime,stats[j].obj_cputime);
308   }
309
310   delete [] myStats.objData;
311   myStats.obj_data_sz = 0;
312   delete [] myStats.commData;
313   myStats.comm_data_sz = 0;
314
315   int sizes=0;
316   NLBMigrateMsg* msg = new(&sizes,1) NLBMigrateMsg;
317   msg->n_moves = 0;
318
319   return msg;
320 }
321
322 void* NLBMigrateMsg::alloc(int msgnum, size_t size, int* array, int priobits)
323 {
324   int totalsize = size + array[0] * sizeof(NeighborLB::MigrateInfo);
325
326   NLBMigrateMsg* ret =
327     static_cast<NLBMigrateMsg*>(CkAllocMsg(msgnum,totalsize,priobits));
328
329   ret->moves = reinterpret_cast<NeighborLB::MigrateInfo*>
330     (reinterpret_cast<char*>(ret)+ size);
331
332   return static_cast<void*>(ret);
333 }
334
335 void* NLBMigrateMsg::pack(NLBMigrateMsg* m)
336 {
337   m->moves = reinterpret_cast<NeighborLB::MigrateInfo*>
338     (reinterpret_cast<char*>(m->moves) - reinterpret_cast<char*>(&m->moves));
339
340   return static_cast<void*>(m);
341 }
342
343 NLBMigrateMsg* NLBMigrateMsg::unpack(void *m)
344 {
345   NLBMigrateMsg* ret_val = static_cast<NLBMigrateMsg*>(m);
346
347   ret_val->moves = reinterpret_cast<NeighborLB::MigrateInfo*>
348     (reinterpret_cast<char*>(&ret_val->moves) 
349      + reinterpret_cast<size_t>(ret_val->moves));
350
351   return ret_val;
352 }
353
354 #endif