10d3330476044d8cae3426b93bc25a545e9edf4b
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // allocate more total buffer space than the maximum buffering limit but flush 
11 //   upon reaching totalBufferCapacity_
12 #define BUFFER_SIZE_FACTOR 4
13
14 // #define DEBUG_STREAMER
15 // #define CACHE_LOCATIONS
16 // #define SUPPORT_INCOMPLETE_MESH
17 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
18
19 struct MeshLocation {
20   int dimension; 
21   int bufferIndex; 
22 }; 
23
24 template<class dtype>
25 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
26 public:
27     int numDataItems;
28     int *destinationPes;
29     dtype *data;
30
31     MeshStreamerMessage(): numDataItems(0) {}   
32
33     int addDataItem(const dtype &dataItem) {
34         data[numDataItems] = dataItem;
35         return ++numDataItems; 
36     }
37
38     void markDestination(const int index, const int destinationPe) {
39         destinationPes[index] = destinationPe;
40     }
41
42     dtype &getDataItem(const int index) {
43         return data[index];
44     }
45 };
46
47 template <class dtype>
48 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
49  private:
50   CompletionDetector *detectorLocalObj_;
51  public:
52   MeshStreamerArrayClient(){}
53   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
54   // would like to make it pure virtual but charm will try to
55   // instantiate the abstract class, leading to errors
56   virtual void process(dtype &data) {
57     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
58             "This virtual function should have been defined by the user.\n");
59   };     
60   void setDetector(CompletionDetector *detectorLocalObj) {
61     detectorLocalObj_ = detectorLocalObj;
62   }
63   void receiveRedeliveredItem(dtype data) {
64     //    CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
65     detectorLocalObj_->consume();
66     process(data);
67   }
68
69   void pup(PUP::er &p) {
70     CBase_MeshStreamerArrayClient<dtype>::pup(p);
71    }  
72
73 };
74
75 template <class dtype>
76 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
77
78  private:
79   CompletionDetector *detectorLocalObj_;
80
81  public:
82   virtual void process(dtype &data) = 0;
83   void setDetector(CompletionDetector *detectorLocalObj) {
84     detectorLocalObj_ = detectorLocalObj;
85   }
86   virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg) {
87     for (int i = 0; i < msg->numDataItems; i++) {
88       dtype &data = msg->getDataItem(i);
89       process(data);
90     }
91     detectorLocalObj_->consume(msg->numDataItems);
92     delete msg;
93   }
94 };
95
96 template <class dtype>
97 class MeshStreamer : public CBase_MeshStreamer<dtype> {
98
99 private:
100     int bufferSize_; 
101     int totalBufferCapacity_;
102     int numDataItemsBuffered_;
103
104     int numMembers_; 
105     int numDimensions_;
106     int *individualDimensionSizes_;
107     int *combinedDimensionSizes_;
108
109     int myIndex_;
110     int *myLocationIndex_;
111
112     CkCallback   userCallback_;
113     bool yieldFlag_;
114
115     double progressPeriodInMs_; 
116     bool isPeriodicFlushEnabled_; 
117     bool hasSentRecently_;
118
119     MeshStreamerMessage<dtype> ***dataBuffers_;
120
121     CProxy_CompletionDetector detector_;
122     int prio_;
123     int yieldCount_;
124
125 #ifdef CACHE_LOCATIONS
126     MeshLocation *cachedLocations_;
127     bool *isCached_; 
128 #endif
129
130     MeshLocation determineLocation(int destinationPe);
131
132     void storeMessage(int destinationPe, 
133                       const MeshLocation &destinationCoordinates, 
134                       void *dataItem, bool copyIndirectly = false);
135
136     virtual void deliverToDestination(
137                  int destinationPe, 
138                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
139
140     virtual void localDeliver(dtype &dataItem) = 0; 
141
142     virtual int numElementsInClient() = 0;
143
144     virtual void initLocalClients() = 0;
145
146     void flushLargestBuffer();
147     void flushToIntermediateDestinations();
148
149 protected:
150
151     CompletionDetector *detectorLocalObj_;
152     virtual int copyDataItemIntoMessage(
153                 MeshStreamerMessage<dtype> *destinationBuffer, 
154                 void *dataItemHandle, bool copyIndirectly = false);
155
156 public:
157
158     MeshStreamer(int totalBufferCapacity, int numDimensions, 
159                  int *dimensionSies,
160                  bool yieldFlag = 0, double progressPeriodInMs = -1.0);
161     ~MeshStreamer();
162
163       // entry
164     void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
165     void flushDirect();
166     void finish();
167
168     // non entry
169     bool isPeriodicFlushEnabled() {
170       return isPeriodicFlushEnabled_;
171     }
172     virtual void insertData(dtype &dataItem, int destinationPe); 
173     void insertData(void *dataItemHandle, int destinationPe);
174     void associateCallback(int numContributors, 
175                            CkCallback startCb, CkCallback endCb, 
176                            CProxy_CompletionDetector detector,
177                            int prio);
178     void flushAllBuffers();
179     void registerPeriodicProgressFunction();
180
181     // flushing begins only after enablePeriodicFlushing has been invoked
182
183     void enablePeriodicFlushing(){
184       isPeriodicFlushEnabled_ = true; 
185       registerPeriodicProgressFunction();
186     }
187
188     void done(int numContributorsFinished = 1) {
189       detectorLocalObj_->done(numContributorsFinished);
190     }
191
192 };
193
194 template <class dtype>
195 MeshStreamer<dtype>::MeshStreamer(
196                      int totalBufferCapacity, int numDimensions, 
197                      int *dimensionSizes, 
198                      bool yieldFlag, 
199                      double progressPeriodInMs)
200  :numDimensions_(numDimensions), 
201   totalBufferCapacity_(totalBufferCapacity), 
202   yieldFlag_(yieldFlag), 
203   progressPeriodInMs_(progressPeriodInMs)
204 {
205   // limit total number of messages in system to totalBufferCapacity
206   //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
207   //   advantage of nonuniform filling of buffers
208
209   int sumAlongAllDimensions = 0;   
210   individualDimensionSizes_ = new int[numDimensions_];
211   combinedDimensionSizes_ = new int[numDimensions_ + 1];
212   myLocationIndex_ = new int[numDimensions_];
213   memcpy(individualDimensionSizes_, dimensionSizes, 
214          numDimensions * sizeof(int)); 
215   combinedDimensionSizes_[0] = 1; 
216   for (int i = 0; i < numDimensions; i++) {
217     sumAlongAllDimensions += individualDimensionSizes_[i];
218     combinedDimensionSizes_[i + 1] = 
219       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
220   }
221
222   // except for personalized messages, the buffers for dimensions with the 
223   //   same index as the sender's are not used
224   bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
225     / (sumAlongAllDimensions - numDimensions_ + 1); 
226   if (bufferSize_ <= 0) {
227     bufferSize_ = 1; 
228     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
229              "is invalid. Defaulting to a single buffer per destination.\n");
230   }
231   totalBufferCapacity_ = totalBufferCapacity;
232   numDataItemsBuffered_ = 0; 
233   numMembers_ = CkNumPes(); 
234
235   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
236   for (int i = 0; i < numDimensions; i++) {
237     int numMembersAlongDimension = individualDimensionSizes_[i]; 
238     dataBuffers_[i] = 
239       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
240     for (int j = 0; j < numMembersAlongDimension; j++) {
241       dataBuffers_[i][j] = NULL;
242     }
243   }
244
245   myIndex_ = CkMyPe();
246   int remainder = myIndex_;
247   for (int i = numDimensions_ - 1; i >= 0; i--) {    
248     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
249     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
250   }
251
252   isPeriodicFlushEnabled_ = false; 
253   detectorLocalObj_ = NULL;
254
255 #ifdef CACHE_LOCATIONS
256   cachedLocations_ = new MeshLocation[numMembers_];
257   isCached_ = new bool[numMembers_];
258   std::fill(isCached_, isCached_ + numMembers_, false);
259 #endif
260
261 }
262
263 template <class dtype>
264 MeshStreamer<dtype>::~MeshStreamer() {
265
266   for (int i = 0; i < numDimensions_; i++) {
267     for (int j=0; j < individualDimensionSizes_[i]; j++) {
268       delete[] dataBuffers_[i][j]; 
269     }
270     delete[] dataBuffers_[i]; 
271   }
272
273   delete[] individualDimensionSizes_;
274   delete[] combinedDimensionSizes_; 
275   delete[] myLocationIndex_;
276
277 #ifdef CACHE_LOCATIONS
278   delete[] cachedLocations_;
279   delete[] isCached_; 
280 #endif
281
282 }
283
284
285 template <class dtype>
286 inline
287 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
288
289 #ifdef CACHE_LOCATIONS
290   if (isCached_[destinationPe]) {    
291     return cachedLocations_[destinationPe]; 
292   }
293 #endif
294
295   MeshLocation destinationLocation;
296   int remainder = destinationPe;
297   int dimensionIndex; 
298   for (int i = numDimensions_ - 1; i >= 0; i--) {        
299     dimensionIndex = remainder / combinedDimensionSizes_[i];
300     
301     if (dimensionIndex != myLocationIndex_[i]) {
302       destinationLocation.dimension = i; 
303       destinationLocation.bufferIndex = dimensionIndex; 
304 #ifdef CACHE_LOCATIONS
305       cachedLocations_[destinationPe] = destinationLocation;
306       isCached_[destinationPe] = true; 
307 #endif
308       return destinationLocation;
309     }
310
311     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
312   }
313
314   // all indices agree - message to oneself
315   destinationLocation.dimension = 0; 
316   destinationLocation.bufferIndex = myLocationIndex_[0];
317   return destinationLocation; 
318 }
319
320 template <class dtype>
321 inline 
322 int MeshStreamer<dtype>::copyDataItemIntoMessage(
323                          MeshStreamerMessage<dtype> *destinationBuffer,
324                          void *dataItemHandle, bool copyIndirectly) {
325   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
326 }
327
328 template <class dtype>
329 inline
330 void MeshStreamer<dtype>::storeMessage(
331                           int destinationPe, 
332                           const MeshLocation& destinationLocation,
333                           void *dataItem, bool copyIndirectly) {
334
335   int dimension = destinationLocation.dimension;
336   int bufferIndex = destinationLocation.bufferIndex; 
337   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
338
339
340
341   // allocate new message if necessary
342   if (messageBuffers[bufferIndex] == NULL) {
343     if (dimension == 0) {
344       // personalized messages do not require destination indices
345       messageBuffers[bufferIndex] = 
346         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
347     }
348     else {
349       messageBuffers[bufferIndex] = 
350         new (bufferSize_, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
351     }
352     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
353     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
354 #ifdef DEBUG_STREAMER
355     CkAssert(messageBuffers[bufferIndex] != NULL);
356 #endif
357   }
358   
359   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
360   int numBuffered = 
361     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
362   if (dimension != 0) {
363     destinationBuffer->markDestination(numBuffered-1, destinationPe);
364   }  
365   numDataItemsBuffered_++;
366
367   // send if buffer is full
368   if (numBuffered == bufferSize_) {
369
370     int destinationIndex;
371
372     destinationIndex = myIndex_ + 
373       (bufferIndex - myLocationIndex_[dimension]) * 
374       combinedDimensionSizes_[dimension];
375
376     if (dimension == 0) {
377       deliverToDestination(destinationIndex, destinationBuffer);
378     }
379     else {
380       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
381     }
382
383     messageBuffers[bufferIndex] = NULL;
384     numDataItemsBuffered_ -= numBuffered; 
385     hasSentRecently_ = true; 
386
387   }
388   // send if total buffering capacity has been reached
389   else if (numDataItemsBuffered_ == totalBufferCapacity_) {
390     flushLargestBuffer();
391     hasSentRecently_ = true; 
392   }
393
394 }
395
396 template <class dtype>
397 inline
398 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
399   const static bool copyIndirectly = true;
400
401   MeshLocation destinationLocation = determineLocation(destinationPe);
402   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
403                copyIndirectly); 
404   // release control to scheduler if requested by the user, 
405   //   assume caller is threaded entry
406   if (yieldFlag_ && ++yieldCount_ == 1024) {
407     yieldCount_ = 0; 
408     CthYield();
409   }
410
411 }
412
413 template <class dtype>
414 inline
415 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
416
417   detectorLocalObj_->produce();
418   if (destinationPe == CkMyPe()) {
419     // copying here is necessary - user code should not be 
420     // passed back a reference to the original item
421     dtype dataItemCopy = dataItem;
422     localDeliver(dataItemCopy);
423     return;
424   }
425
426   insertData((void *) &dataItem, destinationPe);
427 }
428
429 template <class dtype>
430 void MeshStreamer<dtype>::associateCallback(
431                           int numContributors,
432                           CkCallback startCb, CkCallback endCb, 
433                           CProxy_CompletionDetector detector, 
434                           int prio) {
435   yieldCount_ = 0; 
436   prio_ = prio;
437   userCallback_ = endCb; 
438   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushDirect(), 
439                      this->thisProxy);
440   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
441                     this->thisProxy);
442   detector_ = detector;      
443   detectorLocalObj_ = detector_.ckLocalBranch();
444   initLocalClients();
445
446   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
447   
448   if (progressPeriodInMs_ <= 0) {
449     CkPrintf("Using completion detection in NDMeshStreamer requires"
450              " setting a valid periodic flush period. Defaulting"
451              " to 10 ms\n");
452     progressPeriodInMs_ = 10;
453   }
454   
455   hasSentRecently_ = false; 
456   enablePeriodicFlushing();
457       
458 }
459
460 template <class dtype>
461 void MeshStreamer<dtype>::finish() {
462   isPeriodicFlushEnabled_ = false; 
463
464   if (!userCallback_.isInvalid()) {
465     this->contribute(userCallback_);
466     userCallback_ = CkCallback();      // nullify the current callback
467   }
468
469 }
470
471 template <class dtype>
472 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
473
474   int destinationPe, lastDestinationPe; 
475   MeshLocation destinationLocation;
476
477   lastDestinationPe = -1;
478   for (int i = 0; i < msg->numDataItems; i++) {
479     destinationPe = msg->destinationPes[i];
480     dtype &dataItem = msg->getDataItem(i);
481     if (destinationPe == CkMyPe()) {
482       localDeliver(dataItem);
483     }
484     else {
485       if (destinationPe != lastDestinationPe) {
486         // do this once per sequence of items with the same destination
487         destinationLocation = determineLocation(destinationPe);
488       }
489       storeMessage(destinationPe, destinationLocation, &dataItem);   
490     }
491     lastDestinationPe = destinationPe; 
492   }
493
494   delete msg;
495
496 }
497
498 template <class dtype>
499 void MeshStreamer<dtype>::flushLargestBuffer() {
500
501   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
502   MeshStreamerMessage<dtype> ** messageBuffers; 
503   MeshStreamerMessage<dtype> *destinationBuffer; 
504
505   for (int i = 0; i < numDimensions_; i++) {
506
507     messageBuffers = dataBuffers_[i]; 
508     numBuffers = individualDimensionSizes_[i]; 
509
510     flushDimension = i; 
511     maxSize = 0;    
512     for (int j = 0; j < numBuffers; j++) {
513       if (messageBuffers[j] != NULL && 
514           messageBuffers[j]->numDataItems > maxSize) {
515         maxSize = messageBuffers[j]->numDataItems;
516         flushIndex = j; 
517       }
518     }
519
520     if (maxSize > 0) {
521
522       messageBuffers = dataBuffers_[flushDimension]; 
523       destinationBuffer = messageBuffers[flushIndex];
524       destinationIndex = myIndex_ + 
525         (flushIndex - myLocationIndex_[flushDimension]) * 
526         combinedDimensionSizes_[flushDimension] ;
527
528       if (destinationBuffer->numDataItems < bufferSize_) {
529         // not sending the full buffer, shrink the message size
530         envelope *env = UsrToEnv(destinationBuffer);
531         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
532                           (bufferSize_ - destinationBuffer->numDataItems));
533         *((int *) env->getPrioPtr()) = prio_;
534       }
535       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
536
537       if (flushDimension == 0) {
538         deliverToDestination(destinationIndex, destinationBuffer);
539       }
540       else {
541         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
542       }
543       messageBuffers[flushIndex] = NULL;
544
545     }
546
547   }
548 }
549
550 template <class dtype>
551 void MeshStreamer<dtype>::flushAllBuffers() {
552
553   MeshStreamerMessage<dtype> **messageBuffers; 
554   int numBuffers; 
555
556   for (int i = 0; i < numDimensions_; i++) {
557
558     messageBuffers = dataBuffers_[i]; 
559     numBuffers = individualDimensionSizes_[i]; 
560
561     for (int j = 0; j < numBuffers; j++) {
562
563       if(messageBuffers[j] == NULL) {
564         continue;
565       }
566
567       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
568
569       if (i == 0) {
570         int destinationPe = myIndex_ + j - myLocationIndex_[i];
571         deliverToDestination(destinationPe, messageBuffers[j]);
572       }  
573       else {
574
575         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
576
577           MeshStreamerMessage<dtype> *directMsg = 
578             new (0, 1, sizeof(int)) MeshStreamerMessage<dtype>();
579           *(int *) CkPriorityPtr(directMsg) = prio_;
580           CkSetQueueing(directMsg, CK_QUEUEING_IFIFO);
581
582 #ifdef DEBUG_STREAMER
583           CkAssert(directMsg != NULL);
584 #endif
585           int destinationPe = messageBuffers[j]->destinationPes[k]; 
586           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
587           directMsg->addDataItem(dataItem);
588           deliverToDestination(destinationPe,directMsg);
589         }
590         delete messageBuffers[j];
591       }
592       messageBuffers[j] = NULL;
593     }
594   }
595 }
596
597 template <class dtype>
598 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
599
600   MeshStreamerMessage<dtype> **messageBuffers; 
601   MeshStreamerMessage<dtype> *destinationBuffer; 
602   int destinationIndex, numBuffers; 
603
604   for (int i = 0; i < numDimensions_; i++) {
605
606     messageBuffers = dataBuffers_[i]; 
607     numBuffers = individualDimensionSizes_[i]; 
608
609     for (int j = 0; j < numBuffers; j++) {
610
611       if(messageBuffers[j] == NULL) {
612         continue;
613       }
614
615       messageBuffers = dataBuffers_[i]; 
616       destinationBuffer = messageBuffers[j];
617       destinationIndex = myIndex_ + 
618         (j - myLocationIndex_[i]) * 
619         combinedDimensionSizes_[i] ;
620
621       if (destinationBuffer->numDataItems < bufferSize_) {
622         // not sending the full buffer, shrink the message size
623         envelope *env = UsrToEnv(destinationBuffer);
624         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
625                           (bufferSize_ - destinationBuffer->numDataItems));
626         *((int *) env->getPrioPtr()) = prio_;
627       }
628       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
629
630       if (i == 0) {
631         deliverToDestination(destinationIndex, destinationBuffer);
632       }
633       else {
634         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
635       }
636       messageBuffers[j] = NULL;
637     }
638   }
639 }
640
641
642
643 template <class dtype>
644 void MeshStreamer<dtype>::flushDirect(){
645
646   // flush if (1) this is not a periodic call or 
647   //          (2) this is a periodic call and no sending took place
648   //              since the last time the function was invoked
649   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
650
651     if (numDataItemsBuffered_ != 0) {
652       flushAllBuffers();
653     }    
654 #ifdef DEBUG_STREAMER
655     CkAssert(numDataItemsBuffered_ == 0); 
656 #endif
657     
658   }
659
660   hasSentRecently_ = false; 
661
662 }
663
664 template <class dtype>
665 void periodicProgressFunction(void *MeshStreamerObj, double time) {
666
667   MeshStreamer<dtype> *properObj = 
668     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
669
670   if (properObj->isPeriodicFlushEnabled()) {
671     properObj->flushDirect();
672     properObj->registerPeriodicProgressFunction();
673   }
674 }
675
676 template <class dtype>
677 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
678   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
679                  progressPeriodInMs_); 
680 }
681
682
683 template <class dtype>
684 class GroupMeshStreamer : public MeshStreamer<dtype> {
685 private:
686
687   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
688   MeshStreamerGroupClient<dtype> *clientObj_;
689
690   void deliverToDestination(int destinationPe, 
691                             MeshStreamerMessage<dtype> *destinationBuffer) {
692     clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
693   }
694
695   void localDeliver(dtype &dataItem) {
696     clientObj_->process(dataItem);
697     MeshStreamer<dtype>::detectorLocalObj_->consume();
698   }
699
700   int numElementsInClient() {
701     // client is a group - there is one element per PE
702     return CkNumPes();
703   }
704
705   void initLocalClients() {
706     clientObj_->setDetector(MeshStreamer<dtype>::detectorLocalObj_);
707   }
708
709 public:
710
711   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
712                     int *dimensionSizes, 
713                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
714                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
715    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
716                          yieldFlag, progressPeriodInMs) 
717   {
718     clientProxy_ = clientProxy; 
719     clientObj_ = 
720       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
721   }
722
723 };
724
725 template <class dtype>
726 class MeshStreamerClientIterator : public CkLocIterator {
727
728 public:
729   
730   CompletionDetector *detectorLocalObj_;
731   CkArray *clientArrMgr_;
732   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
733                              CkArray *clientArrMgr) 
734     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
735
736   // CkLocMgr::iterate will call addLocation on all elements local to this PE
737   void addLocation(CkLocation &loc) {
738
739     MeshStreamerArrayClient<dtype> *clientObj = 
740       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
741
742     CkAssert(clientObj != NULL); 
743     clientObj->setDetector(detectorLocalObj_); 
744   }
745
746 };
747
748 template <class dtype, class itype>
749 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
750   
751 private:
752   
753   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
754   CkArray *clientArrayMgr_;
755   int numArrayElements_;
756 #ifdef CACHE_ARRAY_METADATA
757   MeshStreamerArrayClient<dtype> **clientObjs_;
758   int *destinationPes_;
759   bool *isCachedArrayMetadata_;
760 #endif
761
762   void deliverToDestination(
763        int destinationPe, 
764        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *destinationBuffer) {
765
766     CProxy_ArrayMeshStreamer<dtype, itype> myProxy(this->thisProxy); 
767     myProxy[destinationPe].receiveArrayData(destinationBuffer);
768   }
769
770   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
771     itype arrayId = packedDataItem.arrayIndex; 
772
773     MeshStreamerArrayClient<dtype> *clientObj;
774 #ifdef CACHE_ARRAY_METADATA
775     clientObj = clientObjs_[arrayId];
776 #else
777     clientObj = clientProxy_[arrayId].ckLocal();
778 #endif
779
780     if (clientObj != NULL) {
781       clientObj->process(packedDataItem.dataItem);
782       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
783     }
784     else { 
785       // array element is no longer present locally - redeliver using proxy
786       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
787     }
788   }
789
790   int numElementsInClient() {
791     return numArrayElements_;
792   }
793
794   void initLocalClients() {
795
796 #ifdef CACHE_ARRAY_METADATA
797     std::fill(isCachedArrayMetadata_, 
798               isCachedArrayMetadata_ + numArrayElements_, false);
799
800     for (int i = 0; i < numArrayElements_; i++) {
801       clientObjs_[i] = clientProxy_[i].ckLocal();
802       if (clientObjs_[i] != NULL) {
803         clientObjs_[i]->setDetector(
804          MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
805       }
806     }
807 #else
808     // set completion detector in local elements of the client
809     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
810     MeshStreamerClientIterator<dtype> clientIterator(
811      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
812      clientProxy_.ckLocalBranch());
813     clientLocMgr->iterate(clientIterator);
814 #endif    
815
816   }
817
818 public:
819
820   struct DataItemHandle {
821     itype arrayIndex; 
822     dtype *dataItem;
823   };
824
825   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
826                     int *dimensionSizes, 
827                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
828                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
829     :MeshStreamer<ArrayDataItem<dtype, itype> >(
830       totalBufferCapacity, numDimensions, dimensionSizes, yieldFlag, 
831       progressPeriodInMs) 
832   {
833     clientProxy_ = clientProxy; 
834     clientArrayMgr_ = clientProxy_.ckLocalBranch();
835
836     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
837
838 #ifdef CACHE_ARRAY_METADATA
839     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
840     destinationPes_ = new int[numArrayElements_];
841     isCachedArrayMetadata_ = new bool[numArrayElements_];
842     std::fill(isCachedArrayMetadata_, 
843               isCachedArrayMetadata_ + numArrayElements_, false);
844 #endif
845   }
846
847   ~ArrayMeshStreamer() {
848 #ifdef CACHE_ARRAY_METADATA
849     delete [] clientObjs_;
850     delete [] destinationPes_;
851     delete [] isCachedArrayMetadata_; 
852 #endif
853   }
854
855   void receiveArrayData(
856        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
857     for (int i = 0; i < msg->numDataItems; i++) {
858       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
859       localDeliver(packedData);
860     }
861     delete msg;
862   }
863
864   void insertData(dtype &dataItem, itype arrayIndex) {
865
866     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
867     int destinationPe; 
868 #ifdef CACHE_ARRAY_METADATA
869   if (isCachedArrayMetadata_[arrayIndex]) {    
870     destinationPe =  destinationPes_[arrayIndex];
871   }
872   else {
873     destinationPe = 
874       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
875     isCachedArrayMetadata_[arrayIndex] = true;
876     destinationPes_[arrayIndex] = destinationPe;
877   }
878 #else 
879   destinationPe = 
880     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
881 #endif
882
883   ArrayDataItem<dtype, itype> packedDataItem;
884     if (destinationPe == CkMyPe()) {
885       // copying here is necessary - user code should not be 
886       // passed back a reference to the original item
887       packedDataItem.arrayIndex = arrayIndex; 
888       packedDataItem.dataItem = dataItem;
889       localDeliver(packedDataItem);
890       return;
891     }
892
893     // this implementation avoids copying an item before transfer into message
894
895     DataItemHandle tempHandle; 
896     tempHandle.arrayIndex = arrayIndex; 
897     tempHandle.dataItem = &dataItem;
898
899     MeshStreamer<ArrayDataItem<dtype, itype> >::
900      insertData(&tempHandle, destinationPe);
901
902   }
903
904   int copyDataItemIntoMessage(
905       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
906       void *dataItemHandle, bool copyIndirectly) {
907
908     if (copyIndirectly == true) {
909       // newly inserted items are passed through a handle to avoid copying
910       int numDataItems = destinationBuffer->numDataItems;
911       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
912       (destinationBuffer->data)[numDataItems].dataItem = 
913         *(tempHandle->dataItem);
914       (destinationBuffer->data)[numDataItems].arrayIndex = 
915         tempHandle->arrayIndex;
916       return ++destinationBuffer->numDataItems;
917     }
918     else {
919       // this is an item received along the route to destination
920       // we can copy it from the received message
921       return MeshStreamer<ArrayDataItem<dtype, itype> >::
922               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
923     }
924   }
925
926 };
927
928 #define CK_TEMPLATES_ONLY
929 #include "NDMeshStreamer.def.h"
930 #undef CK_TEMPLATES_ONLY
931
932 #endif