I Rearranged the load balancer, so WSLB->NeighborLB, NeighborLB->NborBaseLB,
[charm.git] / src / ck-ldb / NeighborLB.C
1 #include <charm++.h>
2
3 #if CMK_LBDB_ON
4
5 #include "CkLists.h"
6
7 #include "heap.h"
8 #include "NeighborLB.h"
9 #include "NeighborLB.def.h"
10
11 void CreateNeighborLB()
12 {
13   nborBaselb = CProxy_NeighborLB::ckNew();
14 }
15
16 NeighborLB::NeighborLB()
17 {
18   if (CkMyPe() == 0)
19     CkPrintf("[%d] NeighborLB created\n",CkMyPe());
20 }
21
22 NLBMigrateMsg* NeighborLB::Strategy(NborBaseLB::LDStats* stats, int count)
23 {
24   //  CkPrintf("[%d] Strategy starting\n",CkMyPe());
25   // Compute the average load to see if we are overloaded relative
26   // to our neighbors
27   double myload = myStats.total_walltime - myStats.idletime;
28   double avgload = myload;
29   int i;
30   for(i=0; i < count; i++) {
31     // Scale times we need appropriately for relative proc speeds
32     const double scale =  ((double)myStats.proc_speed) 
33       / stats[i].proc_speed;
34
35     stats[i].total_walltime *= scale;
36     stats[i].idletime *= scale;
37
38     avgload += (stats[i].total_walltime - stats[i].idletime);
39   }
40   avgload /= (count+1);
41
42   CkVector migrateInfo;
43
44   if (myload > avgload) {
45     //    CkPrintf("[%d] OVERLOAD My load is %f, average load is %f\n",
46     //               CkMyPe(),myload,avgload);
47
48     // First, build heaps of other processors and my objects
49     // Then assign objects to other processors until either
50     //   - The smallest remaining object would put me below average, or
51     //   - I only have 1 object left, or
52     //   - The smallest remaining object would put someone else 
53     //     above average
54
55     // Build heaps
56     minHeap procs(count);
57     for(i=0; i < count; i++) {
58       InfoRecord* item = new InfoRecord;
59       item->load = stats[i].total_walltime - stats[i].idletime;
60       item->Id =  stats[i].from_pe;
61       procs.insert(item);
62     }
63       
64     maxHeap objs(myStats.obj_data_sz);
65     for(i=0; i < myStats.obj_data_sz; i++) {
66       InfoRecord* item = new InfoRecord;
67       item->load = myStats.objData[i].wallTime;
68       item->Id = i;
69       objs.insert(item);
70     }
71
72     int objs_here = myStats.obj_data_sz;
73     do {
74       if (objs_here <= 1) break;  // For now, always leave 1 object
75
76       InfoRecord* p;
77       InfoRecord* obj;
78
79       // Get the lightest-loaded processor
80       p = procs.deleteMin();
81       if (p == 0) {
82         //      CkPrintf("[%d] No destination PE found!\n",CkMyPe());
83         break;
84       }
85
86       // Get the biggest object
87       CmiBool objfound = CmiFalse;
88       do {
89         obj = objs.deleteMax();
90         if (obj == 0) break;
91
92         double new_p_load = p->load + obj->load;
93         double my_new_load = myload - obj->load;
94         if (new_p_load < my_new_load) {
95 //      if (new_p_load < avgload) {
96           objfound = CmiTrue;
97         } else {
98           // This object is too big, so throw it away
99 //        CkPrintf("[%d] Can't move object w/ load %f to proc %d load %f %f\n",
100 //                 CkMyPe(),obj->load,p->Id,p->load,avgload);
101           delete obj;
102         }
103       } while (!objfound);
104
105       if (!objfound) {
106         //      CkPrintf("[%d] No suitable object found!\n",CkMyPe());
107         break;
108       }
109
110       const int me = CkMyPe();
111       // Apparently we can give this object to this processor
112       //      CkPrintf("[%d] Obj %d of %d migrating from %d to %d\n",
113       //               CkMyPe(),obj->Id,myStats.obj_data_sz,me,p->Id);
114
115       MigrateInfo* migrateMe = new MigrateInfo;
116       migrateMe->obj = myStats.objData[obj->Id].handle;
117       migrateMe->from_pe = me;
118       migrateMe->to_pe = p->Id;
119       migrateInfo.push_back((void*)migrateMe);
120
121       objs_here--;
122       
123       // We may want to assign more to this processor, so lets
124       // update it and put it back in the heap
125       p->load += obj->load;
126       myload -= obj->load;
127       procs.insert(p);
128       
129       // This object is assigned, so we delete it from the heap
130       delete obj;
131
132     } while(myload > avgload);
133
134     // Now empty out the heaps
135     while (InfoRecord* p=procs.deleteMin())
136       delete p;
137     while (InfoRecord* obj=objs.deleteMax())
138       delete obj;
139   }  
140
141   // Now build the message to actually perform the migrations
142   int migrate_count=migrateInfo.size();
143   //  if (migrate_count > 0) {
144   //    CkPrintf("PE %d migrating %d elements\n",CkMyPe(),migrate_count);
145   //  }
146   NLBMigrateMsg* msg = new(&migrate_count,1) NLBMigrateMsg;
147   msg->n_moves = migrate_count;
148   for(i=0; i < migrate_count; i++) {
149     MigrateInfo* item = (MigrateInfo*) migrateInfo[i];
150     msg->moves[i] = *item;
151     delete item;
152     migrateInfo[i] = 0;
153   }
154
155   return msg;
156 };
157
158 #endif