Merge nodehelper lib and example codes into charm
[charm.git] / src / ck-ldb / TopoCentLB.C
1 /**************************************************************************
2 ** Amit Sharma (asharma6@uiuc.edu)
3 ** November 23, 2004
4 **
5 ** This is a topology conscious load balancer.
6 ** It migrates objects to new processors based on the topology in which the processors are connected.
7 ****************************************************************************/
8
9 #include <math.h>
10 #include <stdlib.h>
11 #include "TopoCentLB.decl.h"
12 #include "TopoCentLB.h"
13
14 #define alpha PER_MESSAGE_SEND_OVERHEAD_DEFAULT  /*Startup time per message, seconds*/
15 #define beta PER_BYTE_SEND_OVERHEAD_DEFAULT     /*Long-message time per byte, seconds*/
16 #define DEG_THRES 0.50
17
18 //#define MAX_EDGE
19 //#define RAND_COMM
20 #define make_mapping 0
21
22 CreateLBFunc_Def(TopoCentLB,"Balance objects based on the network topology")
23
24
25 /*static void lbinit (void)
26 {
27   LBRegisterBalancer ("TopoCentLB",
28                       CreateTopoCentLB,
29                       AllocateTopoCentLB,
30                       "Balance objects based on the network topology");
31 }*/
32
33
34 TopoCentLB::TopoCentLB(const CkLBOptions &opt) : CentralLB (opt)
35 {
36   lbname = "TopoCentLB";
37   if (CkMyPe () == 0) {
38     CkPrintf ("[%d] TopoCentLB created\n",CkMyPe());
39   }
40 }
41
42
43 CmiBool TopoCentLB::QueryBalanceNow (int _step)
44 {
45   return CmiTrue;
46 }
47
48 TopoCentLB::~TopoCentLB(){
49         if(topo) delete topo;
50 }
51
52 /*This routine partitions the task graph minimizing the communication and balancing the object load on all partitions*/
53 /*It uses METIS library to accomplish that*/
54 void TopoCentLB::computePartitions(CentralLB::LDStats *stats,int count,int *newmap)
55 {
56         
57   int numobjs = stats->n_objs;
58         int i, j, m;
59
60   // allocate space for the computing data
61   double *objtime = new double[numobjs];
62   int *objwt = new int[numobjs];
63   int *origmap = new int[numobjs];
64   LDObjHandle *handles = new LDObjHandle[numobjs];
65   
66         for(i=0;i<numobjs;i++) {
67     objtime[i] = 0.0;
68     objwt[i] = 0;
69     origmap[i] = 0;
70   }
71
72   //Prepare compute loads for METIS library
73   for (i=0; i<stats->n_objs; i++) {
74     LDObjData &odata = stats->objData[i];
75     if (!odata.migratable) 
76       CmiAbort("MetisLB doesnot dupport nonmigratable object.\n");
77     int frompe = stats->from_proc[i];
78     origmap[i] = frompe;
79     objtime[i] = odata.wallTime*stats->procs[frompe].pe_speed;
80     handles[i] = odata.handle;
81   }
82
83   // to convert the weights on vertices to integers
84   double max_objtime = objtime[0];
85   for(i=0; i<numobjs; i++) {
86     if(max_objtime < objtime[i])
87       max_objtime = objtime[i];
88   }
89         int maxobj=0;
90         int totalwt=0;
91   double ratio = 1000.0/max_objtime;
92   for(i=0; i<numobjs; i++) {
93       objwt[i] = (int)(objtime[i]*ratio);
94                         if(maxobj<objwt[i])
95                                 maxobj=objwt[i];
96                         totalwt+=objwt[i];
97   }
98         
99   int **comm = new int*[numobjs];
100   for (i=0; i<numobjs; i++) {
101     comm[i] = new int[numobjs];
102     for (j=0; j<numobjs; j++)  {
103       comm[i][j] = 0;
104     }
105   }
106
107   //Prepare communication for METIS library
108   const int csz = stats->n_comm;
109   for(i=0; i<csz; i++) {
110       LDCommData &cdata = stats->commData[i];
111       //if(cdata.from_proc() || cdata.receiver.get_type() != LD_OBJ_MSG)
112         //continue;
113                         if(!cdata.from_proc() && cdata.receiver.get_type() == LD_OBJ_MSG){
114         int senderID = stats->getHash(cdata.sender);
115         int recverID = stats->getHash(cdata.receiver.get_destObj());
116         CmiAssert(senderID < numobjs);
117         CmiAssert(recverID < numobjs);
118                                 comm[senderID][recverID] += cdata.messages;
119         comm[recverID][senderID] += cdata.messages;
120                                 //Use bytes or messages -- do i include messages for objlist too...??
121                         }
122                         else if (cdata.receiver.get_type() == LD_OBJLIST_MSG) {
123                                 //CkPrintf("in objlist..\n");
124         int nobjs;
125         LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
126         int senderID = stats->getHash(cdata.sender);
127         for (j=0; j<nobjs; j++) {
128            int recverID = stats->getHash(objs[j]);
129            if((senderID == -1)||(recverID == -1))
130               if (_lb_args.migObjOnly()) continue;
131               else CkAbort("Error in search\n");
132            comm[senderID][recverID] += cdata.messages;
133            comm[recverID][senderID] += cdata.messages;
134         }
135                         }
136                 }
137
138 // ignore messages sent from an object to itself
139   for (i=0; i<numobjs; i++)
140     comm[i][i] = 0;
141
142   // construct the graph in CSR format
143   int *xadj = new int[numobjs+1];
144   int numedges = 0;
145   for(i=0;i<numobjs;i++) {
146     for(j=0;j<numobjs;j++) {
147       if(comm[i][j] != 0)
148         numedges++;
149     }
150   }
151   int *adjncy = new int[numedges];
152   int *edgewt = new int[numedges];
153         int factor = 10;
154   xadj[0] = 0;
155   int count4all = 0;
156   for (i=0; i<numobjs; i++) {
157     for (j=0; j<numobjs; j++) { 
158       if (comm[i][j] != 0) { 
159         adjncy[count4all] = j;
160         edgewt[count4all++] = comm[i][j]/factor;
161       }
162     }
163     xadj[i+1] = count4all;
164   }
165
166   //Call METIS routine
167   int wgtflag = 3; // Weights both on vertices and edges
168   int numflag = 0; // C Style numbering
169   int options[5];
170   int edgecut;
171   options[0] = 0;
172
173   if (count < 1) {
174     CkPrintf("error: Number of Pe less than 1!");
175   }
176   else if (count == 1) {
177         for(m=0;m<numobjs;m++) 
178                         newmap[i] = origmap[i];
179   }
180   else {
181                 /*
182         if (count > 8)
183                         METIS_PartGraphKway(&numobjs, xadj, adjncy, objwt, edgewt, 
184                             &wgtflag, &numflag, &count, options, 
185                             &edgecut, newmap);
186           else
187                         METIS_PartGraphRecursive(&numobjs, xadj, adjncy, objwt, edgewt, 
188                                  &wgtflag, &numflag, &count, options, 
189                                  &edgecut, newmap);
190                 */
191                 METIS_PartGraphRecursive(&numobjs, xadj, adjncy, objwt, edgewt,
192                                  &wgtflag, &numflag, &count, options,
193                                  &edgecut, newmap);
194   }
195          
196  
197   //Debugging code: Checking load on each partition
198   if(_lb_args.debug() >=2){
199           int total=0;
200           int *chkwt = new int[count];
201           for(i=0;i<count;i++)
202                   chkwt[i]=0;
203           for(i=0;i<numobjs;i++){
204                 chkwt[newmap[i]] += objwt[i];
205                   total += objwt[i];
206           }
207           for(i=0;i<count;i++)
208                   CkPrintf("%d -- %d\n",i,chkwt[i]);
209           CkPrintf("Totalwt of all partitions after call to METIS:%d, Avg is %d\n",total,total/count);
210   }
211
212   //Clean up all the variables allocated in this routine
213   for(i=0;i<numobjs;i++)
214     delete[] comm[i];
215   delete[] comm;
216   delete[] objtime;
217   delete[] xadj;
218   delete[] adjncy;
219   delete[] objwt;
220   delete[] edgewt;
221         delete[] handles;
222   delete[] origmap;
223
224 }
225
226 int TopoCentLB::findMaxObjs(int *map,int totalobjs,int count)
227 {
228         int *max_num = new int[count];
229         int i;
230         int maxobjs=0;
231         
232         for(i=0;i<count;i++)
233                 max_num[i]=0;
234                 
235         for(i=0;i<totalobjs;i++)
236                 max_num[map[i]]++;
237         
238         for(i=0;i<count;i++)
239                 if(max_num[i]>maxobjs)
240                         maxobjs = max_num[i];
241         
242         delete[] max_num;
243
244         return maxobjs;
245 }
246
247 void TopoCentLB::Heapify(HeapNode *heap, int node, int heapSize)
248 {
249   int left = 2*node+1;
250   int right = 2*node+2;
251   int xchange;
252         
253   if (left < heapSize && (heap[left].key > heap[node].key))
254     xchange = left;
255   else 
256                 xchange = node;
257   
258   if (right < heapSize && (heap[right].key > heap[xchange].key))
259     xchange = right;
260
261   if (xchange != node) {
262     HeapNode tmp;
263     tmp = heap[node];
264     heap[node] = heap[xchange];
265     heap[xchange] = tmp;
266                 heapMapping[heap[node].node]=node;
267                 heapMapping[heap[xchange].node]=xchange;
268     Heapify(heap,xchange,heapSize);
269   }    
270 }
271
272
273 TopoCentLB::HeapNode TopoCentLB::extractMax(HeapNode *heap,int *heapSize){
274
275         if(*heapSize < 1)
276                 CmiAbort("Empty Heap passed to extractMin!\n");
277
278         HeapNode max = heap[0];
279         heap[0] = heap[*heapSize-1];
280         heapMapping[heap[0].node]=0;
281         *heapSize = *heapSize - 1;
282         Heapify(heap,0,*heapSize);
283         return max;
284 }
285
286 void TopoCentLB::BuildHeap(HeapNode *heap,int heapSize){
287         for(int i=heapSize/2; i >= 0; i--)
288                 Heapify(heap,i,heapSize);
289 }
290
291 void TopoCentLB :: increaseKey(HeapNode *heap,int i,double wt){
292         if(wt != -1.00){
293                 #ifdef MAX_EDGE
294                         if(wt>heap[i].key)
295                                 heap[i].key = wt;
296                 #else
297                         heap[i].key += wt;
298                 #endif
299         }
300         int parent = (i-1)/2;
301         
302         if(heap[parent].key >= heap[i].key)
303                 return;
304         else {
305                 HeapNode tmp = heap[parent];
306                 heap[parent] = heap[i];
307                 heap[i] = tmp;
308                 heapMapping[heap[parent].node]=parent;
309                 heapMapping[heap[i].node]=i;
310                 increaseKey(heap,parent,-1.00);
311         }
312 }
313
314 /*This routine implements the algorithm used to produce the partition-processor mapping*/
315 /*The algorithm uses an idea similar to the standard MST algorithm*/
316 void TopoCentLB :: calculateMST(PartGraph *partgraph,LBTopology *topo,int *proc_mapping,int max_comm_part) {
317
318   int *inHeap;
319   double *keys;
320   int count = partgraph->n_nodes;
321   int i=0,j=0;
322
323   //Arrays needed for keeping information
324   inHeap = new int[partgraph->n_nodes];
325   keys = new double[partgraph->n_nodes];
326
327   int *assigned_procs = new int[count];
328
329   hopCount = new double*[count];
330   for(i=0;i<count;i++){
331     proc_mapping[i]=-1;
332     assigned_procs[i]=0;
333     hopCount[i] = new double[count];
334     for(j=0;j<count;j++)
335       hopCount[i][j] = 0;
336   }
337
338   //Call a topology routine to fill up hopCount
339   topo->get_pairwise_hop_count(hopCount);
340         
341   int max_neighbors = topo->max_neighbors();
342         
343   HeapNode *heap = new HeapNode[partgraph->n_nodes];
344   heapMapping = new int[partgraph->n_nodes];
345         
346   int heapSize = 0;
347
348   for(i=0;i<partgraph->n_nodes;i++){
349     heap[i].key = 0.00;
350     heap[i].node = i;
351     keys[i] = 0.00;
352     inHeap[i] = 1;
353     heapMapping[i]=i;
354   }
355
356   //Assign the max comm partition first
357   heap[max_comm_part].key = 1.00;
358         
359   heapSize = partgraph->n_nodes;
360   BuildHeap(heap,heapSize);
361
362   int k=0,comm_cnt=0,m=0;
363   int *commParts = new int[partgraph->n_nodes];
364         
365   //srand(count);
366
367   while(heapSize > 0){
368
369     /****Phase1: Extracting appropriate partition from heap****/
370
371     HeapNode max = extractMax(heap,&heapSize);
372     inHeap[max.node] = 0;
373
374     for(i=0;i<partgraph->n_nodes;i++){
375       commParts[i]=-1;
376       PartGraph::Edge wt = partgraph->edges[max.node][i];
377       if(wt == 0)
378         continue;
379       if(inHeap[i]){
380 #ifdef MAX_EDGE
381         if(wt>keys[i])
382           keys[i]=wt;
383 #else
384         keys[i] += wt;
385 #endif
386         /*This part has been COMMENTED out for optimizing the code: we handle the updation using heapMapping*/
387         /*array instead of searching for node in the heap everytime*/
388
389         //Update in the heap too
390         //First, find where this node is..in the heap
391         /*for(j=0;j<heapSize;j++)
392           if(heap[j].node == i)
393           break;
394           if(j==heapSize)
395           CmiAbort("Some error in heap...\n");*/
396         increaseKey(heap,heapMapping[i],wt);
397       }
398     }
399                  
400     /*Phase2: ASSIGNING partition to processor*/
401                 
402     //Special case
403     if(heapSize == partgraph->n_nodes-1){ //Assign max comm partition to 0th proc in the topology
404       proc_mapping[max.node]=0;
405       assigned_procs[0]=1;
406       continue;
407     }
408                 
409     m=0;
410
411     comm_cnt=0;
412
413     double min_cost=-1;
414     int min_cost_index=-1;
415     double cost=0;
416     int p=0;
417     //int q=0;
418
419     for(k=0;k<partgraph->n_nodes;k++){
420       if(!inHeap[k] && partgraph->edges[k][max.node]){
421         commParts[comm_cnt]=k;
422         comm_cnt++;
423       }
424     }
425
426     //Optimized the loop by commenting out the get_hop_count code and getting all the hop counts initially
427     for(m=0;m<count;m++){
428       if(!assigned_procs[m]){
429         cost=0;
430         for(p=0;p<comm_cnt;p++){
431           //if(!hopCount[proc_mapping[commParts[p]]][m])
432           //hopCount[proc_mapping[commParts[p]]][m]=topo->get_hop_count(proc_mapping[commParts[p]],m);
433           cost += hopCount[proc_mapping[commParts[p]]][m]*partgraph->edges[commParts[p]][max.node];
434         }
435         if(min_cost==-1 || cost<min_cost){
436           min_cost=cost;
437           min_cost_index=m;
438         }
439       }
440     }
441
442     proc_mapping[max.node]=min_cost_index;
443     assigned_procs[min_cost_index]=1;
444   }
445
446   //clear up memory
447   delete[] inHeap;
448   delete[] keys;
449   delete[] assigned_procs;
450   delete[] heap;
451   delete[] commParts;
452 }
453
454
455 void TopoCentLB :: work(LDStats *stats)
456 {
457   int proc;
458   int i,j;
459   int n_pes = stats->nprocs();
460         
461   if (_lb_args.debug() >= 2) {
462     CkPrintf("In TopoCentLB Strategy...\n");
463   }
464   
465   // Make sure that there is at least one available processor.
466   for (proc = 0; proc < n_pes; proc++) {
467     if (stats->procs[proc].available) {
468       break;
469     }
470   }
471
472   if (proc == n_pes) {
473     CmiAbort ("TopoCentLB: no available processors!");
474   }
475
476   
477   removeNonMigratable(stats, n_pes);
478   int *newmap = new int[stats->n_objs];
479
480
481   if(make_mapping)
482     computePartitions(stats, n_pes, newmap);
483   else {
484     //mapping taken from previous algo
485     for(i=0;i<stats->n_objs;i++) {
486       newmap[i]=stats->from_proc[i];
487     }
488   }
489
490   //Debugging Code
491   if(_lb_args.debug() >=2){
492     CkPrintf("Map obtained from partitioning:\n");
493     for(i=0;i<stats->n_objs;i++)
494       CkPrintf(" %d,%d ",i,newmap[i]);
495   }
496
497   int max_objs = findMaxObjs(newmap,stats->n_objs, n_pes);
498         
499   partgraph = new PartGraph(n_pes, max_objs);
500
501   //Fill up the partition graph - first fill the nodes and then, the edges
502
503   for(i=0;i<stats->n_objs;i++)
504     {
505       PartGraph::Node* n = &partgraph->nodes[newmap[i]];
506       n->obj_list[n->num_objs]=i;
507       n->num_objs++;
508     }
509
510   int *addedComm=new int[n_pes];
511   
512   stats->makeCommHash();
513   
514   int max_comm_part=-1;
515         
516   double max_comm=0;
517
518   //Try putting random amount of communication on the partition graph edges to see if things work fine
519   //This also checks the running time of the algorithm since number of edges is high than in a practical scenario
520 #ifdef RAND_COMM
521   for(i = 0; i < n_pes; i++) {
522     for(j = i+1; j < n_pes; j++) {
523       int val;
524       if(rand()%5==0)
525         val=0;
526       else
527         val= rand()%1000;
528                                 
529       partgraph->edges[i][j] = val;
530       partgraph->edges[j][i] = val;
531                         
532       partgraph->nodes[i].comm += val;
533       partgraph->nodes[j].comm += val;
534                         
535       if(partgraph->nodes[i].comm > max_comm){
536         max_comm = partgraph->nodes[i].comm;
537         max_comm_part = i;
538       }
539       if(partgraph->nodes[j].comm > max_comm){
540         max_comm = partgraph->nodes[j].comm;
541         max_comm_part = j;
542       }
543     }
544   }
545 #else
546   //Adding communication to the partition graph edges
547   for(i=0;i<stats->n_comm;i++)
548     {
549       //DO I consider other comm too....i.e. to or from a processor
550       LDCommData &cdata = stats->commData[i];
551       if(!cdata.from_proc() && cdata.receiver.get_type() == LD_OBJ_MSG){
552         int senderID = stats->getHash(cdata.sender);
553         int recverID = stats->getHash(cdata.receiver.get_destObj());
554         CmiAssert(senderID < stats->n_objs);
555         CmiAssert(recverID < stats->n_objs);
556                 
557         if(newmap[senderID]==newmap[recverID])
558           continue;
559         
560         if(partgraph->edges[newmap[senderID]][newmap[recverID]] == 0){
561           partgraph->nodes[newmap[senderID]].degree++;
562           partgraph->nodes[newmap[recverID]].degree++;
563         }
564                 
565         partgraph->edges[newmap[senderID]][newmap[recverID]] += cdata.bytes;
566         partgraph->edges[newmap[recverID]][newmap[senderID]] += cdata.bytes;
567                         
568         partgraph->nodes[newmap[senderID]].comm += cdata.bytes;
569         partgraph->nodes[newmap[recverID]].comm += cdata.bytes;
570
571         //Keeping track of maximum communiacting partition
572         if(partgraph->nodes[newmap[senderID]].comm > max_comm){
573           max_comm = partgraph->nodes[newmap[senderID]].comm;
574           max_comm_part = newmap[senderID];
575         }
576         if(partgraph->nodes[newmap[recverID]].comm > max_comm){
577           max_comm = partgraph->nodes[newmap[recverID]].comm;
578           max_comm_part = newmap[recverID];
579         }
580       }
581       else if(cdata.receiver.get_type() == LD_OBJLIST_MSG) {
582         int nobjs;
583         LDObjKey *objs = cdata.receiver.get_destObjs(nobjs);
584         int senderID = stats->getHash(cdata.sender);
585         for(j = 0; j < n_pes; j++)
586           addedComm[j]=0;
587         for (j=0; j<nobjs; j++) {
588           int recverID = stats->getHash(objs[j]);
589           if((senderID == -1)||(recverID == -1))
590             if (_lb_args.migObjOnly()) continue;
591             else CkAbort("Error in search\n");
592                                         
593           if(newmap[senderID]==newmap[recverID])
594             continue;
595         
596           if(partgraph->edges[newmap[senderID]][newmap[recverID]] == 0){
597             partgraph->nodes[newmap[senderID]].degree++;
598             partgraph->nodes[newmap[recverID]].degree++;
599           }
600
601           //Communication added only once for a message sent to many objects on a single processor
602           if(!addedComm[newmap[recverID]]){
603             partgraph->edges[newmap[senderID]][newmap[recverID]] += cdata.bytes;
604             partgraph->edges[newmap[recverID]][newmap[senderID]] += cdata.bytes;
605         
606             partgraph->nodes[newmap[senderID]].comm += cdata.bytes;
607             partgraph->nodes[newmap[recverID]].comm += cdata.bytes;
608
609             if(partgraph->nodes[newmap[senderID]].comm > max_comm){
610               max_comm = partgraph->nodes[newmap[senderID]].comm;
611               max_comm_part = newmap[senderID];
612             }
613             if(partgraph->nodes[newmap[recverID]].comm > max_comm){
614               max_comm = partgraph->nodes[newmap[recverID]].comm;
615               max_comm_part = newmap[recverID];
616             }
617             //bytesComm[newmap[senderID]][newmap[recverID]] += cdata.bytes;
618             //bytesComm[newmap[recverID]][newmap[senderID]] += cdata.bytes;
619             addedComm[newmap[recverID]]=1;
620           }
621         }
622       }
623
624     }
625 #endif
626         
627   int *proc_mapping = new int[n_pes];
628         
629   delete [] addedComm;
630                 
631   LBtopoFn topofn;
632
633   //Parsing the command line input for getting the processor topology
634   char *lbcopy = strdup(_lbtopo);
635   char *ptr = strchr(lbcopy, ':');
636   if (ptr!=NULL)
637     ptr = strtok(lbcopy, ":");
638   else
639     ptr=lbcopy;
640
641   topofn = LBTopoLookup(ptr);
642   if (topofn == NULL) {
643     char str[1024];
644     CmiPrintf("TopoCentLB> Fatal error: Unknown topology: %s. Choose from:\n", ptr);
645     printoutTopo();
646     sprintf(str, "TopoCentLB> Fatal error: Unknown topology: %s", ptr);
647     CmiAbort(str);
648   }
649   
650   topo = topofn(n_pes);
651
652   //Call the core routine to produce the partition processor mapping
653   calculateMST(partgraph,topo,proc_mapping,max_comm_part);
654   //Returned partition graph is a Maximum Spanning Tree -- converted in above function itself
655
656   //Debugging code: Result of mapping partition graph onto processor graph
657   if (_lb_args.debug()>1) {
658     CkPrintf("Resultant mapping..(partition,processor)\n");
659     for(i = 0; i < n_pes; i++)
660       CkPrintf("%d,%d\n",i,proc_mapping[i]);
661   }
662
663   //Store the result in the load balancing database
664   int pe;
665   PartGraph::Node* n;
666   for(i = 0; i < n_pes; i++){
667     pe = proc_mapping[i];
668     n = &partgraph->nodes[i];
669     for(j=0;j<n->num_objs;j++){
670       stats->to_proc[n->obj_list[j]] = pe;
671       if (_lb_args.debug()>1) 
672         CkPrintf("[%d] Obj %d migrating from %d to %d\n", CkMyPe(),n->obj_list[j],stats->from_proc[n->obj_list[j]],pe);
673     }
674   }
675
676   delete[] newmap;
677   delete[] proc_mapping;
678   //Delete hopCount
679   for(i = 0; i < n_pes; i++)
680     delete[] hopCount[i];
681
682   delete[] hopCount;
683   delete[] heapMapping;
684         
685   delete partgraph;
686 }
687
688 #include "TopoCentLB.def.h"