923e0c7b174eb09bdd8caa5558432c28b2d3ccbc
[charm.git] / src / ck-ldb / LBDBManager.C
1 #include <converse.h>
2
3 #if CMK_LBDB_ON
4
5 #include <iostream.h>
6 #include "LBDBManager.h"
7
8 struct MigrateCB;
9
10 #if CMK_STL_USE_DOT_H
11 template class vector<LBOM*>;
12 template class vector<LBObj*>;
13 template class vector<LBDB::MigrateCB*>;
14 template class vector<LocalBarrier::client*>;
15 template class vector<LocalBarrier::receiver*>;
16 #else
17 template class std::vector<LBOM*>;
18 template class std::vector<LBObj*>;
19 template class std::vector<LBDB::MigrateCB*>;
20 template class std::vector<LocalBarrier::client*>;
21 template class std::vector<LocalBarrier::receiver*>;
22 #endif
23
24 /*************************************************************
25  * LBDB Code
26  *************************************************************/
27
28 LDOMHandle LBDB::AddOM(LDOMid _userID, void* _userData, 
29                        LDCallbacks _callbacks)
30 {
31   LDOMHandle newhandle;
32
33   newhandle.ldb.handle = static_cast<void *>(this);
34   newhandle.user_ptr = _userData;
35   newhandle.id = _userID;
36
37   LBOM* om = new LBOM(this,_userID,_userData,_callbacks);
38   if (om != NULL) {
39     newhandle.handle = oms.size();
40     oms.push_back(om);
41   } else newhandle.handle = -1;
42   om->DepositHandle(newhandle);
43   omCount++;
44   return newhandle;
45 }
46
47 LDObjHandle LBDB::AddObj(LDOMHandle _h, LDObjid _id,
48                          void *_userData, CmiBool _migratable)
49 {
50   LDObjHandle newhandle;
51
52   newhandle.omhandle = _h;
53   newhandle.user_ptr = _userData;
54   newhandle.id = _id;
55   
56   LBObj *obj = new LBObj(this,_h,_id,_userData,_migratable);
57   if (obj != NULL) {
58     newhandle.handle = objs.size();
59     objs.push_back(obj);
60   } else {
61     newhandle.handle = -1;
62   }
63   obj->DepositHandle(newhandle);
64   objCount++;
65   return newhandle;
66 }
67
68 void LBDB::UnregisterObj(LDObjHandle _h)
69 {
70   objs[_h.handle]->registered=CmiFalse;
71 }
72
73 void LBDB::RegisteringObjects(LDOMHandle _h)
74 {
75   LBOM* om = oms[_h.handle];
76   if (!om->RegisteringObjs()) {
77     if (oms_registering == 0)
78       localBarrier.TurnOff();
79     oms_registering++;
80     om->SetRegisteringObjs(CmiTrue);
81   }
82 }
83
84 void LBDB::DoneRegisteringObjects(LDOMHandle _h)
85 {
86   LBOM* om = oms[_h.handle];
87   if (om->RegisteringObjs()) {
88     oms_registering--;
89     if (oms_registering == 0)
90       localBarrier.TurnOn();
91     om->SetRegisteringObjs(CmiFalse);
92   }
93 }
94
95
96 void LBDB::Send(LDOMHandle destOM, LDObjid destid, unsigned int bytes)
97 {
98   LBCommData* item_ptr;
99
100   if (obj_running) {
101     LBCommData item(runningObj,destOM.id,destid);
102     item_ptr = commTable->HashInsertUnique(item);
103
104 //     CmiPrintf("[%d] Sending %d from object manager %d, object {%d,%d,%d,%d}\n"
105 //            "     to object manager %d, object {%d,%d,%d,%d}\n",
106 //            CmiMyPe(),bytes,
107 //            runningObj.omhandle.id.id,
108 //            runningObj.id.id[0],runningObj.id.id[1],
109 //            runningObj.id.id[2],runningObj.id.id[3],
110 //            destOM.id.id,
111 //            destid.id[0],destid.id[1],
112 //            destid.id[2],destid.id[3]
113 //            );
114   } else {
115     LBCommData item(CmiMyPe(),destOM.id,destid);
116     item_ptr = commTable->HashInsertUnique(item);
117
118 //     CmiPrintf("[%d] Sending %d from processor %d\n"
119 //            "     to object manager %d, object {%d,%d,%d,%d}\n",
120 //            CmiMyPe(),bytes,
121 //            CmiMyPe(),
122 //            destOM.id.id,
123 //            destid.id[0],destid.id[1],
124 //            destid.id[2],destid.id[3]
125 //            );
126   }  
127   item_ptr->addMessage(bytes);
128 }
129
130 void LBDB::ClearLoads(void)
131 {
132   int i;
133   for(i=0; i < objCount; i++)
134     if (objs[i]->registered)
135     {
136       objs[i]->data.wallTime = 
137         objs[i]->data.cpuTime = 0.;
138     }
139   delete commTable;
140   commTable = new LBCommTable;
141   machineUtil.Clear();
142   obj_walltime = obj_cputime = 0;
143 }
144
145 int LBDB::ObjDataCount()
146 {
147   int nitems=0;
148   int i;
149   for(i=0; i < objCount; i++)
150     if (objs[i]->registered)
151       nitems++;
152   return nitems;
153 }
154
155 void LBDB::GetObjData(LDObjData *dp)
156 {
157   for(ObjList::iterator ol = objs.begin(); ol != objs.end(); ol++)
158     if ((*ol)->registered)
159       *dp++ = (*ol)->ObjData();
160 }
161
162 void LBDB::Migrate(LDObjHandle h, int dest)
163 {
164   if (h.handle > objCount)
165     CmiPrintf("[%d] Handle %d out of range 0-%d\n",CmiMyPe(),h.handle,objCount);
166   else if (!objs[h.handle]->registered)
167     CmiPrintf("[%d] Handle %d no longer registered, range 0-%d\n",
168             CmiMyPe(),h.handle,objCount);
169
170   if ((h.handle < objCount) && (objs[h.handle]->registered)) {
171     LBOM *const om = oms[objs[h.handle]->parentOM.handle];
172     om->Migrate(h, dest);
173   }
174   return;
175 }
176
177 void LBDB::Migrated(LDObjHandle h)
178 {
179   // Object migrated, inform load balancers
180
181   for(int i=0; i < migrateCBList.size(); i++)
182     (migrateCBList[i]->fn)(migrateCBList[i]->data,h);
183   
184 }
185
186 void LBDB::NotifyMigrated(LDMigratedFn fn, void* data)
187 {
188   // Save migration function
189   MigrateCB* callbk = new MigrateCB;
190
191   callbk->fn = fn;
192   callbk->data = data;
193   migrateCBList.push_back(callbk);
194 }
195
196 void LBDB::BackgroundLoad(double* walltime, double* cputime)
197 {
198   double totalwall;
199   double totalcpu;
200   TotalTime(&totalwall,&totalcpu);
201
202   double idle;
203   IdleTime(&idle);
204   
205   *walltime = totalwall - idle - obj_walltime;
206   *cputime = totalcpu - obj_cputime;
207 }
208
209 void LBDB::DumpDatabase()
210 {
211 #ifdef DEBUG  
212   CmiPrintf("Database contains %d object managers\n",omCount);
213   CmiPrintf("Database contains %d objects\n",objCount);
214 #endif
215 }
216
217 LDBarrierClient LocalBarrier::AddClient(LDResumeFn fn, void* data)
218 {
219   client* new_client = new client;
220   new_client->fn = fn;
221   new_client->data = data;
222   new_client->refcount = cur_refcount;
223
224   LDBarrierClient ret_val;
225   ret_val.serial = max_client;
226   clients.push_back(new_client);
227   max_client++;
228
229   client_count++;
230
231   return ret_val;
232 }
233
234 void LocalBarrier::RemoveClient(LDBarrierClient c)
235 {
236   const int cnum = c.serial;
237   if (cnum < max_client && clients[cnum] != 0) {
238     delete clients[cnum];
239     clients[cnum] = 0;
240     client_count--;
241   }
242 }
243
244 LDBarrierReceiver LocalBarrier::AddReceiver(LDBarrierFn fn, void* data)
245 {
246   receiver* new_receiver = new receiver;
247   new_receiver->fn = fn;
248   new_receiver->data = data;
249
250   LDBarrierReceiver ret_val;
251   ret_val.serial = max_receiver;
252   receivers.push_back(new_receiver);
253   max_receiver++;
254
255   return ret_val;
256 }
257
258 void LocalBarrier::RemoveReceiver(LDBarrierReceiver c)
259 {
260   const int cnum = c.serial;
261   if (cnum < max_receiver && receivers[cnum] != 0) {
262     delete receivers[cnum];
263     receivers[cnum] = 0;
264   }
265 }
266
267 void LocalBarrier::AtBarrier(LDBarrierClient h)
268 {
269   clients[h.serial]->refcount++;
270   at_count++;
271   CheckBarrier();
272 }
273
274 void LocalBarrier::CheckBarrier()
275 {
276   if (!on) return;
277
278   if (at_count >= client_count) {
279     CmiBool at_barrier = CmiFalse;
280
281     for(int i=0; i < max_client; i++)
282       if (clients[i] != 0 && clients[i]->refcount >= cur_refcount)
283         at_barrier = CmiTrue;
284
285     if (at_barrier) {
286       at_count -= client_count;
287       cur_refcount++;
288       CallReceivers();
289     }
290   }
291 }
292
293 void LocalBarrier::CallReceivers(void)
294 {
295   CmiBool called_receiver=CmiFalse;
296
297 //  for(int i=0; i < max_receiver; i++)
298     for (int i=max_receiver-1; i>=0; i--)
299     if (receivers[i] != 0) {
300       receivers[i]->fn(receivers[i]->data);
301       called_receiver = CmiTrue;
302     }
303
304   if (!called_receiver)
305     ResumeClients();
306   
307 }
308
309 void LocalBarrier::ResumeClients(void)
310 {
311   for(int i=0; i < max_client; i++)
312     if (clients[i] != 0) 
313       clients[i]->fn(clients[i]->data);
314 }
315
316 #endif