8d0c8b47decb45068206e33b08fd9dabb74abe06
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8
9 // allocate more total buffer space than the maximum buffering limit but flush 
10 //   upon reaching totalBufferCapacity_
11 #define BUFFER_SIZE_FACTOR 4
12
13 // #define DEBUG_STREAMER
14 // #define CACHE_LOCATIONS
15 // #define SUPPORT_INCOMPLETE_MESH
16 #define CACHE_ARRAY_METADATA
17
18 struct MeshLocation {
19   int dimension; 
20   int bufferIndex; 
21 }; 
22
23 template<class dtype>
24 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
25 public:
26     int numDataItems;
27     int *destinationPes;
28     dtype *data;
29
30     MeshStreamerMessage(): numDataItems(0) {}   
31
32     int addDataItem(const dtype &dataItem) {
33         data[numDataItems] = dataItem;
34         return ++numDataItems; 
35     }
36
37     void markDestination(const int index, const int destinationPe) {
38         destinationPes[index] = destinationPe;
39     }
40
41     dtype &getDataItem(const int index) {
42         return data[index];
43     }
44 };
45
46 template <class dtype>
47 class MeshStreamerClient {
48  protected:
49   CompletionDetector *detectorLocalObj_;
50  public:
51   // would like to make it pure virtual but charm will try to
52   // instantiate the abstract class, leading to errors
53   virtual void process(dtype &data) {};     
54   void setDetector(CompletionDetector *detectorLocalObj) {
55     detectorLocalObj_ = detectorLocalObj;
56   }
57 };
58
59 template <class dtype>
60 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>,
61   public MeshStreamerClient<dtype> {
62  public:
63
64   virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg) {
65     for (int i = 0; i < msg->numDataItems; i++) {
66       dtype &data = msg->getDataItem(i);
67       process(data);
68     }
69     MeshStreamerClient<dtype>::detectorLocalObj_->consume(msg->numDataItems);
70     delete msg;
71   }
72 };
73
74 template <class dtype>
75 class MeshStreamerArrayClient :  public CBase_MeshStreamerArrayClient<dtype>, 
76   public MeshStreamerClient<dtype>
77    {
78  public:
79
80   // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
81   MeshStreamerArrayClient() {}
82   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
83   void receiveRedeliveredItem(dtype data) {
84     MeshStreamerClient<dtype>::detectorLocalObj_->consume();
85     process(data);
86   }
87   void pup(PUP::er &p) {
88     CBase_MeshStreamerArrayClient<dtype>::pup(p);
89   }
90
91 };
92
93 template <class dtype>
94 class MeshStreamer : public CBase_MeshStreamer<dtype> {
95
96 private:
97     int bufferSize_; 
98     int totalBufferCapacity_;
99     int numDataItemsBuffered_;
100
101     int numMembers_; 
102     int numDimensions_;
103     int *individualDimensionSizes_;
104     int *combinedDimensionSizes_;
105
106     int myIndex_;
107     int *myLocationIndex_;
108
109     CkCallback   userCallback_;
110     bool yieldFlag_;
111
112     double progressPeriodInMs_; 
113     bool isPeriodicFlushEnabled_; 
114     bool hasSentRecently_;
115
116     MeshStreamerMessage<dtype> ***dataBuffers_;
117
118     CProxy_CompletionDetector detector_;
119
120 #ifdef CACHE_LOCATIONS
121     MeshLocation *cachedLocations_;
122     bool *isCached_; 
123 #endif
124
125     MeshLocation determineLocation(int destinationPe);
126
127     void storeMessage(int destinationPe, 
128                       const MeshLocation &destinationCoordinates, 
129                       void *dataItem, bool copyIndirectly = false);
130
131     virtual void deliverToDestination(
132                  int destinationPe, 
133                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
134
135     virtual void localDeliver(dtype &dataItem) = 0; 
136
137     virtual int numElementsInClient() = 0;
138
139     virtual void initLocalClients() = 0;
140
141     void flushLargestBuffer();
142
143 protected:
144
145     CompletionDetector *detectorLocalObj_;
146     virtual int copyDataItemIntoMessage(
147                 MeshStreamerMessage<dtype> *destinationBuffer, 
148                 void *dataItemHandle, bool copyIndirectly = false);
149
150 public:
151
152     MeshStreamer(int totalBufferCapacity, int numDimensions, 
153                  int *dimensionSizes,
154                  bool yieldFlag = 0, double progressPeriodInMs = -1.0);
155     ~MeshStreamer();
156
157       // entry
158     void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
159     void flushDirect();
160     void finish();
161
162     // non entry
163     bool isPeriodicFlushEnabled() {
164       return isPeriodicFlushEnabled_;
165     }
166     virtual void insertData(dtype &dataItem, int destinationPe); 
167     void insertData(void *dataItemHandle, int destinationPe);
168     void associateCallback(int numContributors, 
169                            CkCallback startCb, CkCallback endCb, 
170                            CProxy_CompletionDetector detector);
171     void flushAllBuffers();
172     void registerPeriodicProgressFunction();
173
174     // flushing begins only after enablePeriodicFlushing has been invoked
175
176     void enablePeriodicFlushing(){
177       isPeriodicFlushEnabled_ = true; 
178       registerPeriodicProgressFunction();
179     }
180
181     void done(int numContributorsFinished = 1) {
182       detectorLocalObj_->done(numContributorsFinished);
183     }
184
185 };
186
187 template <class dtype>
188 MeshStreamer<dtype>::MeshStreamer(
189                      int totalBufferCapacity, int numDimensions, 
190                      int *dimensionSizes, 
191                      bool yieldFlag, 
192                      double progressPeriodInMs)
193  :numDimensions_(numDimensions), 
194   totalBufferCapacity_(totalBufferCapacity), 
195   yieldFlag_(yieldFlag), 
196   progressPeriodInMs_(progressPeriodInMs)
197 {
198   // limit total number of messages in system to totalBufferCapacity
199   //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
200   //   advantage of nonuniform filling of buffers
201
202   int sumAlongAllDimensions = 0;   
203   individualDimensionSizes_ = new int[numDimensions_];
204   combinedDimensionSizes_ = new int[numDimensions_ + 1];
205   myLocationIndex_ = new int[numDimensions_];
206   memcpy(individualDimensionSizes_, dimensionSizes, 
207          numDimensions * sizeof(int)); 
208   combinedDimensionSizes_[0] = 1; 
209   for (int i = 0; i < numDimensions; i++) {
210     sumAlongAllDimensions += individualDimensionSizes_[i];
211     combinedDimensionSizes_[i + 1] = 
212       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
213   }
214
215   // except for personalized messages, the buffers for dimensions with the 
216   //   same index as the sender's are not used
217   bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
218     / (sumAlongAllDimensions - numDimensions_ + 1); 
219   if (bufferSize_ <= 0) {
220     bufferSize_ = 1; 
221     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
222              "is invalid. Defaulting to a single buffer per destination.\n");
223   }
224   totalBufferCapacity_ = totalBufferCapacity;
225   numDataItemsBuffered_ = 0; 
226   numMembers_ = CkNumPes(); 
227
228   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
229   for (int i = 0; i < numDimensions; i++) {
230     int numMembersAlongDimension = individualDimensionSizes_[i]; 
231     dataBuffers_[i] = 
232       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
233     for (int j = 0; j < numMembersAlongDimension; j++) {
234       dataBuffers_[i][j] = NULL;
235     }
236   }
237
238   myIndex_ = CkMyPe();
239   int remainder = myIndex_;
240   for (int i = numDimensions_ - 1; i >= 0; i--) {    
241     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
242     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
243   }
244
245   isPeriodicFlushEnabled_ = false; 
246   detectorLocalObj_ = NULL;
247
248 #ifdef CACHE_LOCATIONS
249   cachedLocations_ = new MeshLocation[numMembers_];
250   isCached_ = new bool[numMembers_];
251   std::fill(isCached_, isCached_ + numMembers_, false);
252 #endif
253
254 }
255
256 template <class dtype>
257 MeshStreamer<dtype>::~MeshStreamer() {
258
259   for (int i = 0; i < numDimensions_; i++) {
260     for (int j=0; j < individualDimensionSizes_[i]; j++) {
261       delete[] dataBuffers_[i][j]; 
262     }
263     delete[] dataBuffers_[i]; 
264   }
265
266   delete[] individualDimensionSizes_;
267   delete[] combinedDimensionSizes_; 
268   delete[] myLocationIndex_;
269
270 #ifdef CACHE_LOCATIONS
271   delete[] cachedLocations_;
272   delete[] isCached_; 
273 #endif
274
275 }
276
277
278 template <class dtype>
279 inline
280 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
281
282 #ifdef CACHE_LOCATIONS
283   if (isCached_[destinationPe]) {    
284     return cachedLocations_[destinationPe]; 
285   }
286 #endif
287
288   MeshLocation destinationLocation;
289   int remainder = destinationPe;
290   int dimensionIndex; 
291   for (int i = numDimensions_ - 1; i >= 0; i--) {        
292     dimensionIndex = remainder / combinedDimensionSizes_[i];
293     
294     if (dimensionIndex != myLocationIndex_[i]) {
295       destinationLocation.dimension = i; 
296       destinationLocation.bufferIndex = dimensionIndex; 
297 #ifdef CACHE_LOCATIONS
298       cachedLocations_[destinationPe] = destinationLocation;
299       isCached_[destinationPe] = true; 
300 #endif
301       return destinationLocation;
302     }
303
304     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
305   }
306
307   // all indices agree - message to oneself
308   destinationLocation.dimension = 0; 
309   destinationLocation.bufferIndex = myLocationIndex_[0];
310   return destinationLocation; 
311 }
312
313 template <class dtype>
314 inline 
315 int MeshStreamer<dtype>::copyDataItemIntoMessage(
316                          MeshStreamerMessage<dtype> *destinationBuffer,
317                          void *dataItemHandle, bool copyIndirectly) {
318   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
319 }
320
321 template <class dtype>
322 inline
323 void MeshStreamer<dtype>::storeMessage(
324                           int destinationPe, 
325                           const MeshLocation& destinationLocation,
326                           void *dataItem, bool copyIndirectly) {
327
328   int dimension = destinationLocation.dimension;
329   int bufferIndex = destinationLocation.bufferIndex; 
330   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
331
332
333
334   // allocate new message if necessary
335   if (messageBuffers[bufferIndex] == NULL) {
336     if (dimension == 0) {
337       // personalized messages do not require destination indices
338       messageBuffers[bufferIndex] = 
339         new (0, bufferSize_) MeshStreamerMessage<dtype>();
340     }
341     else {
342       messageBuffers[bufferIndex] = 
343         new (bufferSize_, bufferSize_) MeshStreamerMessage<dtype>();
344     }
345 #ifdef DEBUG_STREAMER
346     CkAssert(messageBuffers[bufferIndex] != NULL);
347 #endif
348   }
349   
350   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
351   int numBuffered = 
352     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
353   if (dimension != 0) {
354     destinationBuffer->markDestination(numBuffered-1, destinationPe);
355   }  
356   numDataItemsBuffered_++;
357
358   // send if buffer is full
359   if (numBuffered == bufferSize_) {
360
361     int destinationIndex;
362
363     destinationIndex = myIndex_ + 
364       (bufferIndex - myLocationIndex_[dimension]) * 
365       combinedDimensionSizes_[dimension];
366
367     if (dimension == 0) {
368       deliverToDestination(destinationIndex, destinationBuffer);
369     }
370     else {
371       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
372     }
373
374     messageBuffers[bufferIndex] = NULL;
375     numDataItemsBuffered_ -= numBuffered; 
376     hasSentRecently_ = true; 
377
378   }
379   // send if total buffering capacity has been reached
380   else if (numDataItemsBuffered_ == totalBufferCapacity_) {
381     flushLargestBuffer();
382     hasSentRecently_ = true; 
383   }
384
385 }
386
387 template <class dtype>
388 inline
389 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
390   static int count = 0;
391   const static bool copyIndirectly = true;
392
393   MeshLocation destinationLocation = determineLocation(destinationPe);
394   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
395                copyIndirectly); 
396   // release control to scheduler if requested by the user, 
397   //   assume caller is threaded entry
398   if (yieldFlag_ && ++count == 1024) {
399     count = 0; 
400     CthYield();
401   }
402
403 }
404
405 template <class dtype>
406 inline
407 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
408
409   detectorLocalObj_->produce();
410   if (destinationPe == CkMyPe()) {
411     // copying here is necessary - user code should not be 
412     // passed back a reference to the original item
413     dtype dataItemCopy = dataItem;
414     localDeliver(dataItemCopy);
415     return;
416   }
417
418   insertData((void *) &dataItem, destinationPe);
419 }
420
421 template <class dtype>
422 void MeshStreamer<dtype>::associateCallback(
423                           int numContributors,
424                           CkCallback startCb, CkCallback endCb, 
425                           CProxy_CompletionDetector detector) {
426   userCallback_ = endCb; 
427   static CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
428                            this->thisProxy);
429   detector_ = detector;      
430   detectorLocalObj_ = detector_.ckLocalBranch();
431   initLocalClients();
432
433   detectorLocalObj_->start_detection(numContributors, startCb, finish , 0);
434   
435   if (progressPeriodInMs_ <= 0) {
436     CkPrintf("Using completion detection in NDMeshStreamer requires"
437              " setting a valid periodic flush period. Defaulting"
438              " to 10 ms\n");
439     progressPeriodInMs_ = 10;
440   }
441   
442   hasSentRecently_ = false; 
443   enablePeriodicFlushing();
444       
445 }
446
447 template <class dtype>
448 void MeshStreamer<dtype>::finish() {
449   isPeriodicFlushEnabled_ = false; 
450
451   if (!userCallback_.isInvalid()) {
452     this->contribute(userCallback_);
453     userCallback_ = CkCallback();      // nullify the current callback
454   }
455
456 }
457
458 template <class dtype>
459 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
460
461   int destinationPe; 
462   MeshLocation destinationLocation;
463
464   for (int i = 0; i < msg->numDataItems; i++) {
465     destinationPe = msg->destinationPes[i];
466     dtype &dataItem = msg->getDataItem(i);
467     destinationLocation = determineLocation(destinationPe);
468     if (destinationPe == CkMyPe()) {
469       localDeliver(dataItem);
470     }
471     else {
472       storeMessage(destinationPe, destinationLocation, &dataItem);   
473     }
474   }
475
476   delete msg;
477
478 }
479
480 template <class dtype>
481 void MeshStreamer<dtype>::flushLargestBuffer() {
482
483   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
484   MeshStreamerMessage<dtype> ** messageBuffers; 
485   MeshStreamerMessage<dtype> *destinationBuffer; 
486
487   for (int i = 0; i < numDimensions_; i++) {
488
489     messageBuffers = dataBuffers_[i]; 
490     numBuffers = individualDimensionSizes_[i]; 
491
492     flushDimension = i; 
493     maxSize = 0;    
494     for (int j = 0; j < numBuffers; j++) {
495       if (messageBuffers[j] != NULL && 
496           messageBuffers[j]->numDataItems > maxSize) {
497         maxSize = messageBuffers[j]->numDataItems;
498         flushIndex = j; 
499       }
500     }
501
502     if (maxSize > 0) {
503
504       messageBuffers = dataBuffers_[flushDimension]; 
505       destinationBuffer = messageBuffers[flushIndex];
506       destinationIndex = myIndex_ + 
507         (flushIndex - myLocationIndex_[flushDimension]) * 
508         combinedDimensionSizes_[flushDimension] ;
509
510       if (destinationBuffer->numDataItems < bufferSize_) {
511         // not sending the full buffer, shrink the message size
512         envelope *env = UsrToEnv(destinationBuffer);
513         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
514                           (bufferSize_ - destinationBuffer->numDataItems));
515       }
516       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
517
518       if (flushDimension == 0) {
519         deliverToDestination(destinationIndex, destinationBuffer);
520       }
521       else {
522         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
523       }
524       messageBuffers[flushIndex] = NULL;
525
526     }
527
528   }
529 }
530
531 template <class dtype>
532 void MeshStreamer<dtype>::flushAllBuffers() {
533
534   MeshStreamerMessage<dtype> **messageBuffers; 
535   int numBuffers; 
536
537   for (int i = 0; i < numDimensions_; i++) {
538
539     messageBuffers = dataBuffers_[i]; 
540     numBuffers = individualDimensionSizes_[i]; 
541
542     for (int j = 0; j < numBuffers; j++) {
543
544       if(messageBuffers[j] == NULL) {
545         continue;
546       }
547
548       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
549
550       if (i == 0) {
551         int destinationPe = myIndex_ + j - myLocationIndex_[i];
552         deliverToDestination(destinationPe, messageBuffers[j]);
553       }  
554       else {
555
556         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
557
558           MeshStreamerMessage<dtype> *directMsg = 
559             new (0, 1) MeshStreamerMessage<dtype>();
560 #ifdef DEBUG_STREAMER
561           CkAssert(directMsg != NULL);
562 #endif
563           int destinationPe = messageBuffers[j]->destinationPes[k]; 
564           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
565           directMsg->addDataItem(dataItem);
566           deliverToDestination(destinationPe,directMsg);
567         }
568         delete messageBuffers[j];
569       }
570       messageBuffers[j] = NULL;
571     }
572   }
573 }
574
575 template <class dtype>
576 void MeshStreamer<dtype>::flushDirect(){
577
578   // flush if (1) this is not a periodic call or 
579   //          (2) this is a periodic call and no sending took place
580   //              since the last time the function was invoked
581   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
582
583     if (numDataItemsBuffered_ != 0) {
584       flushAllBuffers();
585     }    
586 #ifdef DEBUG_STREAMER
587     CkAssert(numDataItemsBuffered_ == 0); 
588 #endif
589     
590   }
591
592   hasSentRecently_ = false; 
593
594 }
595
596 template <class dtype>
597 void periodicProgressFunction(void *MeshStreamerObj, double time) {
598
599   MeshStreamer<dtype> *properObj = 
600     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
601
602   if (properObj->isPeriodicFlushEnabled()) {
603     properObj->flushDirect();
604     properObj->registerPeriodicProgressFunction();
605   }
606 }
607
608 template <class dtype>
609 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
610   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
611                  progressPeriodInMs_); 
612 }
613
614
615 template <class dtype>
616 class GroupMeshStreamer : public MeshStreamer<dtype> {
617 private:
618
619   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
620   MeshStreamerGroupClient<dtype> *clientObj_;
621
622   void deliverToDestination(int destinationPe, 
623                             MeshStreamerMessage<dtype> *destinationBuffer) {
624     clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
625   }
626
627   void localDeliver(dtype &dataItem) {
628     clientObj_->process(dataItem);
629     MeshStreamer<dtype>::detectorLocalObj_->consume();
630   }
631
632   int numElementsInClient() {
633     // client is a group - there is one element per PE
634     return CkNumPes();
635   }
636
637   void initLocalClients() {
638     clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
639   }
640
641 public:
642
643   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
644                     int *dimensionSizes, 
645                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
646                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
647    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
648                          yieldFlag, progressPeriodInMs) 
649   {
650     clientProxy_ = clientProxy; 
651     clientObj_ = 
652       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
653   }
654
655 };
656
657 template <class dtype>
658 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype> > {
659 private:
660
661   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
662   CkArray *clientArrayMgr_;
663   int numArrayElements_;
664   MeshStreamerArrayClient<dtype> **clientObjs_;
665 #ifdef CACHE_ARRAY_METADATA
666   int *destinationPes_;
667   bool *isCachedArrayMetadata_;
668 #endif
669
670   void deliverToDestination(
671        int destinationPe, 
672        MeshStreamerMessage<ArrayDataItem<dtype> > *destinationBuffer) { 
673     ( (CProxy_ArrayMeshStreamer<dtype>) 
674       this->thisProxy )[destinationPe].receiveArrayData(destinationBuffer);
675   }
676
677   void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
678     int arrayId = packedDataItem.arrayIndex; 
679
680     if (clientObjs_[arrayId] != NULL) {
681       clientObjs_[arrayId]->process(packedDataItem.dataItem);
682       MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_->consume();
683     }
684     else { 
685       // array element is no longer present locally - redeliver using proxy
686       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
687     }
688   }
689
690   int numElementsInClient() {
691     return numArrayElements_;
692   }
693
694   void initLocalClients() {
695
696 #ifdef CACHE_ARRAY_METADATA
697     std::fill(isCachedArrayMetadata_, 
698               isCachedArrayMetadata_ + numArrayElements_, false);
699 #endif
700
701     for (int i = 0; i < numArrayElements_; i++) {
702       clientObjs_[i] = clientProxy_[i].ckLocal();
703       if (clientObjs_[i] != NULL) {
704         clientObjs_[i]->setDetector(
705                         MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_);
706       }
707     }
708   }
709
710 public:
711
712   struct DataItemHandle {
713     int arrayIndex; 
714     dtype *dataItem;
715   };
716
717   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
718                     int *dimensionSizes, 
719                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
720                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
721     :MeshStreamer<ArrayDataItem<dtype> >(totalBufferCapacity, numDimensions, 
722                                         dimensionSizes, yieldFlag, 
723                                         progressPeriodInMs) 
724   {
725     clientProxy_ = clientProxy; 
726     clientArrayMgr_ = clientProxy_.ckLocalBranch();
727
728     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
729
730     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
731 #ifdef CACHE_ARRAY_METADATA
732     destinationPes_ = new int[numArrayElements_];
733     isCachedArrayMetadata_ = new bool[numArrayElements_];
734     std::fill(isCachedArrayMetadata_, 
735               isCachedArrayMetadata_ + numArrayElements_, false);
736 #endif
737   }
738
739   ~ArrayMeshStreamer() {
740     delete [] clientObjs_;
741 #ifdef CACHE_ARRAY_METADATA
742     delete [] destinationPes_;
743     delete [] isCachedArrayMetadata_; 
744 #endif
745   }
746
747   void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
748     for (int i = 0; i < msg->numDataItems; i++) {
749       ArrayDataItem<dtype> &packedData = msg->getDataItem(i);
750       localDeliver(packedData);
751     }
752     delete msg;
753   }
754
755   void insertData(dtype &dataItem, int arrayIndex) {
756
757     MeshStreamer<ArrayDataItem<dtype> >::detectorLocalObj_->produce();
758     int destinationPe; 
759 #ifdef CACHE_ARRAY_METADATA
760   if (isCachedArrayMetadata_[arrayIndex]) {    
761     destinationPe =  destinationPes_[arrayIndex];
762   }
763   else {
764     destinationPe = 
765       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
766     isCachedArrayMetadata_[arrayIndex] = true;
767     destinationPes_[arrayIndex] = destinationPe;
768   }
769 #else 
770   destinationPe = 
771     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
772 #endif
773
774     static ArrayDataItem<dtype> packedDataItem;
775     if (destinationPe == CkMyPe()) {
776       // copying here is necessary - user code should not be 
777       // passed back a reference to the original item
778       packedDataItem.arrayIndex = arrayIndex; 
779       packedDataItem.dataItem = dataItem;
780       localDeliver(packedDataItem);
781       return;
782     }
783
784     // this implementation avoids copying an item before transfer into message
785
786     static DataItemHandle tempHandle; 
787     tempHandle.arrayIndex = arrayIndex; 
788     tempHandle.dataItem = &dataItem;
789
790     MeshStreamer<ArrayDataItem<dtype> >::insertData(&tempHandle, destinationPe);
791
792   }
793
794   int copyDataItemIntoMessage(
795       MeshStreamerMessage<ArrayDataItem <dtype> > *destinationBuffer, 
796       void *dataItemHandle, bool copyIndirectly) {
797
798     if (copyIndirectly == true) {
799       // newly inserted items are passed through a handle to avoid copying
800       int numDataItems = destinationBuffer->numDataItems;
801       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
802       (destinationBuffer->data)[numDataItems].dataItem = 
803         *(tempHandle->dataItem);
804       (destinationBuffer->data)[numDataItems].arrayIndex = 
805         tempHandle->arrayIndex;
806       return ++destinationBuffer->numDataItems;
807     }
808     else {
809       // this is an item received along the route to destination
810       // we can copy it from the received message
811       return MeshStreamer<ArrayDataItem<dtype> >::copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
812     }
813   }
814
815 };
816
817 #define CK_TEMPLATES_ONLY
818 #include "NDMeshStreamer.def.h"
819 #undef CK_TEMPLATES_ONLY
820
821 #endif