doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-ldb / RefinerComm.C
1 /**
2  * \addtogroup CkLdb
3 */
4 /*@{*/
5
6 /** This code is derived from RefineLB.C, and RefineLB.C should
7  be rewritten to use this, so there is no code duplication
8 */
9
10 #include "elements.h"
11 #include "ckheap.h"
12 #include "RefinerComm.h"
13
14
15 void RefinerComm::create(int count, BaseLB::LDStats* _stats, int* procs)
16 {
17   int i;
18   stats = _stats;
19   Refiner::create(count, _stats, procs);
20
21   for (i=0; i<stats->n_comm; i++) 
22   {
23         LDCommData &comm = stats->commData[i];
24         if (!comm.from_proc()) {
25           // out going message
26           int computeIdx = stats->getSendHash(comm);
27           CmiAssert(computeIdx >= 0 && computeIdx < numComputes);
28           computes[computeIdx].sendmessages.push_back(i);
29         }
30
31         // FIXME: only obj msg here
32         // incoming messages
33         if (comm.receiver.get_type() == LD_OBJ_MSG)  {
34           int computeIdx = stats->getRecvHash(comm);
35           CmiAssert(computeIdx >= 0 && computeIdx < numComputes);
36           computes[computeIdx].recvmessages.push_back(i);
37         }
38   }
39 }
40
41 void RefinerComm::computeAverage()
42 {
43   int i;
44   double total = 0.;
45   for (i=0; i<numComputes; i++) total += computes[i].load;
46
47   for (i=0; i<P; i++) {
48     if (processors[i].available == CmiTrue) {
49         total += processors[i].backgroundLoad;
50         total += commTable->overheadOnPe(i);
51     }
52   }
53
54   averageLoad = total/numAvail;
55 }
56
57 // compute the initial per processor communication overhead
58 void RefinerComm::processorCommCost()
59 {
60   int i;
61
62   for (int cidx=0; cidx < stats->n_comm; cidx++) {
63     LDCommData& cdata = stats->commData[cidx];
64     int senderPE = -1, receiverPE = -1;
65     if (cdata.from_proc())
66       senderPE = cdata.src_proc;
67     else {
68       int idx = stats->getSendHash(cdata);
69       CmiAssert(idx != -1);
70       senderPE = computes[idx].oldProcessor;    // object's original processor
71     }
72     CmiAssert(senderPE != -1);
73     int ctype = cdata.receiver.get_type();
74     if (ctype==LD_PROC_MSG || ctype==LD_OBJ_MSG) {
75       if (ctype==LD_PROC_MSG)
76         receiverPE = cdata.receiver.proc();
77       else {    // LD_OBJ_MSG
78         int idx = stats->getRecvHash(cdata);
79         CmiAssert(idx != -1);
80         receiverPE = computes[idx].oldProcessor;
81       }
82       CmiAssert(receiverPE != -1);
83       if(senderPE != receiverPE)
84       {
85         commTable->increase(true, senderPE, cdata.messages, cdata.bytes);
86         commTable->increase(false, receiverPE, cdata.messages, cdata.bytes);
87       }
88     }
89     else if (ctype == LD_OBJLIST_MSG) {
90       int nobjs;
91       LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
92       for (i=0; i<nobjs; i++) {
93         int idx = stats->getHash(objs[i]);
94         if(idx == -1)
95              if (_lb_args.migObjOnly()) continue;
96              else CkAbort("Error in search\n");
97         receiverPE = computes[idx].oldProcessor;
98         CmiAssert(receiverPE != -1);
99         if(senderPE != receiverPE)
100         {
101           commTable->increase(true, senderPE, cdata.messages, cdata.bytes);
102           commTable->increase(false, receiverPE, cdata.messages, cdata.bytes);
103         }
104       }
105     }
106   }
107   // recalcualte the cpu load
108   for (i=0; i<P; i++) 
109   {
110     processorInfo *p = &processors[i];
111     p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(i);
112   }
113 }
114
115 void RefinerComm::assign(computeInfo *c, int processor)
116 {
117   assign(c, &(processors[processor]));
118 }
119
120 void RefinerComm::assign(computeInfo *c, processorInfo *p)
121 {
122    c->processor = p->Id;
123    p->computeSet->insert((InfoRecord *) c);
124    p->computeLoad += c->load;
125 //   p->load = p->computeLoad + p->backgroundLoad;
126    // add communication cost
127    Messages m;
128    objCommCost(c->Id, p->Id, m);
129    commTable->increase(true, p->Id, m.msgSent, m.byteSent);
130    commTable->increase(false, p->Id, m.msgRecv, m.byteRecv);
131
132 //   CmiPrintf("Assign %d to %d commCost: %d %d %d %d \n", c->Id, p->Id, byteSent,msgSent,byteRecv,msgRecv);
133
134    commAffinity(c->Id, p->Id, m);
135    commTable->increase(false, p->Id, -m.msgSent, -m.byteSent);
136    commTable->increase(true, p->Id, -m.msgRecv, -m.byteRecv);   // reverse
137
138 //   CmiPrintf("Assign %d to %d commAffinity: %d %d %d %d \n", c->Id, p->Id, -byteSent,-msgSent,-byteRecv,-msgRecv);
139
140    p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(p->Id);
141 }
142
143 void  RefinerComm::deAssign(computeInfo *c, processorInfo *p)
144 {
145 //   c->processor = -1;
146    p->computeSet->remove(c);
147    p->computeLoad -= c->load;
148 //   p->load = p->computeLoad + p->backgroundLoad;
149    Messages m;
150    objCommCost(c->Id, p->Id, m);
151    commTable->increase(true, p->Id, -m.msgSent, -m.byteSent);
152    commTable->increase(false, p->Id, -m.msgRecv, -m.byteRecv);
153    
154    commAffinity(c->Id, p->Id, m);
155    commTable->increase(true, p->Id, m.msgSent, m.byteSent);
156    commTable->increase(false, p->Id, m.msgRecv, m.byteRecv);
157
158    p->load = p->computeLoad + p->backgroundLoad + commTable->overheadOnPe(p->Id);
159 }
160
161 // how much communication from compute c  to pe
162 // byteSent, msgSent are messages from object c to pe p
163 // byteRecv, msgRecv are messages from pe p to obejct c
164 void RefinerComm::commAffinity(int c, int pe, Messages &m)
165 {
166   int i;
167   m.clear();
168   computeInfo &obj = computes[c];
169
170   int nSendMsgs = obj.sendmessages.length();
171   for (i=0; i<nSendMsgs; i++) {
172     LDCommData &cdata = stats->commData[obj.sendmessages[i]];
173     bool sendtope = false;
174     if (cdata.receiver.get_type() == LD_OBJ_MSG) {
175       int recvCompute = stats->getRecvHash(cdata);
176       int recvProc = computes[recvCompute].processor;
177       if (recvProc != -1 && recvProc == pe) sendtope = true;
178     }
179     else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {  // multicast
180       int nobjs;
181       LDObjKey *recvs = cdata.receiver.get_destObjs(nobjs);
182       for (int j=0; j<nobjs; j++) {
183         int recvCompute = stats->getHash(recvs[j]);
184         int recvProc = computes[recvCompute].processor; // FIXME
185         if (recvProc != -1 && recvProc == pe) { sendtope = true; continue; }
186       }  
187     }
188     if (sendtope) {
189       m.byteSent += cdata.bytes;
190       m.msgSent += cdata.messages;
191     }
192   }  // end of for
193
194   int nRecvMsgs = obj.recvmessages.length();
195   for (i=0; i<nRecvMsgs; i++) {
196     LDCommData &cdata = stats->commData[obj.recvmessages[i]];
197     int sendProc;
198     if (cdata.from_proc()) {
199       sendProc = cdata.src_proc;
200     }
201     else {
202       int sendCompute = stats->getSendHash(cdata);
203       sendProc = computes[sendCompute].processor;
204     }
205     if (sendProc != -1 && sendProc == pe) {
206       m.byteRecv += cdata.bytes;
207       m.msgRecv += cdata.messages;
208     }
209   }  // end of for
210 }
211
212 // assume c is on pe, how much comm overhead it will be?
213 void RefinerComm::objCommCost(int c, int pe, Messages &m)
214 {
215   int i;
216   m.clear();
217   computeInfo &obj = computes[c];
218
219   // find out send overhead for every outgoing message that has receiver
220   // not same as pe
221   int nSendMsgs = obj.sendmessages.length();
222   for (i=0; i<nSendMsgs; i++) {
223     LDCommData &cdata = stats->commData[obj.sendmessages[i]];
224     bool diffPe = false;
225     if (cdata.receiver.get_type() == LD_PROC_MSG) {
226       CmiAssert(0);
227     }
228     if (cdata.receiver.get_type() == LD_OBJ_MSG) {
229       int recvCompute = stats->getRecvHash(cdata);
230       int recvProc = computes[recvCompute].processor;
231       if (recvProc!= -1 && recvProc != pe) diffPe = true;
232     }
233     else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {  // multicast
234       int nobjs;
235       LDObjKey *recvs = cdata.receiver.get_destObjs(nobjs);
236       for (int j=0; j<nobjs; j++) {
237         int recvCompute = stats->getHash(recvs[j]);
238         int recvProc = computes[recvCompute].processor; // FIXME
239         if (recvProc!= -1 && recvProc != pe) { diffPe = true; }
240       }  
241     }
242     if (diffPe) {
243       m.byteSent += cdata.bytes;
244       m.msgSent += cdata.messages;
245     }
246   }  // end of for
247
248   // find out recv overhead for every incoming message that has sender
249   // not same as pe
250   int nRecvMsgs = obj.recvmessages.length();
251   for (i=0; i<nRecvMsgs; i++) {
252     LDCommData &cdata = stats->commData[obj.recvmessages[i]];
253     bool diffPe = false;
254     if (cdata.from_proc()) {
255       if (cdata.src_proc != pe) diffPe = true;
256     }
257     else {
258       int sendCompute = stats->getSendHash(cdata);
259       int sendProc = computes[sendCompute].processor;
260       if (sendProc != -1 && sendProc != pe) diffPe = true;
261     }
262     if (diffPe) {       // sender is not pe
263       m.byteRecv += cdata.bytes;
264       m.msgRecv += cdata.messages;
265     }
266   }  // end of for
267 }
268
269 int RefinerComm::refine()
270 {
271   int i;
272   int finish = 1;
273
274   maxHeap *heavyProcessors = new maxHeap(P);
275   Set *lightProcessors = new Set();
276   for (i=0; i<P; i++) {
277     if (isHeavy(&processors[i])) {  
278       //      CkPrintf("Processor %d is HEAVY: load:%f averageLoad:%f!\n",
279      //                i, processors[i].load, averageLoad);
280       heavyProcessors->insert((InfoRecord *) &(processors[i]));
281     } else if (isLight(&processors[i])) {
282       //      CkPrintf("Processor %d is LIGHT: load:%f averageLoad:%f!\n",
283      //                i, processors[i].load, averageLoad);
284       lightProcessors->insert((InfoRecord *) &(processors[i]));
285     }
286   }
287   int done = 0;
288
289   while (!done) {
290     double bestSize;
291     computeInfo *bestCompute;
292     processorInfo *bestP;
293     
294     processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
295     if (!donor) break;
296
297     //find the best pair (c,receiver)
298     Iterator nextProcessor;
299     processorInfo *p = (processorInfo *) 
300       lightProcessors->iterator((Iterator *) &nextProcessor);
301     bestSize = 0;
302     bestP = NULL;
303     bestCompute = NULL;
304
305     while (p) {
306       Iterator nextCompute;
307       nextCompute.id = 0;
308       computeInfo *c = (computeInfo *) 
309         donor->computeSet->iterator((Iterator *)&nextCompute);
310       //CmiPrintf("Considering Procsessor : %d with load: %f for donor: %d\n", p->Id, p->load, donor->Id);
311       while (c) {
312         if (!c->migratable) {
313           nextCompute.id++;
314           c = (computeInfo *) 
315             donor->computeSet->next((Iterator *)&nextCompute);
316           continue;
317         }
318         //CkPrintf("c->load: %f p->load:%f overLoad*averageLoad:%f \n",
319         //c->load, p->load, overLoad*averageLoad);
320         Messages m;
321         objCommCost(c->Id, donor->Id, m);
322         double commcost = m.cost();
323         commAffinity(c->Id, p->Id, m);
324         double commgain = m.cost();;
325
326         //CmiPrintf("Considering Compute: %d with load %f commcost:%f commgain:%f\n", c->Id, c->load, commcost, commgain);
327         if ( c->load + p->load + commcost - commgain < overLoad*averageLoad) {
328           //CmiPrintf("[%d] comm gain %f bestSize:%f\n", c->Id, commgain, bestSize);
329           if(c->load + commcost - commgain > bestSize) {
330             bestSize = c->load + commcost - commgain;
331             bestCompute = c;
332             bestP = p;
333           }
334         }
335         nextCompute.id++;
336         c = (computeInfo *) 
337           donor->computeSet->next((Iterator *)&nextCompute);
338       }
339       p = (processorInfo *) 
340         lightProcessors->next((Iterator *) &nextProcessor);
341     }
342
343     if (bestCompute) {
344       if (_lb_args.debug())
345         CkPrintf("Assign: [%d] with load: %f from %d to %d \n",
346                bestCompute->Id, bestCompute->load, 
347                donor->Id, bestP->Id);
348       deAssign(bestCompute, donor);      
349       assign(bestCompute, bestP);
350
351       // show the load
352       if (_lb_args.debug())  printLoad();
353
354       // update commnication
355       computeAverage();
356       delete heavyProcessors;
357       delete lightProcessors;
358       heavyProcessors = new maxHeap(P);
359       lightProcessors = new Set();
360       for (i=0; i<P; i++) {
361         if (isHeavy(&processors[i])) {  
362           //      CkPrintf("Processor %d is HEAVY: load:%f averageLoad:%f!\n",
363           //           i, processors[i].load, averageLoad);
364           heavyProcessors->insert((InfoRecord *) &(processors[i]));
365         } else if (isLight(&processors[i])) {
366           lightProcessors->insert((InfoRecord *) &(processors[i]));
367         }
368       }
369       if (_lb_args.debug()) CmiPrintf("averageLoad after assignment: %f\n", averageLoad);
370     } else {
371       finish = 0;
372       break;
373     }
374
375
376 /*
377     if (bestP->load > averageLoad)
378       lightProcessors->remove(bestP);
379     
380     if (isHeavy(donor))
381       heavyProcessors->insert((InfoRecord *) donor);
382     else if (isLight(donor))
383       lightProcessors->insert((InfoRecord *) donor);
384 */
385   }  
386
387   delete heavyProcessors;
388   delete lightProcessors;
389
390   return finish;
391 }
392
393 void RefinerComm::Refine(int count, BaseLB::LDStats* stats, 
394                      int* cur_p, int* new_p)
395 {
396   //  CkPrintf("[%d] Refiner strategy\n",CkMyPe());
397
398   P = count;
399   numComputes = stats->n_objs;
400   computes = new computeInfo[numComputes];
401   processors = new processorInfo[count];
402   commTable = new CommTable(P);
403
404   // fill communication hash table
405   stats->makeCommHash();
406
407   create(count, stats, cur_p);
408
409   int i;
410   for (i=0; i<numComputes; i++)
411     assign((computeInfo *) &(computes[i]),
412            (processorInfo *) &(processors[computes[i].oldProcessor]));
413
414   commTable->clear();
415
416   // recalcualte the cpu load
417   processorCommCost();
418
419   removeComputes();
420   if (_lb_args.debug())  printLoad();
421
422   computeAverage();
423   if (_lb_args.debug()) CmiPrintf("averageLoad: %f\n", averageLoad);
424
425   multirefine();
426
427   for (int pe=0; pe < P; pe++) {
428     Iterator nextCompute;
429     nextCompute.id = 0;
430     computeInfo *c = (computeInfo *)
431       processors[pe].computeSet->iterator((Iterator *)&nextCompute);
432     while(c) {
433       new_p[c->Id] = c->processor;
434 //      if (c->oldProcessor != c->processor)
435 //      CkPrintf("Refiner::Refine: from %d to %d\n", c->oldProcessor, c->processor);
436       nextCompute.id++;
437       c = (computeInfo *) processors[pe].computeSet->
438                      next((Iterator *)&nextCompute);
439     }
440   }
441
442   delete [] computes;
443   delete [] processors;
444   delete commTable;
445 }
446
447 RefinerComm::CommTable::CommTable(int P)
448 {
449   count = P;
450   msgSentCount = new int[P]; // # of messages sent by each PE
451   msgRecvCount = new int[P]; // # of messages received by each PE
452   byteSentCount = new int[P];// # of bytes sent by each PE
453   byteRecvCount = new int[P];// # of bytes reeived by each PE
454   clear();
455 }
456
457 RefinerComm::CommTable::~CommTable()
458 {
459   delete [] msgSentCount;
460   delete [] msgRecvCount;
461   delete [] byteSentCount;
462   delete [] byteRecvCount;
463 }
464
465 void RefinerComm::CommTable::clear()
466 {
467   for(int i = 0; i < count; i++)
468     msgSentCount[i] = msgRecvCount[i] = byteSentCount[i] = byteRecvCount[i] = 0;
469 }
470
471 void RefinerComm::CommTable::increase(bool issend, int pe, int msgs, int bytes)
472 {
473   if (issend) {
474     msgSentCount[pe] += msgs;
475     byteSentCount[pe] += bytes;
476   }
477   else {
478     msgRecvCount[pe] += msgs;
479     byteRecvCount[pe] += bytes;
480   }
481 }
482
483 double RefinerComm::CommTable::overheadOnPe(int pe)
484 {
485   return msgRecvCount[pe]  * PER_MESSAGE_RECV_OVERHEAD +
486          msgSentCount[pe]  * _lb_args.alpha() +
487          byteRecvCount[pe] * PER_BYTE_RECV_OVERHEAD +
488          byteSentCount[pe] * _lb_args.beeta();
489 }
490
491 /*@}*/