Fixing a race condition that occurs when redistributing the data. A new assertion...
[charm.git] / src / ck-cp / arrayRedistributor.h
1 /** 
2
3     A system for exposing application and runtime "control points" 
4     to the dynamic optimization framework.
5
6 */
7 #ifndef __ARRAYREDISTRIBUTOR_H__
8 #define __ARRAYREDISTRIBUTOR_H__
9
10 #include <vector>
11 #include <list>
12 #include <map>
13 #include <cmath>
14 #include "ControlPoints.decl.h"
15
16 #include<pup_stl.h>
17
18
19
20 /**
21  * \addtogroup ControlPointFramework
22  *   @{
23  */
24
25
26 /// A message containing a chunk of a data array used when redistributing to a different set of active chares 
27 class redistributor2DMsg : public CMessage_redistributor2DMsg { 
28  public: 
29   int top;         
30   int left; 
31   int height;      
32   int width; 
33   int new_chare_cols; 
34   int  new_chare_rows; 
35   int which_array; 
36   double *data;
37 }; 
38  
39
40
41 /// Integer Maximum
42 static int maxi(int a, int b){
43   if(a>b)
44     return a;
45   else
46     return b;
47 }
48
49 /// Integer Minimum
50 static int mini(int a, int b){
51   if(a<b)
52     return a;
53   else
54     return b;
55 }
56
57
58 /// A chare group that can redistribute user data arrays. It is used by binding it to a user's Chare Array
59 class redistributor2D: public CBase_redistributor2D {
60  public:
61
62   std::map<int,double*> data_arrays;
63   std::map<int,int> data_arrays_sizes;
64
65   /// The array associated with this data redistribution
66   CProxyElement_ArrayElement associatedArray;
67   
68   int incoming_count;
69   std::map<int,double*> data_arrays_incoming;
70   std::map<int,int> data_arrays_incoming_sizes;
71
72   /// Is this array element active
73   bool thisElemActive;
74
75   bool resizeGranulesHasBeenCalled;
76
77   CkVec<redistributor2DMsg *> bufferedMsgs;
78
79  private:
80
81
82   void *fakeMemoryUsage;
83
84
85   CkCallback dataRedistributedCallback;
86
87   int x_chares; // number of active chares in x dimension
88   int y_chares; // number of active chares in y dimension
89
90   int data_width;  // The width of the global array, not the local piece
91   int data_height; // The height of the global array, not the local piece
92
93   int data_x_ghost; // The padding in the x dimension on each side of the data
94   int data_y_ghost; // The padding in the y dimension on each side of the data
95
96
97  public:
98
99   void pup(PUP::er &p) {
100     CBase_redistributor2D::pup(p);
101
102     p | data_arrays_sizes;
103     p | data_arrays_incoming_sizes;
104     p | incoming_count;
105     p | associatedArray;
106     p | thisElemActive;
107
108     p | dataRedistributedCallback;
109
110     p | resizeGranulesHasBeenCalled;
111
112     p | x_chares;
113     p | y_chares;
114     p | data_width;
115     p | data_height;
116     p | data_x_ghost;
117     p | data_y_ghost;
118
119
120     CkAssert(bufferedMsgs.size() == 0);
121
122     if(p.isPacking() && fakeMemoryUsage!=NULL)
123       free(fakeMemoryUsage);
124
125     fakeMemoryUsage = NULL;
126
127     ////////////////////////////////
128     // when packing, iterate through data_arrays
129     // when unpacking
130
131     {
132       std::map<int,int>::iterator iter;
133       for(iter = data_arrays_sizes.begin(); iter != data_arrays_sizes.end(); iter++){
134         int whichArray = iter->first;
135         int arraySize = iter->second;
136
137         //      CkPrintf("Pupping data array %d\n",whichArray);
138         p | whichArray;
139
140         if(p.isUnpacking())
141           data_arrays[whichArray] = new double[arraySize];
142
143         PUParray(p,data_arrays[whichArray] ,arraySize);
144         
145         if(p.isPacking())
146           delete[] data_arrays[whichArray];
147         
148       }
149     }
150
151
152     ///////////////////////////////
153     {
154       std::map<int,int>::iterator iter;
155       for(iter = data_arrays_incoming_sizes.begin(); iter != data_arrays_incoming_sizes.end(); iter++){
156         int whichArray = iter->first;
157         int arraySize = iter->second;
158
159         //      CkPrintf("Pupping incoming array %d\n",whichArray);
160         p | whichArray;
161
162         if(p.isUnpacking() && data_arrays_incoming_sizes[whichArray] > 0)
163           data_arrays_incoming[whichArray] = new double[arraySize];
164         
165         PUParray(p,data_arrays_incoming[whichArray],arraySize);
166         
167         if(p.isPacking())
168           delete[] data_arrays_incoming[whichArray];
169         
170       }
171     }
172
173     //    CkPrintf("pup redistributor2D\n");
174
175   } 
176
177
178   void ckJustMigrated(){
179     //   CkPrintf("redistributor element %02d %02d migrated to %d", thisIndex.x, thisIndex.y, CkMyPe());
180   }
181
182
183   // ------------ Some routines for computing the array bounds for this chare  ------------ 
184
185   // The index in the global array for my top row  
186   int top_data_idx();
187  
188   int bottom_data_idx();
189  
190   int left_data_idx();
191  
192   int right_data_idx();
193  
194   int top_neighbor();
195    
196   int bottom_neighbor();
197    
198   int left_neighbor();
199  
200   int right_neighbor();
201   
202   
203   /// the width of the non-ghost part of the local partition 
204   int mywidth();
205     
206     
207   // the height of the non-ghost part of the local partition 
208   int myheight();
209     
210
211
212   // ------------ Some routines for computing the array bounds for arbitrary chares  ------------ 
213
214   int top_data_idx(int y, int y_total){
215     return (data_height * y) / y_total;
216   }
217
218   int bottom_data_idx(int y, int y_total){
219     return ((data_height * (y+1)) / y_total) - 1;
220   }
221
222   int left_data_idx(int x, int x_total){
223     return (data_width * x) / x_total;
224   }
225
226   int right_data_idx(int x, int x_total){
227     return ((data_width * (x+1)) / x_total) - 1;
228   }
229
230
231   int top_data_idx(int y){
232     return (data_height * y) / y_chares;
233   }
234
235   int bottom_data_idx(int y){
236     return ((data_height * (y+1)) / y_chares) - 1;
237   }
238
239   int left_data_idx(int x){
240     return (data_width * x) / x_chares;
241   }
242
243   int right_data_idx(int x){
244     return ((data_width * (x+1)) / x_chares) - 1;
245   }
246
247   /// Return which chare array element(x index) owns the global data item i
248   int who_owns_idx_x(int i){
249     int w=0;
250     while(1){
251       if( i >= left_data_idx(w) && i <= right_data_idx(w) ){
252         return w;
253       }
254       w++;
255     }
256   }
257   
258   /// Return which chare array element(y index) owns the global data item i
259   int who_owns_idx_y(int i){
260     int w=0;
261     while(1){
262       if( i >= top_data_idx(w) && i <= bottom_data_idx(w) ){
263         return w;
264       }
265       w++;
266     }
267   }
268   
269
270
271
272   
273   // Convert a local column,row id (0 to mywidth()-1, 0 to myheight()-1) to the index in the padded array
274   int local_to_padded(int x, int y){
275     CkAssert(thisElemActive);
276     CkAssert(x < (mywidth()+data_x_ghost) && x >= (0-data_x_ghost) && y < (myheight()+data_y_ghost) && y >= (0-data_y_ghost) );
277     return (mywidth()+2*data_x_ghost)*(y+data_y_ghost)+x+data_x_ghost;
278   }
279
280   // get a data value
281   double data_local(int which, int x, int y){
282     CkAssert(local_to_padded(x,y) < data_arrays_sizes[which]);
283     return data_arrays[which][local_to_padded(x,y)];
284   }
285
286
287   // Convert a local column id (0 to mywidth-1) to the global column id (0 to data_width-1)
288   int local_to_global_x(int x){
289     return left_data_idx() + x;
290   }
291
292   // Convert a local row id (0 to myheight-1) to the global row id (0 to data_height-1)
293   int local_to_global_y(int y){
294     return top_data_idx() + y;
295   }
296
297   int global_array_width(){
298     return data_width;
299   }
300
301   int global_array_height(){
302     return data_height;
303   }
304
305   int global_array_size(){
306     return global_array_width() * global_array_height();
307   }
308
309   int my_array_width(){
310     return mywidth()+2*data_x_ghost;
311   }
312
313   int my_array_height(){
314     return myheight()+2*data_y_ghost;
315   }
316
317   // Total size of arrays including ghost layers
318   int my_array_size(){
319     return my_array_width() * my_array_height();
320   }
321
322   /// Create an array. If multiple arrays are needed, each should have its own index
323   template <typename t> t* createDataArray(int which=0) {
324     t* data = new t[my_array_size()];
325     data_arrays[which] = data;
326     data_arrays_sizes[which] = my_array_size();
327
328     if(thisIndex.x==0 && thisIndex.y==0)  
329       CkPrintf("data_arrays_sizes[which] set to %d\n", data_arrays_sizes[which] );  
330
331
332     CkAssert(data_arrays[which] != NULL);
333 #if DEBUG > 2
334     CkPrintf("Allocated array of size %d at %p\n", my_array_size(), data_arrays[which] );
335 #endif
336     return data;
337   }
338   
339   template <typename t> t* getDataArray(int which=0) {
340     return data_arrays[which]; 
341   }
342
343   /// Constructor takes in the dimensions of the array, including any desired ghost layers
344   /// The local part of the arrays will have (mywidth+x_ghosts*2)*(myheight+y_ghosts*2) elements 
345   void setInitialDimensions(int width, int height, int x_chares_, int y_chares_, int x_ghosts=0, int y_ghosts=0){
346     data_width = width;      // These values cannot change after this method is called.
347     data_height = height;
348     data_x_ghost = x_ghosts;
349     data_y_ghost = y_ghosts;
350     
351     setDimensions(x_chares_, y_chares_);
352   }
353   
354
355   void setDimensions( int x_chares_, int y_chares_){
356     x_chares = x_chares_;
357     y_chares = y_chares_;
358     
359     
360     if( thisIndex.x < x_chares && thisIndex.y < y_chares ){
361       thisElemActive = true;
362     } else {
363       thisElemActive = false;
364     }
365     
366   }
367
368
369   redistributor2D(){
370     incoming_count = 0;
371     fakeMemoryUsage = NULL;
372     CkAssert(bufferedMsgs.size() == 0);
373   }
374
375
376   redistributor2D(CkMigrateMessage*){
377     CkAssert(bufferedMsgs.size() == 0);
378   }
379
380
381   void startup(){
382 #if DEBUG > 3 
383    CkPrintf("redistributor 2D startup %03d,%03d\n", thisIndex.x, thisIndex.y);
384 #endif
385     contribute();
386   }
387   
388
389   void printArrays(){
390 #if DEBUG > 2
391     CkAssert(data_arrays.size()==2);
392     for(std::map<int,double*>::iterator diter = data_arrays.begin(); diter != data_arrays.end(); diter++){
393       int which_array = diter->first;
394       double *data = diter->second;
395       CkPrintf("%d,%d data_arrays[%d] = %p\n", thisIndex.x, thisIndex.y, which_array, data);
396     }
397 #endif
398   }
399
400   
401   // Called on all elements involved with the new granularity or containing part of the old data
402   void resizeGranules(int new_active_chare_cols, int new_active_chare_rows){
403
404 #if DEBUG>1
405     CkPrintf("Resize Granules called for elem %d,%d\n", thisIndex.x, thisIndex.y);      
406 #endif
407
408     resizeGranulesHasBeenCalled = true;
409
410     const bool previouslyActive = thisElemActive;
411     const int old_top = top_data_idx();
412     const int old_left = left_data_idx();
413     const int old_bottom = top_data_idx()+myheight()-1;
414     const int old_right = left_data_idx()+mywidth()-1;
415     const int old_myheight = myheight();
416     const int old_mywidth = mywidth();
417
418     setDimensions(new_active_chare_cols, new_active_chare_rows); // update dimensions & thisElemActive
419     
420     const int new_mywidth = mywidth();
421     const int new_myheight = myheight();
422
423     // Transpose Data
424     // Assume only one new owner of my data
425
426     if(previouslyActive){
427      
428       // Send all my data to any blocks that will need it
429
430       int newOwnerXmin = who_owns_idx_x(old_left);
431       int newOwnerXmax = who_owns_idx_x(old_right);
432       int newOwnerYmin = who_owns_idx_y(old_top);
433       int newOwnerYmax = who_owns_idx_y(old_bottom);
434
435       for(int newx=newOwnerXmin; newx<=newOwnerXmax; newx++){
436         for(int newy=newOwnerYmin; newy<=newOwnerYmax; newy++){
437           
438           // Determine overlapping region between my data and this destination
439 #if DEBUG > 2
440           CkPrintf("newy(%d)*new_myheight(%d)=%d, old_top=%d\n",newy,new_myheight,newy*new_myheight,old_top);
441 #endif
442           // global range for overlapping area
443           int global_top = maxi(top_data_idx(newy),old_top);
444           int global_left = maxi(left_data_idx(newx),old_left);
445           int global_bottom = mini(bottom_data_idx(newy),old_bottom);
446           int global_right = mini(right_data_idx(newx),old_right);
447           int w = global_right-global_left+1;
448           int h = global_bottom-global_top+1;
449          
450           CkAssert(w*h>0);
451
452           int x_offset = global_left - old_left;
453           int y_offset = global_top - old_top;
454
455 #if DEBUG > 2     
456           CkPrintf("w=%d h=%d x_offset=%d y_offset=%d\n", w, h, x_offset, y_offset);
457 #endif
458           
459           std::map<int,double*>::iterator diter;
460           for(diter =data_arrays.begin(); diter != data_arrays.end(); diter++){
461             
462             redistributor2DMsg* msg = new(w*h) redistributor2DMsg;  
463             //      CkPrintf("Created message msg %p\n", msg);  
464             
465             int which_array = diter->first;
466             double *t = diter->second;
467             int s = data_arrays_sizes[which_array];
468             
469             for(int j=0; j<h; j++){
470               for(int i=0; i<w; i++){           
471                 CkAssert(j*w+i < w*h);
472                 CkAssert((data_x_ghost*2+old_mywidth)*(j+y_offset+data_y_ghost)+(i+ x_offset+data_x_ghost) < s);
473                 msg->data[j*w+i] = t[(data_x_ghost*2+old_mywidth)*(j+y_offset+data_y_ghost)+(i+ x_offset+data_x_ghost)];
474               }
475             }
476             
477             msg->top = global_top;
478             msg->left = global_left;
479             msg->height = h;
480             msg->width = w;
481             msg->new_chare_cols = new_active_chare_cols;
482             msg->new_chare_rows = new_active_chare_rows; 
483             msg->which_array = which_array;
484
485             //      CkPrintf("Sending message msg %p\n", msg);      
486             thisProxy(newx, newy).receiveTransposeData(msg);
487             
488           }
489           
490         }
491         
492         
493       }
494     } 
495     
496     if(!thisElemActive){
497 #if DEBUG > 2
498       CkPrintf("Element %d,%d is no longer active\n", thisIndex.x, thisIndex.y);
499 #endif
500
501       // Free my arrays
502       for(std::map<int,double*>::iterator diter = data_arrays.begin(); diter != data_arrays.end(); diter++){
503         int which_array = diter->first;
504         delete data_arrays[which_array]; 
505         data_arrays[which_array] = NULL;
506         data_arrays_sizes[which_array] = 0;
507       }
508       continueToNextStep();
509       
510     }
511
512     int newPe = (thisIndex.y * new_active_chare_cols + thisIndex.x) % CkNumPes();
513     
514     if(newPe == CkMyPe()){
515       //      CkPrintf("Keeping %02d , %02d on PE %d\n", thisIndex.x, thisIndex.y, newPe);
516     }
517     else{
518       // CkPrintf("Migrating %02d , %02d to PE %d\n", thisIndex.x, thisIndex.y, newPe);
519       //    migrateMe(newPe);
520     }
521     
522
523     // Call receiveTransposeData for any buffered messages.
524     int size = bufferedMsgs.size();
525     for(int i=0;i<size;i++){
526       redistributor2DMsg *msg = bufferedMsgs[i];
527       //      CkPrintf("Delivering buffered receiveTransposeData(msg=%p)\n", msg);
528       receiveTransposeData(msg); // this will delete the message
529     }
530     bufferedMsgs.removeAll();
531
532     
533   }
534   
535   
536   void continueToNextStep(){
537 #if DEBUG > 2
538     CkPrintf("Elem %d,%d is ready to continue\n", thisIndex.x, thisIndex.y);
539 #endif
540
541     resizeGranulesHasBeenCalled = false;
542
543     for(std::map<int,double*>::iterator diter =data_arrays.begin(); diter != data_arrays.end(); diter++){
544       int which_array = diter->first;
545       double *data = diter->second;
546       if( ! ((data==NULL && !thisElemActive) || (data!=NULL && thisElemActive) )){
547         CkPrintf("[%d] ERROR: ! ((data==NULL && !thisElemActive) || (data!=NULL && thisElemActive) )",CkMyPe());
548         CkPrintf("[%d] ERROR: data=%p thisElemActive=%d  (perhaps continueToNextStep was called too soon)\n",CkMyPe(), data, (int)thisElemActive );
549
550         CkAbort("ERROR");       
551       }
552     }
553     
554     
555 #if USE_EXTRAMEMORY
556 #error NO USE_EXTRAMEMORY ALLOWED YET
557     if(thisElemActive){
558
559       long totalArtificialMemory = controlPoint("Artificial Memory Usage", 100, 500);
560       long artificialMemoryPerChare = totalArtificialMemory *1024*1024 / x_chares / y_chares;
561       
562       CkPrintf("Allocating fake memory of %d MB (of the total %d MB) (xchares=%d y_chares=%d)\n", artificialMemoryPerChare/1024/1024, totalArtificialMemory, x_chares, y_chares);
563       free(fakeMemoryUsage);
564       fakeMemoryUsage = malloc(artificialMemoryPerChare);
565       CkAssert(fakeMemoryUsage != NULL);
566     } else {
567       free(fakeMemoryUsage);
568       fakeMemoryUsage = NULL;
569     }
570 #endif
571
572
573
574     incoming_count = 0; // prepare for future granularity change 
575     contribute();
576   }
577   
578   
579
580
581
582
583   
584   void receiveTransposeData(redistributor2DMsg *msg){
585     
586     // buffer this message until resizeGranules Has Been Called
587     if(!resizeGranulesHasBeenCalled){
588       bufferedMsgs.push_back(msg);
589       //      CkPrintf("Buffering receiveTransposeData(msg=%p)\n", msg);
590       return;
591     }
592     
593     CkAssert(resizeGranulesHasBeenCalled);
594     
595     int top_new = top_data_idx(thisIndex.y, msg->new_chare_rows);
596     int bottom_new = bottom_data_idx(thisIndex.y, msg->new_chare_rows);
597     int left_new = left_data_idx(thisIndex.x, msg->new_chare_cols);
598     int right_new = right_data_idx(thisIndex.x, msg->new_chare_cols);    
599
600     int new_height = bottom_new - top_new + 1;
601     int new_width = right_new - left_new + 1;
602
603     if(incoming_count == 0){
604       // Allocate new arrays 
605       std::map<int,double*>::iterator diter;
606       for(diter =data_arrays.begin(); diter != data_arrays.end(); diter++){
607         int w = diter->first;
608         data_arrays_incoming[w] = new double[(new_width+2*data_x_ghost)*(new_height+2*data_y_ghost)];
609         data_arrays_incoming_sizes[w] = (new_width+2*data_x_ghost)*(new_height+2*data_y_ghost);
610
611         //      CkPrintf("data_arrays_incoming_sizes[%d] set to %d\n", w, data_arrays_incoming_sizes[w] );  
612
613       }
614     }
615     
616     
617     // Copy values from the incoming array to the appropriate place in data_arrays_incoming
618     // Current top left of my new array
619
620
621     double *localData = data_arrays_incoming[msg->which_array];
622     int s = data_arrays_incoming_sizes[msg->which_array];
623
624     //    CkPrintf("%d,%d data_arrays_incoming.size() = %d\n", thisIndex.x, thisIndex.y, data_arrays_incoming.size() );
625     //    CkPrintf("msg->which_array=%d   localData=%p   s=%d\n", msg->which_array, localData, s);
626     CkAssert(localData != NULL);
627
628     for(int j=0; j<msg->height; j++){
629       for(int i=0; i<msg->width; i++){
630
631         if( (msg->top+j >= top_new) && (msg->top+j <= bottom_new) && (msg->left+i >= left_new) && (msg->left+i <= right_new) ) {
632           CkAssert(j*msg->width+i<msg->height*msg->width);
633           CkAssert((msg->top+j-top_new)*new_width+(msg->left+i-left_new) < new_width*new_height);
634           CkAssert((msg->top+j-top_new)*new_width+(msg->left+i-left_new) >= 0);
635           
636           CkAssert((msg->top+j-top_new+data_y_ghost)*(new_width+2*data_x_ghost)+(msg->left+i-left_new+data_x_ghost) < s);
637           localData[(msg->top+j-top_new+data_y_ghost)*(new_width+2*data_x_ghost)+(msg->left+i-left_new+data_x_ghost)] = msg->data[j*msg->width+i];
638           incoming_count++;
639           
640         }
641         
642       }
643     }
644     
645     //    CkPrintf("Deleting message msg %p\n", msg); 
646     delete msg;
647
648
649     if(incoming_count == new_height*new_width*data_arrays.size()){
650
651       std::map<int,double*>::iterator diter;
652       for(diter =data_arrays.begin(); diter != data_arrays.end(); diter++){
653         int w = diter->first;
654         delete[] data_arrays[w];
655         data_arrays[w] = data_arrays_incoming[w];
656         data_arrays_sizes[w] = data_arrays_incoming_sizes[w];
657         data_arrays_incoming[w] = NULL;
658         data_arrays_incoming_sizes[w] = 0;
659
660         //        if(thisIndex.x==0 && thisIndex.y==0)   
661           //          CkPrintf("data_arrays_incoming_sizes[%d] set to %d\n",w, data_arrays_incoming_sizes[w] );   
662
663           //        if(thisIndex.x==0 && thisIndex.y==0) 
664           //  CkPrintf("data_arrays_sizes[%d] set to %d\n",w, data_arrays_sizes[w] ); 
665
666       }
667
668       continueToNextStep();
669     }
670     
671   }
672 };
673
674 /** @} */
675 #endif