NDMeshStreamer: checking in a working version of the array interface
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H_
2 #define NDMESH_STREAMER_H_
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7
8 // allocate more total buffer space than the maximum buffering limit but flush 
9 //   upon reaching totalBufferCapacity_
10 #define BUFFER_SIZE_FACTOR 4
11
12 // #define DEBUG_STREAMER
13 // #define CACHE_LOCATIONS
14 // #define SUPPORT_INCOMPLETE_MESH
15
16 struct MeshLocation {
17   int dimension; 
18   int bufferIndex; 
19 }; 
20
21 template<class dtype>
22 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
23 public:
24     int numDataItems;
25     int *destinationPes;
26     dtype *data;
27
28     MeshStreamerMessage(): numDataItems(0) {}   
29
30     int addDataItem(const dtype &dataItem) {
31         data[numDataItems] = dataItem;
32         return ++numDataItems; 
33     }
34
35     void markDestination(const int index, const int destinationPe) {
36         destinationPes[index] = destinationPe;
37     }
38
39     dtype &getDataItem(const int index) {
40         return data[index];
41     }
42 };
43
44 template <class dtype>
45 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype> {
46  public:
47      virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
48      virtual void process(dtype &data)=0; 
49 };
50
51 template <class dtype>
52 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype> {
53  public:
54      // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
55   // would like to make it pure virtual but charm will try to
56   // instantiate the abstract class, leading to errors
57   virtual void process(dtype &data) {} //=0; 
58   MeshStreamerArrayClient() {}
59   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
60 };
61
62 template <class dtype>
63 class MeshStreamer : public CBase_MeshStreamer<dtype> {
64
65 private:
66     int bufferSize_; 
67     int totalBufferCapacity_;
68     int numDataItemsBuffered_;
69
70     int numMembers_; 
71     int numDimensions_;
72     int *individualDimensionSizes_;
73     int *combinedDimensionSizes_;
74
75     int myIndex_;
76     int *myLocationIndex_;
77
78     CkCallback   userCallback_;
79     int yieldFlag_;
80
81     double progressPeriodInMs_; 
82     bool isPeriodicFlushEnabled_; 
83     double timeOfLastSend_; 
84
85
86     MeshStreamerMessage<dtype> ***dataBuffers_;
87
88 #ifdef CACHE_LOCATIONS
89     MeshLocation *cachedLocations;
90     bool *isCached; 
91 #endif
92
93     MeshLocation determineLocation(int destinationPe);
94
95     void storeMessage(int destinationPe, 
96                       const MeshLocation &destinationCoordinates, 
97                       const dtype &dataItem);
98
99     virtual void deliverToDestination(
100                  int destinationPe, 
101                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
102
103     virtual void localDeliver(dtype &dataItem) = 0; 
104
105     void flushLargestBuffer();
106
107 public:
108
109     MeshStreamer(int totalBufferCapacity, int numDimensions, 
110                  int *dimensionSizes,
111                  int yieldFlag = 0, double progressPeriodInMs = -1.0);
112     ~MeshStreamer();
113
114       // entry
115     void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
116     void flushDirect();
117     void finish(CkReductionMsg *msg);
118
119       // non entry
120     bool isPeriodicFlushEnabled() {
121       return isPeriodicFlushEnabled_;
122     }
123     virtual void insertData(dtype &dataItem, int destinationPe); 
124     void doneInserting();
125     void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
126       userCallback_ = cb;
127       if (automaticFinish) {
128         CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), 
129                              this->thisProxy));
130       }
131     }
132     void flushAllBuffers();
133     void registerPeriodicProgressFunction();
134
135     // flushing begins only after enablePeriodicFlushing has been invoked
136
137     void enablePeriodicFlushing(){
138       isPeriodicFlushEnabled_ = true; 
139       registerPeriodicProgressFunction();
140     }
141 };
142
143 template <class dtype>
144 void MeshStreamerGroupClient<dtype>::receiveCombinedData(
145                                 MeshStreamerMessage<dtype> *msg) {
146   for (int i = 0; i < msg->numDataItems; i++) {
147     dtype data = msg->getDataItem(i);
148     process(data);
149   }
150   delete msg;
151 }
152
153 template <class dtype>
154 MeshStreamer<dtype>::MeshStreamer(
155                      int totalBufferCapacity, int numDimensions, 
156                      int *dimensionSizes, 
157                      int yieldFlag, 
158                      double progressPeriodInMs)
159  :numDimensions_(numDimensions), 
160   totalBufferCapacity_(totalBufferCapacity), 
161   yieldFlag_(yieldFlag), 
162   progressPeriodInMs_(progressPeriodInMs)
163 {
164   // limit total number of messages in system to totalBufferCapacity
165   //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
166   //   advantage of nonuniform filling of buffers
167
168   int sumAlongAllDimensions = 0;   
169   individualDimensionSizes_ = new int[numDimensions_];
170   combinedDimensionSizes_ = new int[numDimensions_ + 1];
171   myLocationIndex_ = new int[numDimensions_];
172   memcpy(individualDimensionSizes_, dimensionSizes, 
173          numDimensions * sizeof(int)); 
174   combinedDimensionSizes_[0] = 1; 
175   for (int i = 0; i < numDimensions; i++) {
176     sumAlongAllDimensions += individualDimensionSizes_[i];
177     combinedDimensionSizes_[i + 1] = 
178       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
179   }
180
181   // except for personalized messages, the buffers for dimensions with the 
182   //   same index as the sender's are not used
183   bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
184     / (sumAlongAllDimensions - numDimensions_ + 1); 
185   if (bufferSize_ <= 0) {
186     bufferSize_ = 1; 
187     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
188              "is invalid. Defaulting to a single buffer per destination.\n");
189   }
190   totalBufferCapacity_ = totalBufferCapacity;
191   numDataItemsBuffered_ = 0; 
192   numMembers_ = CkNumPes(); 
193
194   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
195   for (int i = 0; i < numDimensions; i++) {
196     int numMembersAlongDimension = individualDimensionSizes_[i]; 
197     dataBuffers_[i] = 
198       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
199     for (int j = 0; j < numMembersAlongDimension; j++) {
200       dataBuffers_[i][j] = NULL;
201     }
202   }
203
204   myIndex_ = CkMyPe();
205   int remainder = myIndex_;
206   for (int i = numDimensions_ - 1; i >= 0; i--) {    
207     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
208     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
209   }
210
211   isPeriodicFlushEnabled_ = false; 
212
213 #ifdef CACHE_LOCATIONS
214   cachedLocations = new MeshLocation[numMembers_];
215   isCached = new bool[numMembers_];
216   std::fill(isCached, isCached + numMembers_, false);
217 #endif
218
219 }
220
221 template <class dtype>
222 MeshStreamer<dtype>::~MeshStreamer() {
223
224   for (int i = 0; i < numDimensions_; i++) {
225     for (int j=0; j < individualDimensionSizes_[i]; j++) {
226       delete[] dataBuffers_[i][j]; 
227     }
228     delete[] dataBuffers_[i]; 
229   }
230
231   delete[] individualDimensionSizes_;
232   delete[] combinedDimensionSizes_; 
233   delete[] myLocationIndex_;
234
235 #ifdef CACHE_LOCATIONS
236   delete[] cachedLocations;
237   delete[] isCached; 
238 #endif
239
240 }
241
242
243 template <class dtype>
244 inline
245 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
246
247 #ifdef CACHE_LOCATIONS
248   if (isCached[destinationPe]) {    
249     return cachedLocations[destinationPe]; 
250   }
251 #endif
252
253   MeshLocation destinationLocation;
254   int remainder = destinationPe;
255   int dimensionIndex; 
256   for (int i = numDimensions_ - 1; i >= 0; i--) {        
257     dimensionIndex = remainder / combinedDimensionSizes_[i];
258     
259     if (dimensionIndex != myLocationIndex_[i]) {
260       destinationLocation.dimension = i; 
261       destinationLocation.bufferIndex = dimensionIndex; 
262 #ifdef CACHE_LOCATIONS
263       cachedLocations[destinationPe] = destinationLocation;
264       isCached[destinationPe] = true; 
265 #endif
266       return destinationLocation;
267     }
268
269     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
270   }
271
272   // all indices agree - message to oneself
273   destinationLocation.dimension = 0; 
274   destinationLocation.bufferIndex = myLocationIndex_[0];
275   return destinationLocation; 
276 }
277
278 template <class dtype>
279 inline
280 void MeshStreamer<dtype>::storeMessage(
281                           int destinationPe, 
282                           const MeshLocation& destinationLocation,
283                           const dtype &dataItem) {
284
285   int dimension = destinationLocation.dimension;
286   int bufferIndex = destinationLocation.bufferIndex; 
287   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
288
289   // allocate new message if necessary
290   if (messageBuffers[bufferIndex] == NULL) {
291     if (dimension == 0) {
292       // personalized messages do not require destination indices
293       messageBuffers[bufferIndex] = 
294         new (0, bufferSize_) MeshStreamerMessage<dtype>();
295     }
296     else {
297       messageBuffers[bufferIndex] = 
298         new (bufferSize_, bufferSize_) MeshStreamerMessage<dtype>();
299     }
300 #ifdef DEBUG_STREAMER
301     CkAssert(messageBuffers[bufferIndex] != NULL);
302 #endif
303   }
304   
305   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
306   
307   int numBuffered = destinationBuffer->addDataItem(dataItem); 
308   if (dimension != 0) {
309     destinationBuffer->markDestination(numBuffered-1, destinationPe);
310   }
311   numDataItemsBuffered_++;
312
313   // copy data into message and send if buffer is full
314   if (numBuffered == bufferSize_) {
315
316     int destinationIndex;
317
318     destinationIndex = myIndex_ + 
319       (bufferIndex - myLocationIndex_[dimension]) * 
320       combinedDimensionSizes_[dimension];
321
322     if (dimension == 0) {
323       deliverToDestination(destinationIndex, destinationBuffer);
324     }
325     else {
326       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
327     }
328
329     messageBuffers[bufferIndex] = NULL;
330     numDataItemsBuffered_ -= numBuffered; 
331
332     if (isPeriodicFlushEnabled_) {
333       timeOfLastSend_ = CkWallTimer();
334     }
335
336   }
337
338   if (numDataItemsBuffered_ == totalBufferCapacity_) {
339     flushLargestBuffer();
340     if (isPeriodicFlushEnabled_) {
341       timeOfLastSend_ = CkWallTimer();
342     }
343   }
344
345 }
346
347 template <class dtype>
348 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
349   static int count = 0;
350
351   if (destinationPe == CkMyPe()) {
352     localDeliver(dataItem);
353     return;
354   }
355
356   MeshLocation destinationLocation = determineLocation(destinationPe);
357   storeMessage(destinationPe, destinationLocation, dataItem); 
358
359   // release control to scheduler if requested by the user, 
360   //   assume caller is threaded entry
361   if (yieldFlag_ && ++count == 1024) {
362     count = 0; 
363     CthYield();
364   }
365 }
366
367 template <class dtype>
368 void MeshStreamer<dtype>::doneInserting() {
369   this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), 
370                               this->thisProxy));
371 }
372
373 template <class dtype>
374 void MeshStreamer<dtype>::finish(CkReductionMsg *msg) {
375
376   isPeriodicFlushEnabled_ = false; 
377   flushDirect();
378
379   if (!userCallback_.isInvalid()) {
380     CkStartQD(userCallback_);
381     userCallback_ = CkCallback();      // nullify the current callback
382   }
383
384   // TODO: TEST IF THIS DELETE STILL CAUSES UNEXPLAINED CRASHES
385   //  delete msg; 
386 }
387
388 template <class dtype>
389 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
390
391   int destinationPe; 
392   MeshLocation destinationLocation;
393
394   for (int i = 0; i < msg->numDataItems; i++) {
395     destinationPe = msg->destinationPes[i];
396     dtype &dataItem = msg->getDataItem(i);
397     destinationLocation = determineLocation(destinationPe);
398     if (destinationPe == CkMyPe()) {
399       localDeliver(dataItem);
400     }
401     else {
402       storeMessage(destinationPe, destinationLocation, dataItem);   
403     }
404   }
405
406   delete msg;
407
408 }
409
410 template <class dtype>
411 void MeshStreamer<dtype>::flushLargestBuffer() {
412
413   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
414   MeshStreamerMessage<dtype> ** messageBuffers; 
415   MeshStreamerMessage<dtype> *destinationBuffer; 
416
417   for (int i = 0; i < numDimensions_; i++) {
418
419     messageBuffers = dataBuffers_[i]; 
420     numBuffers = individualDimensionSizes_[i]; 
421
422     flushDimension = i; 
423     maxSize = 0;    
424     for (int j = 0; j < numBuffers; j++) {
425       if (messageBuffers[j] != NULL && 
426           messageBuffers[j]->numDataItems > maxSize) {
427         maxSize = messageBuffers[j]->numDataItems;
428         flushIndex = j; 
429       }
430     }
431
432     if (maxSize > 0) {
433
434       messageBuffers = dataBuffers_[flushDimension]; 
435       destinationBuffer = messageBuffers[flushIndex];
436       destinationIndex = myIndex_ + 
437         (flushIndex - myLocationIndex_[flushDimension]) * 
438         combinedDimensionSizes_[flushDimension] ;
439
440       if (destinationBuffer->numDataItems < bufferSize_) {
441         // not sending the full buffer, shrink the message size
442         envelope *env = UsrToEnv(destinationBuffer);
443         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
444                           (bufferSize_ - destinationBuffer->numDataItems));
445       }
446       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
447
448       if (flushDimension == 0) {
449         deliverToDestination(destinationIndex, destinationBuffer);
450       }
451       else {
452         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
453       }
454       messageBuffers[flushIndex] = NULL;
455
456     }
457
458   }
459 }
460
461 template <class dtype>
462 void MeshStreamer<dtype>::flushAllBuffers() {
463
464   MeshStreamerMessage<dtype> **messageBuffers; 
465   int numBuffers; 
466
467   for (int i = 0; i < numDimensions_; i++) {
468
469     messageBuffers = dataBuffers_[i]; 
470     numBuffers = individualDimensionSizes_[i]; 
471
472     for (int j = 0; j < numBuffers; j++) {
473
474       if(messageBuffers[j] == NULL) {
475         continue;
476       }
477
478       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
479
480       if (i == 0) {
481         int destinationPe = myIndex_ + j - myLocationIndex_[i];
482         deliverToDestination(destinationPe, messageBuffers[j]);
483       }  
484       else {
485
486         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
487
488           MeshStreamerMessage<dtype> *directMsg = 
489             new (0, 1) MeshStreamerMessage<dtype>();
490 #ifdef DEBUG_STREAMER
491           CkAssert(directMsg != NULL);
492 #endif
493           int destinationPe = messageBuffers[j]->destinationPes[k]; 
494           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
495           directMsg->addDataItem(dataItem);
496           deliverToDestination(destinationPe,directMsg);
497         }
498         delete messageBuffers[j];
499       }
500       messageBuffers[j] = NULL;
501     }
502   }
503 }
504
505 template <class dtype>
506 void MeshStreamer<dtype>::flushDirect(){
507
508     if (!isPeriodicFlushEnabled_ || 
509         1000 * (CkWallTimer() - timeOfLastSend_) >= progressPeriodInMs_) {
510       flushAllBuffers();
511     }
512
513     if (isPeriodicFlushEnabled_) {
514       timeOfLastSend_ = CkWallTimer();
515     }
516
517 #ifdef DEBUG_STREAMER
518     //CkPrintf("[%d] numDataItemsBuffered_: %d\n", CkMyPe(), numDataItemsBuffered_);
519     CkAssert(numDataItemsBuffered_ == 0); 
520 #endif
521
522 }
523
524 template <class dtype>
525 void periodicProgressFunction(void *MeshStreamerObj, double time) {
526
527   MeshStreamer<dtype> *properObj = 
528     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
529
530   if (properObj->isPeriodicFlushEnabled()) {
531     properObj->flushDirect();
532     properObj->registerPeriodicProgressFunction();
533   }
534 }
535
536 template <class dtype>
537 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
538   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
539                  progressPeriodInMs_); 
540 }
541
542
543 template <class dtype>
544 class GroupMeshStreamer : public MeshStreamer<dtype> {
545 private:
546
547   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
548   MeshStreamerGroupClient<dtype> *clientObj_;
549
550   void deliverToDestination(int destinationPe, 
551                             MeshStreamerMessage<dtype> *destinationBuffer) {
552     clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
553   }
554
555   void localDeliver(dtype &dataItem) {
556     clientObj_->process(dataItem);
557   }
558
559 public:
560
561   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
562                     int *dimensionSizes, 
563                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
564                     int yieldFlag = 0, double progressPeriodInMs = -1.0)
565    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
566                          yieldFlag, progressPeriodInMs) 
567   {
568     clientProxy_ = clientProxy; 
569     clientObj_ = 
570       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
571   }
572
573
574 };
575
576 template <class dtype>
577 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype> > {
578 private:
579
580   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
581   CkArray *clientArrayMgr;
582   MeshStreamerArrayClient<dtype> *clientObj_;
583
584
585   void deliverToDestination(
586        int destinationPe, 
587        MeshStreamerMessage<ArrayDataItem<dtype> > *destinationBuffer) { 
588     ( (CProxy_ArrayMeshStreamer<dtype>) this->thisProxy[destinationPe]).receiveArrayData(destinationBuffer);
589   }
590
591   void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
592     int arrayId = packedDataItem.arrayIndex; 
593     MeshStreamerArrayClient<dtype> *clientObj = 
594       clientProxy_[arrayId].ckLocal();
595
596     if (clientObj != NULL) {
597       clientObj->process(packedDataItem.dataItem);
598     }
599     else {
600       // array element is no longer present locally - redeliver using proxy
601       clientProxy_[arrayId].process(packedDataItem.dataItem);
602     }
603   }
604
605 public:
606
607   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
608                     int *dimensionSizes, 
609                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
610                     int yieldFlag = 0, double progressPeriodInMs = -1.0)
611     :MeshStreamer<ArrayDataItem<dtype> >(totalBufferCapacity, numDimensions, 
612                                         dimensionSizes, yieldFlag, 
613                                         progressPeriodInMs) 
614   {
615     clientProxy_ = clientProxy; 
616     clientArrayMgr = clientProxy_.ckLocalBranch();
617   }
618
619   void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
620     for (int i = 0; i < msg->numDataItems; i++) {
621       ArrayDataItem<dtype> &data = msg->getDataItem(i);
622       localDeliver(data);
623     }
624     delete msg;
625   }
626
627   void insertData(dtype &dataItem, int arrayIndex) {
628     // simple implementation to test functionality
629     // TODO - reimplement to avoid copying item before transfer into message
630     ArrayDataItem<dtype> packedDataItem;
631     packedDataItem.arrayIndex = arrayIndex; 
632     packedDataItem.dataItem = dataItem;
633     MeshStreamer<ArrayDataItem<dtype> >::insertData(packedDataItem, clientArrayMgr->lastKnown(clientProxy_[arrayIndex].ckGetIndex()));
634   }
635 };
636
637 #define CK_TEMPLATES_ONLY
638 #include "NDMeshStreamer.def.h"
639 #undef CK_TEMPLATES_ONLY
640
641 #endif