6613e616ea867345ae5da88aaefafdafc9edd35f
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // allocate more total buffer space than the maximum buffering limit but flush 
11 //   upon reaching totalBufferCapacity_
12 #define BUFFER_SIZE_FACTOR 4
13
14 // #define DEBUG_STREAMER
15 // #define CACHE_LOCATIONS
16 // #define SUPPORT_INCOMPLETE_MESH
17 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
18
19 struct MeshLocation {
20   int dimension; 
21   int bufferIndex; 
22 }; 
23
24 template<class dtype>
25 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
26 public:
27     int numDataItems;
28     int *destinationPes;
29     dtype *data;
30
31     MeshStreamerMessage(): numDataItems(0) {}   
32
33     int addDataItem(const dtype &dataItem) {
34         data[numDataItems] = dataItem;
35         return ++numDataItems; 
36     }
37
38     void markDestination(const int index, const int destinationPe) {
39         destinationPes[index] = destinationPe;
40     }
41
42     dtype &getDataItem(const int index) {
43         return data[index];
44     }
45 };
46
47 template <class dtype>
48 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
49  private:
50   CompletionDetector *detectorLocalObj_;
51  public:
52   MeshStreamerArrayClient(){}
53   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
54   // would like to make it pure virtual but charm will try to
55   // instantiate the abstract class, leading to errors
56   virtual void process(dtype &data) {
57     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
58             "This virtual function should have been defined by the user.\n");
59   };     
60   void setDetector(CompletionDetector *detectorLocalObj) {
61     detectorLocalObj_ = detectorLocalObj;
62   }
63   void receiveRedeliveredItem(dtype data) {
64     detectorLocalObj_->consume();
65     process(data);
66   }
67
68   void pup(PUP::er &p) {
69     CBase_MeshStreamerArrayClient<dtype>::pup(p);
70    }  
71
72 };
73
74 template <class dtype>
75 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
76
77  private:
78   CompletionDetector *detectorLocalObj_;
79
80  public:
81   virtual void process(dtype &data) = 0;
82   void setDetector(CompletionDetector *detectorLocalObj) {
83     detectorLocalObj_ = detectorLocalObj;
84   }
85   virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg) {
86     for (int i = 0; i < msg->numDataItems; i++) {
87       dtype &data = msg->getDataItem(i);
88       process(data);
89     }
90     detectorLocalObj_->consume(msg->numDataItems);
91     delete msg;
92   }
93 };
94
95 template <class dtype>
96 class MeshStreamer : public CBase_MeshStreamer<dtype> {
97
98 private:
99     int bufferSize_; 
100     int totalBufferCapacity_;
101     int numDataItemsBuffered_;
102
103     int numMembers_; 
104     int numDimensions_;
105     int *individualDimensionSizes_;
106     int *combinedDimensionSizes_;
107
108     int myIndex_;
109     int *myLocationIndex_;
110
111     CkCallback   userCallback_;
112     bool yieldFlag_;
113
114     double progressPeriodInMs_; 
115     bool isPeriodicFlushEnabled_; 
116     bool hasSentRecently_;
117
118     MeshStreamerMessage<dtype> ***dataBuffers_;
119
120     CProxy_CompletionDetector detector_;
121     int prio_;
122     int yieldCount_;
123
124 #ifdef CACHE_LOCATIONS
125     MeshLocation *cachedLocations_;
126     bool *isCached_; 
127 #endif
128
129     MeshLocation determineLocation(int destinationPe);
130
131     void storeMessage(int destinationPe, 
132                       const MeshLocation &destinationCoordinates, 
133                       void *dataItem, bool copyIndirectly = false);
134
135     virtual void deliverToDestination(
136                  int destinationPe, 
137                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
138
139     virtual void localDeliver(dtype &dataItem) = 0; 
140
141     virtual int numElementsInClient() = 0;
142
143     virtual void initLocalClients() = 0;
144
145     void flushLargestBuffer();
146
147 protected:
148
149     CompletionDetector *detectorLocalObj_;
150     virtual int copyDataItemIntoMessage(
151                 MeshStreamerMessage<dtype> *destinationBuffer, 
152                 void *dataItemHandle, bool copyIndirectly = false);
153
154 public:
155
156     MeshStreamer(int totalBufferCapacity, int numDimensions, 
157                  int *dimensionSies,
158                  bool yieldFlag = 0, double progressPeriodInMs = -1.0);
159     ~MeshStreamer();
160
161       // entry
162     void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
163     void flushDirect();
164     void finish();
165
166     // non entry
167     bool isPeriodicFlushEnabled() {
168       return isPeriodicFlushEnabled_;
169     }
170     virtual void insertData(dtype &dataItem, int destinationPe); 
171     void insertData(void *dataItemHandle, int destinationPe);
172     void associateCallback(int numContributors, 
173                            CkCallback startCb, CkCallback endCb, 
174                            CProxy_CompletionDetector detector,
175                            int prio);
176     void flushAllBuffers();
177     void registerPeriodicProgressFunction();
178
179     // flushing begins only after enablePeriodicFlushing has been invoked
180
181     void enablePeriodicFlushing(){
182       isPeriodicFlushEnabled_ = true; 
183       registerPeriodicProgressFunction();
184     }
185
186     void done(int numContributorsFinished = 1) {
187       detectorLocalObj_->done(numContributorsFinished);
188     }
189
190 };
191
192 template <class dtype>
193 MeshStreamer<dtype>::MeshStreamer(
194                      int totalBufferCapacity, int numDimensions, 
195                      int *dimensionSizes, 
196                      bool yieldFlag, 
197                      double progressPeriodInMs)
198  :numDimensions_(numDimensions), 
199   totalBufferCapacity_(totalBufferCapacity), 
200   yieldFlag_(yieldFlag), 
201   progressPeriodInMs_(progressPeriodInMs)
202 {
203   // limit total number of messages in system to totalBufferCapacity
204   //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
205   //   advantage of nonuniform filling of buffers
206
207   int sumAlongAllDimensions = 0;   
208   individualDimensionSizes_ = new int[numDimensions_];
209   combinedDimensionSizes_ = new int[numDimensions_ + 1];
210   myLocationIndex_ = new int[numDimensions_];
211   memcpy(individualDimensionSizes_, dimensionSizes, 
212          numDimensions * sizeof(int)); 
213   combinedDimensionSizes_[0] = 1; 
214   for (int i = 0; i < numDimensions; i++) {
215     sumAlongAllDimensions += individualDimensionSizes_[i];
216     combinedDimensionSizes_[i + 1] = 
217       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
218   }
219
220   // except for personalized messages, the buffers for dimensions with the 
221   //   same index as the sender's are not used
222   bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
223     / (sumAlongAllDimensions - numDimensions_ + 1); 
224   if (bufferSize_ <= 0) {
225     bufferSize_ = 1; 
226     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
227              "is invalid. Defaulting to a single buffer per destination.\n");
228   }
229   totalBufferCapacity_ = totalBufferCapacity;
230   numDataItemsBuffered_ = 0; 
231   numMembers_ = CkNumPes(); 
232
233   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
234   for (int i = 0; i < numDimensions; i++) {
235     int numMembersAlongDimension = individualDimensionSizes_[i]; 
236     dataBuffers_[i] = 
237       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
238     for (int j = 0; j < numMembersAlongDimension; j++) {
239       dataBuffers_[i][j] = NULL;
240     }
241   }
242
243   myIndex_ = CkMyPe();
244   int remainder = myIndex_;
245   for (int i = numDimensions_ - 1; i >= 0; i--) {    
246     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
247     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
248   }
249
250   isPeriodicFlushEnabled_ = false; 
251   detectorLocalObj_ = NULL;
252
253 #ifdef CACHE_LOCATIONS
254   cachedLocations_ = new MeshLocation[numMembers_];
255   isCached_ = new bool[numMembers_];
256   std::fill(isCached_, isCached_ + numMembers_, false);
257 #endif
258
259 }
260
261 template <class dtype>
262 MeshStreamer<dtype>::~MeshStreamer() {
263
264   for (int i = 0; i < numDimensions_; i++) {
265     for (int j=0; j < individualDimensionSizes_[i]; j++) {
266       delete[] dataBuffers_[i][j]; 
267     }
268     delete[] dataBuffers_[i]; 
269   }
270
271   delete[] individualDimensionSizes_;
272   delete[] combinedDimensionSizes_; 
273   delete[] myLocationIndex_;
274
275 #ifdef CACHE_LOCATIONS
276   delete[] cachedLocations_;
277   delete[] isCached_; 
278 #endif
279
280 }
281
282
283 template <class dtype>
284 inline
285 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
286
287 #ifdef CACHE_LOCATIONS
288   if (isCached_[destinationPe]) {    
289     return cachedLocations_[destinationPe]; 
290   }
291 #endif
292
293   MeshLocation destinationLocation;
294   int remainder = destinationPe;
295   int dimensionIndex; 
296   for (int i = numDimensions_ - 1; i >= 0; i--) {        
297     dimensionIndex = remainder / combinedDimensionSizes_[i];
298     
299     if (dimensionIndex != myLocationIndex_[i]) {
300       destinationLocation.dimension = i; 
301       destinationLocation.bufferIndex = dimensionIndex; 
302 #ifdef CACHE_LOCATIONS
303       cachedLocations_[destinationPe] = destinationLocation;
304       isCached_[destinationPe] = true; 
305 #endif
306       return destinationLocation;
307     }
308
309     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
310   }
311
312   // all indices agree - message to oneself
313   destinationLocation.dimension = 0; 
314   destinationLocation.bufferIndex = myLocationIndex_[0];
315   return destinationLocation; 
316 }
317
318 template <class dtype>
319 inline 
320 int MeshStreamer<dtype>::copyDataItemIntoMessage(
321                          MeshStreamerMessage<dtype> *destinationBuffer,
322                          void *dataItemHandle, bool copyIndirectly) {
323   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
324 }
325
326 template <class dtype>
327 inline
328 void MeshStreamer<dtype>::storeMessage(
329                           int destinationPe, 
330                           const MeshLocation& destinationLocation,
331                           void *dataItem, bool copyIndirectly) {
332
333   int dimension = destinationLocation.dimension;
334   int bufferIndex = destinationLocation.bufferIndex; 
335   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
336
337
338
339   // allocate new message if necessary
340   if (messageBuffers[bufferIndex] == NULL) {
341     if (dimension == 0) {
342       // personalized messages do not require destination indices
343       messageBuffers[bufferIndex] = 
344         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
345     }
346     else {
347       messageBuffers[bufferIndex] = 
348         new (bufferSize_, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
349     }
350     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
351     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
352 #ifdef DEBUG_STREAMER
353     CkAssert(messageBuffers[bufferIndex] != NULL);
354 #endif
355   }
356   
357   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
358   int numBuffered = 
359     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
360   if (dimension != 0) {
361     destinationBuffer->markDestination(numBuffered-1, destinationPe);
362   }  
363   numDataItemsBuffered_++;
364
365   // send if buffer is full
366   if (numBuffered == bufferSize_) {
367
368     int destinationIndex;
369
370     destinationIndex = myIndex_ + 
371       (bufferIndex - myLocationIndex_[dimension]) * 
372       combinedDimensionSizes_[dimension];
373
374     if (dimension == 0) {
375       deliverToDestination(destinationIndex, destinationBuffer);
376     }
377     else {
378       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
379     }
380
381     messageBuffers[bufferIndex] = NULL;
382     numDataItemsBuffered_ -= numBuffered; 
383     hasSentRecently_ = true; 
384
385   }
386   // send if total buffering capacity has been reached
387   else if (numDataItemsBuffered_ == totalBufferCapacity_) {
388     flushLargestBuffer();
389     hasSentRecently_ = true; 
390   }
391
392 }
393
394 template <class dtype>
395 inline
396 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
397   const static bool copyIndirectly = true;
398
399   MeshLocation destinationLocation = determineLocation(destinationPe);
400   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
401                copyIndirectly); 
402   // release control to scheduler if requested by the user, 
403   //   assume caller is threaded entry
404   if (yieldFlag_ && ++yieldCount_ == 1024) {
405     yieldCount_ = 0; 
406     CthYield();
407   }
408
409 }
410
411 template <class dtype>
412 inline
413 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
414
415   detectorLocalObj_->produce();
416   if (destinationPe == CkMyPe()) {
417     // copying here is necessary - user code should not be 
418     // passed back a reference to the original item
419     dtype dataItemCopy = dataItem;
420     localDeliver(dataItemCopy);
421     return;
422   }
423
424   insertData((void *) &dataItem, destinationPe);
425 }
426
427 template <class dtype>
428 void MeshStreamer<dtype>::associateCallback(
429                           int numContributors,
430                           CkCallback startCb, CkCallback endCb, 
431                           CProxy_CompletionDetector detector, 
432                           int prio) {
433   yieldCount_ = 0; 
434   prio_ = prio;
435   userCallback_ = endCb; 
436   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushDirect(), 
437                      this->thisProxy);
438   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
439                     this->thisProxy);
440   detector_ = detector;      
441   detectorLocalObj_ = detector_.ckLocalBranch();
442   initLocalClients();
443
444   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
445   
446   if (progressPeriodInMs_ <= 0) {
447     CkPrintf("Using completion detection in NDMeshStreamer requires"
448              " setting a valid periodic flush period. Defaulting"
449              " to 10 ms\n");
450     progressPeriodInMs_ = 10;
451   }
452   
453   hasSentRecently_ = false; 
454   enablePeriodicFlushing();
455       
456 }
457
458 template <class dtype>
459 void MeshStreamer<dtype>::finish() {
460   isPeriodicFlushEnabled_ = false; 
461
462   if (!userCallback_.isInvalid()) {
463     this->contribute(userCallback_);
464     userCallback_ = CkCallback();      // nullify the current callback
465   }
466
467 }
468
469 template <class dtype>
470 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
471
472   int destinationPe, lastDestinationPe; 
473   MeshLocation destinationLocation;
474
475   lastDestinationPe = -1;
476   for (int i = 0; i < msg->numDataItems; i++) {
477     destinationPe = msg->destinationPes[i];
478     dtype &dataItem = msg->getDataItem(i);
479     if (destinationPe == CkMyPe()) {
480       localDeliver(dataItem);
481     }
482     else {
483       if (destinationPe != lastDestinationPe) {
484         // do this once per sequence of items with the same destination
485         destinationLocation = determineLocation(destinationPe);
486       }
487       storeMessage(destinationPe, destinationLocation, &dataItem);   
488     }
489     lastDestinationPe = destinationPe; 
490   }
491
492   delete msg;
493
494 }
495
496 template <class dtype>
497 void MeshStreamer<dtype>::flushLargestBuffer() {
498
499   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
500   MeshStreamerMessage<dtype> ** messageBuffers; 
501   MeshStreamerMessage<dtype> *destinationBuffer; 
502
503   for (int i = 0; i < numDimensions_; i++) {
504
505     messageBuffers = dataBuffers_[i]; 
506     numBuffers = individualDimensionSizes_[i]; 
507
508     flushDimension = i; 
509     maxSize = 0;    
510     for (int j = 0; j < numBuffers; j++) {
511       if (messageBuffers[j] != NULL && 
512           messageBuffers[j]->numDataItems > maxSize) {
513         maxSize = messageBuffers[j]->numDataItems;
514         flushIndex = j; 
515       }
516     }
517
518     if (maxSize > 0) {
519
520       messageBuffers = dataBuffers_[flushDimension]; 
521       destinationBuffer = messageBuffers[flushIndex];
522       destinationIndex = myIndex_ + 
523         (flushIndex - myLocationIndex_[flushDimension]) * 
524         combinedDimensionSizes_[flushDimension] ;
525
526       if (destinationBuffer->numDataItems < bufferSize_) {
527         // not sending the full buffer, shrink the message size
528         envelope *env = UsrToEnv(destinationBuffer);
529         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
530                           (bufferSize_ - destinationBuffer->numDataItems));
531         *((int *) env->getPrioPtr()) = prio_;
532       }
533       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
534
535       if (flushDimension == 0) {
536         deliverToDestination(destinationIndex, destinationBuffer);
537       }
538       else {
539         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
540       }
541       messageBuffers[flushIndex] = NULL;
542
543     }
544
545   }
546 }
547
548 template <class dtype>
549 void MeshStreamer<dtype>::flushAllBuffers() {
550
551   MeshStreamerMessage<dtype> **messageBuffers; 
552   int numBuffers; 
553
554   for (int i = 0; i < numDimensions_; i++) {
555
556     messageBuffers = dataBuffers_[i]; 
557     numBuffers = individualDimensionSizes_[i]; 
558
559     for (int j = 0; j < numBuffers; j++) {
560
561       if(messageBuffers[j] == NULL) {
562         continue;
563       }
564
565       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
566
567       if (i == 0) {
568         int destinationPe = myIndex_ + j - myLocationIndex_[i];
569         deliverToDestination(destinationPe, messageBuffers[j]);
570       }  
571       else {
572
573         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
574
575           MeshStreamerMessage<dtype> *directMsg = 
576             new (0, 1, sizeof(int)) MeshStreamerMessage<dtype>();
577           *(int *) CkPriorityPtr(directMsg) = prio_;
578           CkSetQueueing(directMsg, CK_QUEUEING_IFIFO);
579
580 #ifdef DEBUG_STREAMER
581           CkAssert(directMsg != NULL);
582 #endif
583           int destinationPe = messageBuffers[j]->destinationPes[k]; 
584           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
585           directMsg->addDataItem(dataItem);
586           deliverToDestination(destinationPe,directMsg);
587         }
588         delete messageBuffers[j];
589       }
590       messageBuffers[j] = NULL;
591     }
592   }
593 }
594
595 template <class dtype>
596 void MeshStreamer<dtype>::flushDirect(){
597
598   // flush if (1) this is not a periodic call or 
599   //          (2) this is a periodic call and no sending took place
600   //              since the last time the function was invoked
601   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
602
603     if (numDataItemsBuffered_ != 0) {
604       flushAllBuffers();
605     }    
606 #ifdef DEBUG_STREAMER
607     CkAssert(numDataItemsBuffered_ == 0); 
608 #endif
609     
610   }
611
612   hasSentRecently_ = false; 
613
614 }
615
616 template <class dtype>
617 void periodicProgressFunction(void *MeshStreamerObj, double time) {
618
619   MeshStreamer<dtype> *properObj = 
620     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
621
622   if (properObj->isPeriodicFlushEnabled()) {
623     properObj->flushDirect();
624     properObj->registerPeriodicProgressFunction();
625   }
626 }
627
628 template <class dtype>
629 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
630   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
631                  progressPeriodInMs_); 
632 }
633
634
635 template <class dtype>
636 class GroupMeshStreamer : public MeshStreamer<dtype> {
637 private:
638
639   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
640   MeshStreamerGroupClient<dtype> *clientObj_;
641
642   void deliverToDestination(int destinationPe, 
643                             MeshStreamerMessage<dtype> *destinationBuffer) {
644     clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
645   }
646
647   void localDeliver(dtype &dataItem) {
648     clientObj_->process(dataItem);
649     MeshStreamer<dtype>::detectorLocalObj_->consume();
650   }
651
652   int numElementsInClient() {
653     // client is a group - there is one element per PE
654     return CkNumPes();
655   }
656
657   void initLocalClients() {
658     clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
659   }
660
661 public:
662
663   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
664                     int *dimensionSizes, 
665                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
666                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
667    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
668                          yieldFlag, progressPeriodInMs) 
669   {
670     clientProxy_ = clientProxy; 
671     clientObj_ = 
672       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
673   }
674
675 };
676
677 template <class dtype>
678 class MeshStreamerClientIterator : public CkLocIterator {
679
680 public:
681   
682   CompletionDetector *detectorLocalObj_;
683   CkArray *clientArrMgr_;
684   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
685                              CkArray *clientArrMgr) 
686     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
687
688   // CkLocMgr::iterate will call addLocation on all elements local to this PE
689   void addLocation(CkLocation &loc) {
690
691     MeshStreamerArrayClient<dtype> *clientObj = 
692       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
693
694     CkAssert(clientObj != NULL); 
695     clientObj->setDetector(detectorLocalObj_); 
696   }
697
698 };
699
700 template <class dtype, class itype>
701 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
702   
703 private:
704   
705   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
706   CkArray *clientArrayMgr_;
707   int numArrayElements_;
708 #ifdef CACHE_ARRAY_METADATA
709   MeshStreamerArrayClient<dtype> **clientObjs_;
710   int *destinationPes_;
711   bool *isCachedArrayMetadata_;
712 #endif
713
714   void deliverToDestination(
715        int destinationPe, 
716        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *destinationBuffer) {
717
718     CProxy_ArrayMeshStreamer<dtype, itype> myProxy(this->thisProxy); 
719     myProxy[destinationPe].receiveArrayData(destinationBuffer);
720   }
721
722   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
723     itype arrayId = packedDataItem.arrayIndex; 
724
725     MeshStreamerArrayClient<dtype> *clientObj;
726 #ifdef CACHE_ARRAY_METADATA
727     clientObj = clientObjs_[arrayId];
728 #else
729     clientObj = clientProxy_[arrayId].ckLocal();
730 #endif
731
732     if (clientObj != NULL) {
733       clientObj->process(packedDataItem.dataItem);
734       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
735     }
736     else { 
737       // array element is no longer present locally - redeliver using proxy
738       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
739     }
740   }
741
742   int numElementsInClient() {
743     return numArrayElements_;
744   }
745
746   void initLocalClients() {
747
748 #ifdef CACHE_ARRAY_METADATA
749     std::fill(isCachedArrayMetadata_, 
750               isCachedArrayMetadata_ + numArrayElements_, false);
751
752     for (int i = 0; i < numArrayElements_; i++) {
753       clientObjs_[i] = clientProxy_[i].ckLocal();
754       if (clientObjs_[i] != NULL) {
755         clientObjs_[i]->setDetector(
756          MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
757       }
758     }
759 #else
760     // set completion detector in local elements of the client
761     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
762     MeshStreamerClientIterator<dtype> clientIterator(
763      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
764      clientProxy_.ckLocalBranch());
765     clientLocMgr->iterate(clientIterator);
766 #endif    
767
768   }
769
770 public:
771
772   struct DataItemHandle {
773     itype arrayIndex; 
774     dtype *dataItem;
775   };
776
777   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
778                     int *dimensionSizes, 
779                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
780                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
781     :MeshStreamer<ArrayDataItem<dtype, itype> >(
782       totalBufferCapacity, numDimensions, dimensionSizes, yieldFlag, 
783       progressPeriodInMs) 
784   {
785     clientProxy_ = clientProxy; 
786     clientArrayMgr_ = clientProxy_.ckLocalBranch();
787
788     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
789
790 #ifdef CACHE_ARRAY_METADATA
791     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
792     destinationPes_ = new int[numArrayElements_];
793     isCachedArrayMetadata_ = new bool[numArrayElements_];
794     std::fill(isCachedArrayMetadata_, 
795               isCachedArrayMetadata_ + numArrayElements_, false);
796 #endif
797   }
798
799   ~ArrayMeshStreamer() {
800 #ifdef CACHE_ARRAY_METADATA
801     delete [] clientObjs_;
802     delete [] destinationPes_;
803     delete [] isCachedArrayMetadata_; 
804 #endif
805   }
806
807   void receiveArrayData(
808        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
809     for (int i = 0; i < msg->numDataItems; i++) {
810       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
811       localDeliver(packedData);
812     }
813     delete msg;
814   }
815
816   void insertData(dtype &dataItem, itype arrayIndex) {
817
818     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
819     int destinationPe; 
820 #ifdef CACHE_ARRAY_METADATA
821   if (isCachedArrayMetadata_[arrayIndex]) {    
822     destinationPe =  destinationPes_[arrayIndex];
823   }
824   else {
825     destinationPe = 
826       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
827     isCachedArrayMetadata_[arrayIndex] = true;
828     destinationPes_[arrayIndex] = destinationPe;
829   }
830 #else 
831   destinationPe = 
832     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
833 #endif
834
835   ArrayDataItem<dtype, itype> packedDataItem;
836     if (destinationPe == CkMyPe()) {
837       // copying here is necessary - user code should not be 
838       // passed back a reference to the original item
839       packedDataItem.arrayIndex = arrayIndex; 
840       packedDataItem.dataItem = dataItem;
841       localDeliver(packedDataItem);
842       return;
843     }
844
845     // this implementation avoids copying an item before transfer into message
846
847     DataItemHandle tempHandle; 
848     tempHandle.arrayIndex = arrayIndex; 
849     tempHandle.dataItem = &dataItem;
850
851     MeshStreamer<ArrayDataItem<dtype, itype> >::
852      insertData(&tempHandle, destinationPe);
853
854   }
855
856   int copyDataItemIntoMessage(
857       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
858       void *dataItemHandle, bool copyIndirectly) {
859
860     if (copyIndirectly == true) {
861       // newly inserted items are passed through a handle to avoid copying
862       int numDataItems = destinationBuffer->numDataItems;
863       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
864       (destinationBuffer->data)[numDataItems].dataItem = 
865         *(tempHandle->dataItem);
866       (destinationBuffer->data)[numDataItems].arrayIndex = 
867         tempHandle->arrayIndex;
868       return ++destinationBuffer->numDataItems;
869     }
870     else {
871       // this is an item received along the route to destination
872       // we can copy it from the received message
873       return MeshStreamer<ArrayDataItem<dtype, itype> >::
874               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
875     }
876   }
877
878 };
879
880 #define CK_TEMPLATES_ONLY
881 #include "NDMeshStreamer.def.h"
882 #undef CK_TEMPLATES_ONLY
883
884 #endif