a31f532bd4c38d8b45680a99acda6e9df4070268
[charm.git] / tests / charm++ / load_balancing / lb_test / lb_test.C
1 /** \file lb_test.C
2  *  Load-balancing test program:
3  *  Orion Sky Lawlor, 1999/10/19
4  *
5  *  Added more complex comm patterns
6  *  Robert Brunner, 1999/11/03
7  *
8  *  Updated by Gengbin Zheng
9  *
10  *  Cleaned up to be up to date with current load balancing framework
11  *  Abhinav Bhatele, 2010/11/26
12  */
13
14 #include <stdio.h>
15 #include <math.h>
16 #include "charm++.h"
17 #include "Topo.h"
18
19 #include "lb_test.decl.h"
20
21 #if defined(_WIN32)
22 #define strcasecmp stricmp
23 #endif
24
25 /*readonly*/ CProxy_main mainProxy;
26 /*readonly*/ CkGroupID topoid;
27 /*readonly*/ CProxy_Lb_array lbproxy;
28 /*readonly*/ int element_count;
29 /*readonly*/ int step_count, print_count;
30 /*readonly*/ int min_us, max_us;
31 /*readonly*/ int n_loadbalance;
32
33 #define N_LOADBALANCE 500 /*Times around ring until we load balance*/
34 #define DEBUGF(x)       // CmiPrintf x
35
36 #define DYNAMIC_IMBALANCE 1
37
38 int specialTracing = 0;
39
40 void initialize()
41 {
42   if (traceIsOn() == 0) {
43     if (CkMyPe() == 0)
44     CkPrintf("traceprojections was off at initial time.\n");
45     specialTracing = 1;
46   }
47 }
48
49 class HiMsg : public CMessage_HiMsg {
50 public:
51   int length;
52   int chksum;
53   int refnum;
54   char* data;
55 };
56
57 class main : public CBase_main {
58 public:
59   main(CkMigrateMessage *m) {}
60   main(CkArgMsg* m);
61
62   void maindone(void) {
63     CkPrintf("All done\n");
64     CkExit();
65   };
66         void resume(void){
67                 CkPrintf("Resuming...\n");
68                 lbproxy.ForwardMessages();
69         };
70
71
72 private:
73   void arg_error(char* argv0);
74 };
75
76 static void programBegin(void *dummy,int size,void *data)
77 {
78   // Start computing
79   lbproxy.ForwardMessages();
80 }
81
82 main::main(CkArgMsg *m) 
83 {
84   char *topology;       // String name for communication topology
85   int cur_arg = 1;
86
87   if (m->argc > cur_arg)
88     element_count=atoi(m->argv[cur_arg++]);
89   else arg_error(m->argv[0]);
90
91   if (m->argc > cur_arg)
92     step_count=atoi(m->argv[cur_arg++]);
93   else arg_error(m->argv[0]);
94   
95   if (m->argc > cur_arg)
96     print_count=atoi(m->argv[cur_arg++]);
97   else arg_error(m->argv[0]);
98   
99   if (m->argc > cur_arg)
100     n_loadbalance=atoi(m->argv[cur_arg++]);
101   else arg_error(m->argv[0]);
102
103   if (m->argc > cur_arg)
104     min_us=atoi(m->argv[cur_arg++]);
105   else arg_error(m->argv[0]);
106
107   if (m->argc > cur_arg)
108     max_us=atoi(m->argv[cur_arg++]);
109   else arg_error(m->argv[0]);
110
111   if (m->argc > cur_arg)
112     topology=m->argv[cur_arg++];
113   else arg_error(m->argv[0]);
114
115   CkPrintf("Running lb_test on %d processors with %d elements\n", CkNumPes(), element_count);
116   CkPrintf("Print every %d steps\n", print_count);
117   CkPrintf("Sync every %d steps\n", n_loadbalance);
118   CkPrintf("First node busywaits %d usec; last node busywaits %d usec\n\n", min_us, max_us);
119
120   mainProxy = thisProxy;
121
122   topoid = Topo::Create(element_count,topology,min_us,max_us);
123   if (topoid.isZero())
124     CkAbort("ERROR! Topology not found!  \n");
125
126         // TODO: this code looks wrong, since reduction client is set AFTER array creation,
127         // which, according to Charm++ manual, should be done BEFORE array is created
128   lbproxy = CProxy_Lb_array::ckNew(element_count);
129   lbproxy.setReductionClient(programBegin, NULL);
130 }
131
132 void main::arg_error(char* argv0)
133 {
134   CkPrintf("Usage: %s \n"
135     "<elements> <steps> <print-freq> <lb-freq> <min-dur us> <max-dur us>\n"
136     "<topology>\n", argv0);
137
138   int topoNo = 0;
139   CkPrintf("<topology> is the object connection topology:\n");
140   while (TopoTable[topoNo].name) {
141     CkPrintf("  %s\n",TopoTable[topoNo].desc);
142     topoNo++;
143   }
144
145   CmiPrintf("\n"
146            "The program creates a ring of element_count array elements,\n"
147            "which all compute and send to their neighbor.\n"
148            "Computation proceeds across the entire ring simultaniously.\n"
149            "Orion Sky Lawlor, olawlor@acm.org, PPL, 10/14/1999\n");
150   CmiAbort("Abort!");
151 }
152
153 class Lb_array : public CBase_Lb_array {
154 public:
155   Lb_array(void) {
156     // CkPrintf("Element %d created\n", thisIndex);
157
158     // Find out who to send to, and how many to receive
159     TopoMap = CProxy_Topo::ckLocalBranch(topoid);
160     send_count = TopoMap->SendCount(thisIndex);
161     send_to = new Topo::MsgInfo[send_count];
162     TopoMap->SendTo(thisIndex,send_to);
163     recv_count = TopoMap->RecvCount(thisIndex)+1;
164     
165     // Benchmark the work function
166     work_per_sec = CalibrateWork();
167
168     // Create massive load imbalance by making load
169     // linear in processor number.
170     usec = (int)TopoMap->Work(thisIndex);
171     DEBUGF(("Element %d working for %d ms\n",thisIndex,usec));
172
173     // msec=meanms+(devms-meanms)*thisIndex/(element_count-1);
174
175     // Initialize some more variables
176     nTimes=0;
177     sendTime=0;
178     // lastTime=CmiWallTimer();
179     n_received = 0;
180     resumed = 1;
181     busywork = (int)(usec*1e-6*work_per_sec);
182     
183     int i;
184     for(i=0; i < future_bufsz; i++)
185       future_receives[i]=0;
186         
187     usesAtSync=CmiTrue;
188
189     contribute(sizeof(i), &i, CkReduction::sum_int);
190   }
191
192   //Packing/migration utilities
193   Lb_array(CkMigrateMessage *m) {
194     DEBUGF(("Migrated element %d to processor %d\n",thisIndex,CkMyPe()));
195     TopoMap = CProxy_Topo::ckLocalBranch(topoid);
196     // Find out who to send to, and how many to receive
197     send_count = TopoMap->SendCount(thisIndex);
198     send_to = new Topo::MsgInfo[send_count];
199     TopoMap->SendTo(thisIndex,send_to);
200     recv_count = TopoMap->RecvCount(thisIndex)+1;
201     resumed = 0;
202     lastTime = CmiWallTimer();
203   }
204
205   virtual void pup(PUP::er &p)
206   {
207      ArrayElement1D::pup(p);            // pack our superclass
208      p(nTimes);
209      p(sendTime);
210      p(usec);
211      // p(lastTime);
212      p(work_per_sec);
213      p(busywork);
214      p(n_received);
215      p(future_receives,future_bufsz);
216
217     if(p.isSizing()) {
218       PUP::sizer *sizep = (PUP::sizer *)&p;
219       int pupsize = sizep->size();
220       // if(thisIndex == 0)
221       // CkPrintf("PUP::sizer shows a size of %d bytes\n", pupsize);
222     }
223   }
224
225   void Compute(HiMsg *m) { 
226     // Perform computation
227     if (m->refnum > nTimes) {
228       // CkPrintf("[%d] Future message received %d %d\n", thisIndex,nTimes,m->refnum);
229       int future_indx = m->refnum - nTimes - 1;
230       if (future_indx >= future_bufsz) {
231         CkPrintf("[%d] future_indx is too far in the future %d, expecting %d, got %d\n",
232                  thisIndex,future_indx,nTimes,m->refnum);
233         thisProxy[thisIndex].Compute(m);
234       } else {
235         future_receives[future_indx]++;
236         delete m;
237       }
238     } else if (m->refnum < nTimes) {
239       CkPrintf("[%d] Stale message received %d %d\n",
240                thisIndex,nTimes,m->refnum);
241       delete m;
242     } else {
243       n_received++;
244
245       // CkPrintf("[%d] %d n_received=%d of %d\n",
246       //               CkMyPe(),thisIndex,n_received,recv_count);
247       if (n_received == recv_count) {
248         // CkPrintf("[%d] %d computing %d\n",CkMyPe(),thisIndex,nTimes);
249
250         if (nTimes && (nTimes % print_count == 0) ) {
251           // Write out the current time
252           if (thisIndex == 1) {
253             double now = CmiWallTimer();
254             CkPrintf("TIME PER STEP\t%d\t%lf\t%lf\n", nTimes, now, now-lastTime);
255             lastTime = now;
256           }
257         }
258
259         n_received = future_receives[0];
260
261         // Move all the future_receives down one slot
262         int i;
263         for(i=1;i<future_bufsz;i++)
264           future_receives[i-1] = future_receives[i];
265         future_receives[future_bufsz-1] = 0;
266
267         nTimes++;//Increment our "times around" 
268
269         double startTime = CmiWallTimer();
270         // First check contents of message
271         //     int chksum = 0;
272         //     for(int i=0; i < m->length; i++)
273         //       chksum += m->data[i];
274       
275         //     if (chksum != m->chksum)
276         //       CkPrintf("Checksum mismatch! %d %d\n",chksum,m->chksum);
277
278         //Do Computation:
279         work(busywork,&result);
280         
281         int loadbalancing = 0;
282         if (nTimes == step_count) {
283           // We're done-- send a message to main telling it to die
284           CkCallback cb(CkIndex_main::maindone(), mainProxy);
285           contribute(0, NULL, CkReduction::sum_int, cb);
286         } else if (nTimes % n_loadbalance == 0) {
287           if (specialTracing) {
288             if (nTimes/n_loadbalance == 1) traceBegin();
289             if (nTimes/n_loadbalance == 3) traceEnd();
290           }
291           // We're not done yet...
292           // Either load balance, or send a message to the next guy
293           DEBUGF(("Element %d AtSync on PE %d\n",thisIndex,CkMyPe()));
294 #if 0
295           AtSync();
296 #else
297           CkCallback cb(CkIndex_Lb_array::pause(), thisProxy);
298           contribute(0, NULL, CkReduction::sum_int, cb);
299 #endif
300           loadbalancing = 1;
301         } 
302 #if DYNAMIC_IMBALANCE
303         else if(nTimes > n_loadbalance && (nTimes-(n_loadbalance/2)) % n_loadbalance == 0) {
304                 printf("Here at %d\n",nTimes);
305                 contribute(CkCallback(CkIndex_Topo::shuffleLoad(),topoid));
306         } 
307 #endif
308         else ForwardMessages();
309       }
310       delete m;
311     }
312   }
313
314   void ResumeFromSync(void) { //Called by Load-balancing framework
315 #if 0
316     resumed = 1;
317     DEBUGF(("Element %d resumeFromSync on PE %d\n",thisIndex,CkMyPe()));
318     thisProxy[thisIndex].ForwardMessages();
319 #else
320     CkCallback cb(CkIndex_Lb_array::restart(), thisProxy);
321     contribute(0, NULL, CkReduction::sum_int, cb);
322 #endif
323   }
324
325   void pause() {
326     AtSync();
327   }
328
329   void restart() {
330     resumed = 1;
331     lastTime = CmiWallTimer();
332     DEBUGF(("Element %d resumeFromSync on PE %d\n",thisIndex,CkMyPe()));
333     thisProxy[thisIndex].ForwardMessages();
334   }
335
336   void ForwardMessages(void) { // Pass it on
337     if(sendTime == 0) lastTime = CmiWallTimer();
338
339     if (resumed != 1)
340       CkPrintf("[%d] %d forwarding %d %d %d\n",CkMyPe(),thisIndex,
341                sendTime,nTimes,resumed);
342     for(int s=0; s < send_count; s++) {
343       int msgbytes = send_to[s].bytes;
344       if (msgbytes != 1000)
345         CkPrintf("[%d] %d forwarding %d bytes (%d,%d,%p) obj %p to %d\n",
346                  CkMyPe(),thisIndex,msgbytes,s,send_count,send_to,
347                  this,send_to[s].obj);
348       HiMsg* msg = new(msgbytes,0) HiMsg;
349       msg->length = msgbytes;
350       //      msg->chksum = 0;
351       //      for(int i=0; i < msgbytes; i++) {
352       //        msg->data[i] = i;
353       //        msg->chksum += msg->data[i];
354       //      }
355       msg->refnum = sendTime;
356
357       //      CkPrintf("[%d] %d sending to %d at %d:%d\n",
358       //               CkMyPe(),thisIndex,send_to[s].obj,nTimes,nCycles);
359       thisProxy[send_to[s].obj].Compute(msg);
360     }
361     int mybytes=1;
362     HiMsg* msg = new(mybytes,0) HiMsg;
363     msg->length = mybytes;
364     msg->refnum = sendTime;
365     thisProxy[thisIndex].Compute(msg);
366
367     sendTime++;
368   }
369
370 private:
371   int CalibrateWork() {
372     static int calibrated=-1;
373
374     if (calibrated != -1) return calibrated;
375     const double calTime=0.05; //Time to spend in calibration
376     double wps = 0;
377     // First, count how many iterations for 1 second.
378     // Since we are doing lots of function calls, this will be rough
379     const double end_time = CmiWallTimer()+calTime;
380     wps = 0;
381     while(CmiWallTimer() < end_time) {
382       work(100,&result);
383       wps+=100;
384     }
385
386     // Now we have a rough idea of how many iterations there are per
387     // second, so just perform a few cycles of correction by
388     // running for what we think is 1 second.  Then correct
389     // the number of iterations per second to make it closer
390     // to the correct value
391
392 #if 0
393     CkPrintf("[%d] Iter  %.0f per second\n",CmiMyPe(), wps);
394 #endif
395     for(int i=0; i < 2; i++) {
396       const double start_time = CmiWallTimer();
397       work((int)wps,&result);
398       const double end_time = CmiWallTimer();
399       const double correction = calTime / (end_time-start_time);
400       wps *= correction;
401 #if 0
402       CkPrintf("Iter %d -> %.0f per second\n",i,wps);
403 #endif
404     }
405     calibrated = (int)(wps/calTime);
406     if (CkMyPe() == 0) CkPrintf("calibrated iterations %d\n",calibrated);
407     return calibrated;
408   };
409
410   void work(int iter_block,int* _result) {
411     *_result=0;
412     for(int i=0; i < iter_block; i++) {
413       *_result=(int)(sqrt(1+cos(*_result*1.57)));
414     }
415   };
416
417 public:
418   enum { future_bufsz = 50 };
419
420 private:
421   int nTimes;           // Number of times I've been called
422   int sendTime;         // Step number for sending (in case I finish receiving
423                         // before sending
424   int usec;             // Milliseconds to "compute"
425   double lastTime;      // Last time recorded
426   int work_per_sec;
427   int busywork;
428   int result;
429
430   Topo* TopoMap;
431   int send_count;
432   int recv_count;
433   Topo::MsgInfo* send_to;
434   int n_received;
435   int future_receives[future_bufsz];
436   int resumed;
437 };
438
439 #include "lb_test.def.h"
440