51bcb18c94a0b95043c4023ab549cdab3c0d495f
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // allocate more total buffer space than the maximum buffering limit but flush 
11 //   upon reaching totalBufferCapacity_
12 #define BUFFER_SIZE_FACTOR 4
13
14 // #define DEBUG_STREAMER
15 // #define CACHE_LOCATIONS
16 // #define SUPPORT_INCOMPLETE_MESH
17 #define CACHE_ARRAY_METADATA // only works for 1D array clients
18
19 struct MeshLocation {
20   int dimension; 
21   int bufferIndex; 
22 }; 
23
24 template<class dtype>
25 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
26 public:
27     int numDataItems;
28     int *destinationPes;
29     dtype *data;
30
31     MeshStreamerMessage(): numDataItems(0) {}   
32
33     int addDataItem(const dtype &dataItem) {
34         data[numDataItems] = dataItem;
35         return ++numDataItems; 
36     }
37
38     void markDestination(const int index, const int destinationPe) {
39         destinationPes[index] = destinationPe;
40     }
41
42     dtype &getDataItem(const int index) {
43         return data[index];
44     }
45 };
46
47 template <class dtype>
48 class MeshStreamerClient {
49  protected:
50   CompletionDetector *detectorLocalObj_;
51  public:
52   // would like to make it pure virtual but charm will try to
53   // instantiate the abstract class, leading to errors
54   virtual void process(dtype &data) {};     
55   void setDetector(CompletionDetector *detectorLocalObj) {
56     detectorLocalObj_ = detectorLocalObj;
57   }
58 };
59
60 template <class dtype>
61 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>,
62   public MeshStreamerClient<dtype> {
63  public:
64
65   virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg) {
66     for (int i = 0; i < msg->numDataItems; i++) {
67       dtype &data = msg->getDataItem(i);
68       process(data);
69     }
70     MeshStreamerClient<dtype>::detectorLocalObj_->consume(msg->numDataItems);
71     delete msg;
72   }
73 };
74
75 template <class dtype>
76 class MeshStreamerArrayClient :  public CBase_MeshStreamerArrayClient<dtype>, 
77   public MeshStreamerClient<dtype>
78 {
79
80 public:
81
82   // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
83   MeshStreamerArrayClient() {}
84   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
85   void receiveRedeliveredItem(dtype data) {
86     MeshStreamerClient<dtype>::detectorLocalObj_->consume();
87     process(data);
88   }
89
90   void pup(PUP::er &p) {
91     CBase_MeshStreamerArrayClient<dtype>::pup(p);
92   }
93
94 };
95
96 template <class dtype>
97 class MeshStreamerArray3DClient : 
98   public CBase_MeshStreamerArray3DClient<dtype>, 
99   public MeshStreamerClient<dtype> 
100 {
101
102 public:
103   MeshStreamerArray3DClient() {}
104   MeshStreamerArray3DClient(CkMigrateMessage *msg) {}
105   void receiveRedeliveredItem(dtype data) {
106     MeshStreamerClient<dtype>::detectorLocalObj_->consume();
107     process(data);
108   }
109   void pup(PUP::er &p) {
110     CBase_MeshStreamerArray3DClient<dtype>::pup(p);
111   }
112
113 };
114
115 template <class dtype>
116 class MeshStreamer : public CBase_MeshStreamer<dtype> {
117
118 private:
119     int bufferSize_; 
120     int totalBufferCapacity_;
121     int numDataItemsBuffered_;
122
123     int numMembers_; 
124     int numDimensions_;
125     int *individualDimensionSizes_;
126     int *combinedDimensionSizes_;
127
128     int myIndex_;
129     int *myLocationIndex_;
130
131     CkCallback   userCallback_;
132     bool yieldFlag_;
133
134     double progressPeriodInMs_; 
135     bool isPeriodicFlushEnabled_; 
136     bool hasSentRecently_;
137
138     MeshStreamerMessage<dtype> ***dataBuffers_;
139
140     CProxy_CompletionDetector detector_;
141
142 #ifdef CACHE_LOCATIONS
143     MeshLocation *cachedLocations_;
144     bool *isCached_; 
145 #endif
146
147     MeshLocation determineLocation(int destinationPe);
148
149     void storeMessage(int destinationPe, 
150                       const MeshLocation &destinationCoordinates, 
151                       void *dataItem, bool copyIndirectly = false);
152
153     virtual void deliverToDestination(
154                  int destinationPe, 
155                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
156
157     virtual void localDeliver(dtype &dataItem) = 0; 
158
159     virtual int numElementsInClient() = 0;
160
161     virtual void initLocalClients() = 0;
162
163     void flushLargestBuffer();
164
165 protected:
166
167     CompletionDetector *detectorLocalObj_;
168     virtual int copyDataItemIntoMessage(
169                 MeshStreamerMessage<dtype> *destinationBuffer, 
170                 void *dataItemHandle, bool copyIndirectly = false);
171
172 public:
173
174     MeshStreamer(int totalBufferCapacity, int numDimensions, 
175                  int *dimensionSies,
176                  bool yieldFlag = 0, double progressPeriodInMs = -1.0);
177     ~MeshStreamer();
178
179       // entry
180     void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
181     void flushDirect();
182     void finish();
183
184     // non entry
185     bool isPeriodicFlushEnabled() {
186       return isPeriodicFlushEnabled_;
187     }
188     virtual void insertData(dtype &dataItem, int destinationPe); 
189     void insertData(void *dataItemHandle, int destinationPe);
190     void associateCallback(int numContributors, 
191                            CkCallback startCb, CkCallback endCb, 
192                            CProxy_CompletionDetector detector);
193     void flushAllBuffers();
194     void registerPeriodicProgressFunction();
195
196     // flushing begins only after enablePeriodicFlushing has been invoked
197
198     void enablePeriodicFlushing(){
199       isPeriodicFlushEnabled_ = true; 
200       registerPeriodicProgressFunction();
201     }
202
203     void done(int numContributorsFinished = 1) {
204       detectorLocalObj_->done(numContributorsFinished);
205     }
206
207 };
208
209 template <class dtype>
210 MeshStreamer<dtype>::MeshStreamer(
211                      int totalBufferCapacity, int numDimensions, 
212                      int *dimensionSizes, 
213                      bool yieldFlag, 
214                      double progressPeriodInMs)
215  :numDimensions_(numDimensions), 
216   totalBufferCapacity_(totalBufferCapacity), 
217   yieldFlag_(yieldFlag), 
218   progressPeriodInMs_(progressPeriodInMs)
219 {
220   // limit total number of messages in system to totalBufferCapacity
221   //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
222   //   advantage of nonuniform filling of buffers
223
224   int sumAlongAllDimensions = 0;   
225   individualDimensionSizes_ = new int[numDimensions_];
226   combinedDimensionSizes_ = new int[numDimensions_ + 1];
227   myLocationIndex_ = new int[numDimensions_];
228   memcpy(individualDimensionSizes_, dimensionSizes, 
229          numDimensions * sizeof(int)); 
230   combinedDimensionSizes_[0] = 1; 
231   for (int i = 0; i < numDimensions; i++) {
232     sumAlongAllDimensions += individualDimensionSizes_[i];
233     combinedDimensionSizes_[i + 1] = 
234       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
235   }
236
237   // except for personalized messages, the buffers for dimensions with the 
238   //   same index as the sender's are not used
239   bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
240     / (sumAlongAllDimensions - numDimensions_ + 1); 
241   if (bufferSize_ <= 0) {
242     bufferSize_ = 1; 
243     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
244              "is invalid. Defaulting to a single buffer per destination.\n");
245   }
246   totalBufferCapacity_ = totalBufferCapacity;
247   numDataItemsBuffered_ = 0; 
248   numMembers_ = CkNumPes(); 
249
250   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
251   for (int i = 0; i < numDimensions; i++) {
252     int numMembersAlongDimension = individualDimensionSizes_[i]; 
253     dataBuffers_[i] = 
254       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
255     for (int j = 0; j < numMembersAlongDimension; j++) {
256       dataBuffers_[i][j] = NULL;
257     }
258   }
259
260   myIndex_ = CkMyPe();
261   int remainder = myIndex_;
262   for (int i = numDimensions_ - 1; i >= 0; i--) {    
263     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
264     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
265   }
266
267   isPeriodicFlushEnabled_ = false; 
268   detectorLocalObj_ = NULL;
269
270 #ifdef CACHE_LOCATIONS
271   cachedLocations_ = new MeshLocation[numMembers_];
272   isCached_ = new bool[numMembers_];
273   std::fill(isCached_, isCached_ + numMembers_, false);
274 #endif
275
276 }
277
278 template <class dtype>
279 MeshStreamer<dtype>::~MeshStreamer() {
280
281   for (int i = 0; i < numDimensions_; i++) {
282     for (int j=0; j < individualDimensionSizes_[i]; j++) {
283       delete[] dataBuffers_[i][j]; 
284     }
285     delete[] dataBuffers_[i]; 
286   }
287
288   delete[] individualDimensionSizes_;
289   delete[] combinedDimensionSizes_; 
290   delete[] myLocationIndex_;
291
292 #ifdef CACHE_LOCATIONS
293   delete[] cachedLocations_;
294   delete[] isCached_; 
295 #endif
296
297 }
298
299
300 template <class dtype>
301 inline
302 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
303
304 #ifdef CACHE_LOCATIONS
305   if (isCached_[destinationPe]) {    
306     return cachedLocations_[destinationPe]; 
307   }
308 #endif
309
310   MeshLocation destinationLocation;
311   int remainder = destinationPe;
312   int dimensionIndex; 
313   for (int i = numDimensions_ - 1; i >= 0; i--) {        
314     dimensionIndex = remainder / combinedDimensionSizes_[i];
315     
316     if (dimensionIndex != myLocationIndex_[i]) {
317       destinationLocation.dimension = i; 
318       destinationLocation.bufferIndex = dimensionIndex; 
319 #ifdef CACHE_LOCATIONS
320       cachedLocations_[destinationPe] = destinationLocation;
321       isCached_[destinationPe] = true; 
322 #endif
323       return destinationLocation;
324     }
325
326     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
327   }
328
329   // all indices agree - message to oneself
330   destinationLocation.dimension = 0; 
331   destinationLocation.bufferIndex = myLocationIndex_[0];
332   return destinationLocation; 
333 }
334
335 template <class dtype>
336 inline 
337 int MeshStreamer<dtype>::copyDataItemIntoMessage(
338                          MeshStreamerMessage<dtype> *destinationBuffer,
339                          void *dataItemHandle, bool copyIndirectly) {
340   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
341 }
342
343 template <class dtype>
344 inline
345 void MeshStreamer<dtype>::storeMessage(
346                           int destinationPe, 
347                           const MeshLocation& destinationLocation,
348                           void *dataItem, bool copyIndirectly) {
349
350   int dimension = destinationLocation.dimension;
351   int bufferIndex = destinationLocation.bufferIndex; 
352   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
353
354
355
356   // allocate new message if necessary
357   if (messageBuffers[bufferIndex] == NULL) {
358     if (dimension == 0) {
359       // personalized messages do not require destination indices
360       messageBuffers[bufferIndex] = 
361         new (0, bufferSize_) MeshStreamerMessage<dtype>();
362     }
363     else {
364       messageBuffers[bufferIndex] = 
365         new (bufferSize_, bufferSize_) MeshStreamerMessage<dtype>();
366     }
367 #ifdef DEBUG_STREAMER
368     CkAssert(messageBuffers[bufferIndex] != NULL);
369 #endif
370   }
371   
372   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
373   int numBuffered = 
374     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
375   if (dimension != 0) {
376     destinationBuffer->markDestination(numBuffered-1, destinationPe);
377   }  
378   numDataItemsBuffered_++;
379
380   // send if buffer is full
381   if (numBuffered == bufferSize_) {
382
383     int destinationIndex;
384
385     destinationIndex = myIndex_ + 
386       (bufferIndex - myLocationIndex_[dimension]) * 
387       combinedDimensionSizes_[dimension];
388
389     if (dimension == 0) {
390       deliverToDestination(destinationIndex, destinationBuffer);
391     }
392     else {
393       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
394     }
395
396     messageBuffers[bufferIndex] = NULL;
397     numDataItemsBuffered_ -= numBuffered; 
398     hasSentRecently_ = true; 
399
400   }
401   // send if total buffering capacity has been reached
402   else if (numDataItemsBuffered_ == totalBufferCapacity_) {
403     flushLargestBuffer();
404     hasSentRecently_ = true; 
405   }
406
407 }
408
409 template <class dtype>
410 inline
411 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
412   static int count = 0;
413   const static bool copyIndirectly = true;
414
415   MeshLocation destinationLocation = determineLocation(destinationPe);
416   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
417                copyIndirectly); 
418   // release control to scheduler if requested by the user, 
419   //   assume caller is threaded entry
420   if (yieldFlag_ && ++count == 1024) {
421     count = 0; 
422     CthYield();
423   }
424
425 }
426
427 template <class dtype>
428 inline
429 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
430
431   detectorLocalObj_->produce();
432   if (destinationPe == CkMyPe()) {
433     // copying here is necessary - user code should not be 
434     // passed back a reference to the original item
435     dtype dataItemCopy = dataItem;
436     localDeliver(dataItemCopy);
437     return;
438   }
439
440   insertData((void *) &dataItem, destinationPe);
441 }
442
443 template <class dtype>
444 void MeshStreamer<dtype>::associateCallback(
445                           int numContributors,
446                           CkCallback startCb, CkCallback endCb, 
447                           CProxy_CompletionDetector detector) {
448   userCallback_ = endCb; 
449   static CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
450                            this->thisProxy);
451   detector_ = detector;      
452   detectorLocalObj_ = detector_.ckLocalBranch();
453   initLocalClients();
454
455   detectorLocalObj_->start_detection(numContributors, startCb, finish , 0);
456   
457   if (progressPeriodInMs_ <= 0) {
458     CkPrintf("Using completion detection in NDMeshStreamer requires"
459              " setting a valid periodic flush period. Defaulting"
460              " to 10 ms\n");
461     progressPeriodInMs_ = 10;
462   }
463   
464   hasSentRecently_ = false; 
465   enablePeriodicFlushing();
466       
467 }
468
469 template <class dtype>
470 void MeshStreamer<dtype>::finish() {
471   isPeriodicFlushEnabled_ = false; 
472
473   if (!userCallback_.isInvalid()) {
474     this->contribute(userCallback_);
475     userCallback_ = CkCallback();      // nullify the current callback
476   }
477
478 }
479
480 template <class dtype>
481 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
482
483   int destinationPe; 
484   MeshLocation destinationLocation;
485
486   for (int i = 0; i < msg->numDataItems; i++) {
487     destinationPe = msg->destinationPes[i];
488     dtype &dataItem = msg->getDataItem(i);
489     destinationLocation = determineLocation(destinationPe);
490     if (destinationPe == CkMyPe()) {
491       localDeliver(dataItem);
492     }
493     else {
494       storeMessage(destinationPe, destinationLocation, &dataItem);   
495     }
496   }
497
498   delete msg;
499
500 }
501
502 template <class dtype>
503 void MeshStreamer<dtype>::flushLargestBuffer() {
504
505   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
506   MeshStreamerMessage<dtype> ** messageBuffers; 
507   MeshStreamerMessage<dtype> *destinationBuffer; 
508
509   for (int i = 0; i < numDimensions_; i++) {
510
511     messageBuffers = dataBuffers_[i]; 
512     numBuffers = individualDimensionSizes_[i]; 
513
514     flushDimension = i; 
515     maxSize = 0;    
516     for (int j = 0; j < numBuffers; j++) {
517       if (messageBuffers[j] != NULL && 
518           messageBuffers[j]->numDataItems > maxSize) {
519         maxSize = messageBuffers[j]->numDataItems;
520         flushIndex = j; 
521       }
522     }
523
524     if (maxSize > 0) {
525
526       messageBuffers = dataBuffers_[flushDimension]; 
527       destinationBuffer = messageBuffers[flushIndex];
528       destinationIndex = myIndex_ + 
529         (flushIndex - myLocationIndex_[flushDimension]) * 
530         combinedDimensionSizes_[flushDimension] ;
531
532       if (destinationBuffer->numDataItems < bufferSize_) {
533         // not sending the full buffer, shrink the message size
534         envelope *env = UsrToEnv(destinationBuffer);
535         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
536                           (bufferSize_ - destinationBuffer->numDataItems));
537       }
538       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
539
540       if (flushDimension == 0) {
541         deliverToDestination(destinationIndex, destinationBuffer);
542       }
543       else {
544         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
545       }
546       messageBuffers[flushIndex] = NULL;
547
548     }
549
550   }
551 }
552
553 template <class dtype>
554 void MeshStreamer<dtype>::flushAllBuffers() {
555
556   MeshStreamerMessage<dtype> **messageBuffers; 
557   int numBuffers; 
558
559   for (int i = 0; i < numDimensions_; i++) {
560
561     messageBuffers = dataBuffers_[i]; 
562     numBuffers = individualDimensionSizes_[i]; 
563
564     for (int j = 0; j < numBuffers; j++) {
565
566       if(messageBuffers[j] == NULL) {
567         continue;
568       }
569
570       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
571
572       if (i == 0) {
573         int destinationPe = myIndex_ + j - myLocationIndex_[i];
574         deliverToDestination(destinationPe, messageBuffers[j]);
575       }  
576       else {
577
578         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
579
580           MeshStreamerMessage<dtype> *directMsg = 
581             new (0, 1) MeshStreamerMessage<dtype>();
582 #ifdef DEBUG_STREAMER
583           CkAssert(directMsg != NULL);
584 #endif
585           int destinationPe = messageBuffers[j]->destinationPes[k]; 
586           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
587           directMsg->addDataItem(dataItem);
588           deliverToDestination(destinationPe,directMsg);
589         }
590         delete messageBuffers[j];
591       }
592       messageBuffers[j] = NULL;
593     }
594   }
595 }
596
597 template <class dtype>
598 void MeshStreamer<dtype>::flushDirect(){
599
600   // flush if (1) this is not a periodic call or 
601   //          (2) this is a periodic call and no sending took place
602   //              since the last time the function was invoked
603   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
604
605     if (numDataItemsBuffered_ != 0) {
606       flushAllBuffers();
607     }    
608 #ifdef DEBUG_STREAMER
609     CkAssert(numDataItemsBuffered_ == 0); 
610 #endif
611     
612   }
613
614   hasSentRecently_ = false; 
615
616 }
617
618 template <class dtype>
619 void periodicProgressFunction(void *MeshStreamerObj, double time) {
620
621   MeshStreamer<dtype> *properObj = 
622     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
623
624   if (properObj->isPeriodicFlushEnabled()) {
625     properObj->flushDirect();
626     properObj->registerPeriodicProgressFunction();
627   }
628 }
629
630 template <class dtype>
631 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
632   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
633                  progressPeriodInMs_); 
634 }
635
636
637 template <class dtype>
638 class GroupMeshStreamer : public MeshStreamer<dtype> {
639 private:
640
641   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
642   MeshStreamerGroupClient<dtype> *clientObj_;
643
644   void deliverToDestination(int destinationPe, 
645                             MeshStreamerMessage<dtype> *destinationBuffer) {
646     clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
647   }
648
649   void localDeliver(dtype &dataItem) {
650     clientObj_->process(dataItem);
651     MeshStreamer<dtype>::detectorLocalObj_->consume();
652   }
653
654   int numElementsInClient() {
655     // client is a group - there is one element per PE
656     return CkNumPes();
657   }
658
659   void initLocalClients() {
660     clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
661   }
662
663 public:
664
665   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
666                     int *dimensionSizes, 
667                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
668                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
669    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
670                          yieldFlag, progressPeriodInMs) 
671   {
672     clientProxy_ = clientProxy; 
673     clientObj_ = 
674       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
675   }
676
677 };
678
679 template <class dtype, class ctype>
680 class MeshStreamerClientIterator : public CkLocIterator {
681
682 public:
683   
684   CompletionDetector *detectorLocalObj_;
685   ctype clientProxy_;
686   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
687                              ctype clientProxy) 
688    : detectorLocalObj_(detectorObj), clientProxy_(clientProxy) {}
689
690   // CkLocMgr::iterate will call addLocation on all elements local to this PE
691   void addLocation(CkLocation &loc) {
692
693     ((MeshStreamerClient<dtype> *) 
694      (clientProxy_.ckLocalBranch()->lookup(loc.getIndex())))
695       ->setDetector(detectorLocalObj_); 
696
697   }
698
699 };
700
701 template <class dtype, class ctype, class itype>
702 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
703   
704 private:
705   
706   ctype clientProxy_;
707   CkArray *clientArrayMgr_;
708   int numArrayElements_;
709 #ifdef CACHE_ARRAY_METADATA
710   MeshStreamerArrayClient<dtype> **clientObjs_;
711   int *destinationPes_;
712   bool *isCachedArrayMetadata_;
713 #endif
714
715   void deliverToDestination(
716        int destinationPe, 
717        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *destinationBuffer) {
718     ( (CProxy_ArrayMeshStreamer<dtype, ctype, itype>) 
719       this->thisProxy )[destinationPe].receiveArrayData(destinationBuffer);
720   }
721
722   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
723     itype arrayId = packedDataItem.arrayIndex; 
724
725     MeshStreamerArrayClient<dtype> *clientObj;
726 #ifdef CACHE_ARRAY_METADATA
727     clientObj = clientObjs_[arrayId];
728 #else
729     clientObj = clientProxy_[arrayId].ckLocal();
730 #endif
731
732     if (clientObj != NULL) {
733       clientObj->process(packedDataItem.dataItem);
734       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
735     }
736     else { 
737       // array element is no longer present locally - redeliver using proxy
738       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
739     }
740   }
741
742   int numElementsInClient() {
743     return numArrayElements_;
744   }
745
746   void initLocalClients() {
747
748 #ifdef CACHE_ARRAY_METADATA
749     std::fill(isCachedArrayMetadata_, 
750               isCachedArrayMetadata_ + numArrayElements_, false);
751
752     for (int i = 0; i < numArrayElements_; i++) {
753       clientObjs_[i] = clientProxy_[i].ckLocal();
754       if (clientObjs_[i] != NULL) {
755         clientObjs_[i]->setDetector(
756          MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
757       }
758     }
759 #else
760     // set completion detector in local elements of the client
761     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
762     MeshStreamerClientIterator<dtype, ctype> clientIterator(
763      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
764      clientProxy_);
765     clientLocMgr->iterate(clientIterator);
766 #endif    
767
768   }
769
770 public:
771
772   struct DataItemHandle {
773     itype arrayIndex; 
774     dtype *dataItem;
775   };
776
777   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
778                     int *dimensionSizes, const ctype &clientProxy,
779                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
780     :MeshStreamer<ArrayDataItem<dtype, itype> >(
781       totalBufferCapacity, numDimensions, dimensionSizes, yieldFlag, 
782       progressPeriodInMs) 
783   {
784     clientProxy_ = clientProxy; 
785     clientArrayMgr_ = clientProxy_.ckLocalBranch();
786
787     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
788
789 #ifdef CACHE_ARRAY_METADATA
790     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
791     destinationPes_ = new int[numArrayElements_];
792     isCachedArrayMetadata_ = new bool[numArrayElements_];
793     std::fill(isCachedArrayMetadata_, 
794               isCachedArrayMetadata_ + numArrayElements_, false);
795 #endif
796   }
797
798   ~ArrayMeshStreamer() {
799 #ifdef CACHE_ARRAY_METADATA
800     delete [] clientObjs_;
801     delete [] destinationPes_;
802     delete [] isCachedArrayMetadata_; 
803 #endif
804   }
805
806   void receiveArrayData(
807        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
808     for (int i = 0; i < msg->numDataItems; i++) {
809       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
810       localDeliver(packedData);
811     }
812     delete msg;
813   }
814
815   void insertData(dtype &dataItem, itype arrayIndex) {
816
817     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
818     int destinationPe; 
819 #ifdef CACHE_ARRAY_METADATA
820   if (isCachedArrayMetadata_[arrayIndex]) {    
821     destinationPe =  destinationPes_[arrayIndex];
822   }
823   else {
824     destinationPe = 
825       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
826     isCachedArrayMetadata_[arrayIndex] = true;
827     destinationPes_[arrayIndex] = destinationPe;
828   }
829 #else 
830   destinationPe = 
831     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
832 #endif
833
834   static ArrayDataItem<dtype, itype> packedDataItem;
835     if (destinationPe == CkMyPe()) {
836       // copying here is necessary - user code should not be 
837       // passed back a reference to the original item
838       packedDataItem.arrayIndex = arrayIndex; 
839       packedDataItem.dataItem = dataItem;
840       localDeliver(packedDataItem);
841       return;
842     }
843
844     // this implementation avoids copying an item before transfer into message
845
846     static DataItemHandle tempHandle; 
847     tempHandle.arrayIndex = arrayIndex; 
848     tempHandle.dataItem = &dataItem;
849
850     MeshStreamer<ArrayDataItem<dtype, itype> >::
851      insertData(&tempHandle, destinationPe);
852
853   }
854
855   int copyDataItemIntoMessage(
856       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
857       void *dataItemHandle, bool copyIndirectly) {
858
859     if (copyIndirectly == true) {
860       // newly inserted items are passed through a handle to avoid copying
861       int numDataItems = destinationBuffer->numDataItems;
862       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
863       (destinationBuffer->data)[numDataItems].dataItem = 
864         *(tempHandle->dataItem);
865       (destinationBuffer->data)[numDataItems].arrayIndex = 
866         tempHandle->arrayIndex;
867       return ++destinationBuffer->numDataItems;
868     }
869     else {
870       // this is an item received along the route to destination
871       // we can copy it from the received message
872       return MeshStreamer<ArrayDataItem<dtype, itype> >::
873               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
874     }
875   }
876
877 };
878
879 #define CK_TEMPLATES_ONLY
880 #include "NDMeshStreamer.def.h"
881 #undef CK_TEMPLATES_ONLY
882
883 #endif