NDMeshStreamer: don't force the user to define process in case it is not neeed,
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define CACHE_LOCATIONS
17 // #define SUPPORT_INCOMPLETE_MESH
18 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
19 // #define STREAMER_VERBOSE_OUTPUT
20
21 #define TRAM_BROADCAST (-100)
22
23 struct MeshLocation {
24   int dimension; 
25   int bufferIndex; 
26 }; 
27
28 template<class dtype>
29 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
30
31 public:
32
33   int finalMsgCount; 
34   int dimension; 
35   int numDataItems;
36   int *destinationPes;
37   dtype *dataItems;
38
39   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
40     finalMsgCount = -1; 
41   }
42
43   int addDataItem(const dtype& dataItem) {
44     dataItems[numDataItems] = dataItem;
45     return ++numDataItems; 
46   }
47
48   void markDestination(const int index, const int destinationPe) {
49     destinationPes[index] = destinationPe;
50   }
51
52   const dtype& getDataItem(const int index) {
53     return dataItems[index];
54   }
55
56 };
57
58 template <class dtype>
59 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
60 private:
61   CompletionDetector *detectorLocalObj_;
62 public:
63   MeshStreamerArrayClient(){
64     detectorLocalObj_ = NULL; 
65   }
66   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
67   // would like to make it pure virtual but charm will try to
68   // instantiate the abstract class, leading to errors
69   virtual void process(const dtype& data) {
70     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
71             "This virtual function should have been defined by the user.\n");
72   }     
73   void setDetector(CompletionDetector *detectorLocalObj) {
74     detectorLocalObj_ = detectorLocalObj;
75   }
76   void receiveRedeliveredItem(dtype data) {
77 #ifdef STREAMER_VERBOSE_OUTPUT
78     CkPrintf("[%d] redelivered to index %d\n", 
79              CkMyPe(), this->thisIndex.data[0]);
80 #endif
81     if (detectorLocalObj_ != NULL) {
82       detectorLocalObj_->consume();
83     }
84     process(data);
85   }
86
87   void pup(PUP::er& p) {
88     CBase_MeshStreamerArrayClient<dtype>::pup(p);
89    }  
90
91 };
92
93 template <class dtype>
94 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
95
96 public:
97   virtual void process(const dtype& data) {
98     CkAbort("Error. MeshStreamerGroupClient::process() is being called. "
99             "This virtual function should have been defined by the user.\n");
100   }     
101
102   virtual void receiveArray(dtype *data, int numItems, int sourcePe) {
103     for (int i = 0; i < numItems; i++) {
104       process(data[i]);
105     }
106   }
107 };
108
109 template <class dtype>
110 class MeshStreamer : public CBase_MeshStreamer<dtype> {
111
112 private:
113   int bufferSize_; 
114   int maxNumDataItemsBuffered_;
115   int numDataItemsBuffered_;
116
117   int numMembers_; 
118   int *individualDimensionSizes_;
119   int *combinedDimensionSizes_;
120
121   int *startingIndexAtDimension_; 
122
123   int myIndex_;
124   int *myLocationIndex_;
125
126   CkCallback   userCallback_;
127   bool yieldFlag_;
128
129   double progressPeriodInMs_; 
130   bool isPeriodicFlushEnabled_; 
131   bool hasSentRecently_;
132   MeshStreamerMessage<dtype> ***dataBuffers_;
133
134   CProxy_CompletionDetector detector_;
135   int prio_;
136   int yieldCount_;
137
138 #ifdef CACHE_LOCATIONS
139   MeshLocation *cachedLocations_;
140   bool *isCached_; 
141 #endif
142
143
144   // only used for staged completion
145   int **cntMsgSent_;
146   int *cntMsgReceived_;
147   int *cntMsgExpected_;
148   int *cntFinished_; 
149   int dimensionToFlush_;
150   int numLocalDone_; 
151   int numLocalContributors_;
152
153   void storeMessage(int destinationPe, 
154                     const MeshLocation& destinationCoordinates, 
155                     const void *dataItem, bool copyIndirectly = false);
156   virtual void localDeliver(const dtype& dataItem) = 0; 
157   virtual void localBroadcast(const dtype& dataItem) = 0; 
158   virtual int numElementsInClient() = 0;
159   virtual int numLocalElementsInClient() = 0; 
160
161   virtual void initLocalClients() = 0;
162
163   void sendLargestBuffer();
164   void flushToIntermediateDestinations();
165   void flushDimension(int dimension, bool sendMsgCounts = false); 
166   MeshLocation determineLocation(int destinationPe, 
167                                  int dimensionReceivedAlong);
168
169 protected:
170
171   int numDimensions_;
172   bool useStagedCompletion_;
173   CompletionDetector *detectorLocalObj_;
174   virtual int copyDataItemIntoMessage(
175               MeshStreamerMessage<dtype> *destinationBuffer, 
176               const void *dataItemHandle, bool copyIndirectly = false);
177   void insertData(const void *dataItemHandle, int destinationPe);
178   void broadcast(const void *dataItemHandle, int dimension, 
179                  bool copyIndirectly);
180
181 public:
182
183   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
184                int *dimensionSizes, int bufferSize,
185                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
186   ~MeshStreamer();
187
188   // entry
189   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
190             int prio, bool usePeriodicFlushing);
191   void init(int numContributors, CkCallback startCb, CkCallback endCb, 
192             CProxy_CompletionDetector detector,
193             int prio, bool usePeriodicFlushing);
194   void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, 
195             int prio, bool usePeriodicFlushing);
196
197   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
198   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
199   void flushIfIdle();
200   void finish();
201
202   // non entry
203   bool isPeriodicFlushEnabled() {
204     return isPeriodicFlushEnabled_;
205   }
206   virtual void insertData(const dtype& dataItem, int destinationPe); 
207   virtual void broadcast(const dtype& dataItem); 
208   void registerPeriodicProgressFunction();
209
210   // flushing begins only after enablePeriodicFlushing has been invoked
211
212   void enablePeriodicFlushing(){
213     isPeriodicFlushEnabled_ = true; 
214     registerPeriodicProgressFunction();
215   }
216
217   void done(int numContributorsFinished = 1) {
218
219     if (useStagedCompletion_) {
220       numLocalDone_ += numContributorsFinished; 
221       if (numLocalDone_ == numLocalContributors_) {
222         startStagedCompletion();
223       }
224     }
225     else {
226       detectorLocalObj_->done(numContributorsFinished);
227     }
228   }
229   
230   bool stagedCompletionStarted() {    
231     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
232   }
233
234   void startStagedCompletion() {          
235     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
236       flushDimension(dimensionToFlush_, true);
237     }
238     dimensionToFlush_--;
239
240     checkForCompletedStages();
241   }
242
243   void markMessageReceived(int dimension, int finalCount) {
244     cntMsgReceived_[dimension]++;
245     if (finalCount != -1) {
246       cntFinished_[dimension]++; 
247       cntMsgExpected_[dimension] += finalCount; 
248 #ifdef STREAMER_VERBOSE_OUTPUT
249       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
250                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
251                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
252                cntMsgReceived_[dimension]); 
253 #endif
254     }  
255     if (dimensionToFlush_ != numDimensions_ - 1) {
256       checkForCompletedStages();
257     }
258   }
259
260   void checkForCompletedStages() {
261
262     while (cntFinished_[dimensionToFlush_ + 1] == 
263            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
264            cntMsgExpected_[dimensionToFlush_ + 1] == 
265            cntMsgReceived_[dimensionToFlush_ + 1]) {
266       if (dimensionToFlush_ == -1) {
267 #ifdef STREAMER_VERBOSE_OUTPUT
268         CkPrintf("[%d] contribute\n", CkMyPe()); 
269 #endif
270         CkAssert(numDataItemsBuffered_ == 0); 
271         isPeriodicFlushEnabled_ = false; 
272         if (!userCallback_.isInvalid()) {
273           this->contribute(userCallback_);
274           userCallback_ = CkCallback();
275         }
276         return; 
277       }
278       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
279         flushDimension(dimensionToFlush_, true); 
280       }
281       dimensionToFlush_--; 
282     }
283   }
284
285 };
286
287 template <class dtype>
288 MeshStreamer<dtype>::MeshStreamer(
289                      int maxNumDataItemsBuffered, int numDimensions, 
290                      int *dimensionSizes, 
291                      int bufferSize,
292                      bool yieldFlag, 
293                      double progressPeriodInMs)
294  :numDimensions_(numDimensions), 
295   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
296   yieldFlag_(yieldFlag), 
297   progressPeriodInMs_(progressPeriodInMs), 
298   bufferSize_(bufferSize)
299 {
300
301   int sumAlongAllDimensions = 0;   
302   individualDimensionSizes_ = new int[numDimensions_];
303   combinedDimensionSizes_ = new int[numDimensions_ + 1];
304   myLocationIndex_ = new int[numDimensions_];
305   startingIndexAtDimension_ = new int[numDimensions_ + 1]; 
306   memcpy(individualDimensionSizes_, dimensionSizes, 
307          numDimensions * sizeof(int)); 
308   combinedDimensionSizes_[0] = 1; 
309   for (int i = 0; i < numDimensions; i++) {
310     sumAlongAllDimensions += individualDimensionSizes_[i];
311     combinedDimensionSizes_[i + 1] = 
312       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
313   }
314
315   CkAssert(combinedDimensionSizes_[numDimensions] == CkNumPes()); 
316
317   // a bufferSize input of 0 indicates it should be calculated by the library
318   if (bufferSize_ == 0) {
319     CkAssert(maxNumDataItemsBuffered_ > 0);
320     // buffers for dimensions with the 
321     //   same index as the sender's are not allocated/used
322     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
323       / (sumAlongAllDimensions - numDimensions_ + 1); 
324   }
325   else {
326     maxNumDataItemsBuffered_ = 
327       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1) 
328       / OVERALLOCATION_FACTOR;
329   }
330
331   if (bufferSize_ <= 0) {
332     bufferSize_ = 1; 
333     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
334              "is invalid. Defaulting to a single buffer per destination.\n");
335   }
336   numDataItemsBuffered_ = 0; 
337   numMembers_ = CkNumPes(); 
338
339   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
340   for (int i = 0; i < numDimensions; i++) {
341     int numMembersAlongDimension = individualDimensionSizes_[i]; 
342     dataBuffers_[i] = 
343       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
344     for (int j = 0; j < numMembersAlongDimension; j++) {
345       dataBuffers_[i][j] = NULL;
346     }
347   }
348
349   myIndex_ = CkMyPe();
350   int remainder = myIndex_;
351   startingIndexAtDimension_[numDimensions_] = 0;
352   for (int i = numDimensions_ - 1; i >= 0; i--) {    
353     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
354     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
355     remainder -= dimensionOffset; 
356     startingIndexAtDimension_[i] = 
357       startingIndexAtDimension_[i+1] + dimensionOffset; 
358   }
359
360   isPeriodicFlushEnabled_ = false; 
361   detectorLocalObj_ = NULL;
362
363 #ifdef CACHE_LOCATIONS
364   cachedLocations_ = new MeshLocation[numMembers_];
365   isCached_ = new bool[numMembers_];
366   std::fill(isCached_, isCached_ + numMembers_, false);
367 #endif
368
369   cntMsgSent_ = NULL; 
370   cntMsgReceived_ = NULL; 
371   cntMsgExpected_ = NULL; 
372   cntFinished_ = NULL; 
373
374 }
375
376 template <class dtype>
377 MeshStreamer<dtype>::~MeshStreamer() {
378
379   for (int i = 0; i < numDimensions_; i++) {
380     for (int j=0; j < individualDimensionSizes_[i]; j++) {
381       delete[] dataBuffers_[i][j]; 
382     }
383     delete[] dataBuffers_[i]; 
384   }
385
386   delete[] individualDimensionSizes_;
387   delete[] combinedDimensionSizes_; 
388   delete[] myLocationIndex_;
389   delete[] startingIndexAtDimension_;
390
391 #ifdef CACHE_LOCATIONS
392   delete[] cachedLocations_;
393   delete[] isCached_; 
394 #endif
395
396   if (cntMsgSent_ != NULL) {
397     for (int i = 0; i < numDimensions_; i++) {
398       delete[] cntMsgSent_[i]; 
399     }
400     delete[] cntMsgSent_; 
401     delete[] cntMsgReceived_; 
402     delete[] cntMsgExpected_; 
403     delete[] cntFinished_;
404   }
405
406 }
407
408 template <class dtype>
409 inline
410 MeshLocation MeshStreamer<dtype>::determineLocation(
411                                   int destinationPe, 
412                                   int dimensionReceivedAlong) {
413
414 #ifdef CACHE_LOCATIONS
415   if (isCached_[destinationPe]) {    
416     return cachedLocations_[destinationPe]; 
417   }
418 #endif
419
420   MeshLocation destinationLocation;
421   int remainder = 
422     destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
423   int dimensionIndex; 
424   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
425     dimensionIndex = remainder / combinedDimensionSizes_[i];
426     
427     if (dimensionIndex != myLocationIndex_[i]) {
428       destinationLocation.dimension = i; 
429       destinationLocation.bufferIndex = dimensionIndex; 
430 #ifdef CACHE_LOCATIONS
431       cachedLocations_[destinationPe] = destinationLocation;
432       isCached_[destinationPe] = true; 
433 #endif
434       return destinationLocation;
435     }
436
437     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
438   }
439
440   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
441           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
442   // to prevent warnings
443   return destinationLocation; 
444 }
445
446 template <class dtype>
447 inline 
448 int MeshStreamer<dtype>::copyDataItemIntoMessage(
449                          MeshStreamerMessage<dtype> *destinationBuffer,
450                          const void *dataItemHandle, bool copyIndirectly) {
451   return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle)); 
452 }
453
454 template <class dtype>
455 inline
456 void MeshStreamer<dtype>::storeMessage(
457                           int destinationPe, 
458                           const MeshLocation& destinationLocation,
459                           const void *dataItem, bool copyIndirectly) {
460
461   int dimension = destinationLocation.dimension;
462   int bufferIndex = destinationLocation.bufferIndex; 
463   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
464
465   // allocate new message if necessary
466   if (messageBuffers[bufferIndex] == NULL) {
467     if (dimension == 0) {
468       // personalized messages do not require destination indices
469       messageBuffers[bufferIndex] = 
470         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
471     }
472     else {
473       messageBuffers[bufferIndex] = 
474         new (bufferSize_, bufferSize_, sizeof(int)) 
475         MeshStreamerMessage<dtype>(dimension);
476     }
477     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
478     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
479     CkAssert(messageBuffers[bufferIndex] != NULL);
480   }
481   
482   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
483   int numBuffered = 
484     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
485   if (dimension != 0) {
486     destinationBuffer->markDestination(numBuffered-1, destinationPe);
487   }
488   numDataItemsBuffered_++;
489
490   // send if buffer is full
491   if (numBuffered == bufferSize_) {
492
493     int destinationIndex;
494
495     destinationIndex = myIndex_ + 
496       (bufferIndex - myLocationIndex_[dimension]) * 
497       combinedDimensionSizes_[dimension];
498
499     if (dimension == 0) {
500 #ifdef STREAMER_VERBOSE_OUTPUT
501       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
502 #endif
503       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
504     }
505     else {
506 #ifdef STREAMER_VERBOSE_OUTPUT
507       CkPrintf("[%d] sending intermediate to %d\n", 
508                CkMyPe(), destinationIndex); 
509 #endif
510       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
511     }
512
513     if (useStagedCompletion_) {
514       cntMsgSent_[dimension][bufferIndex]++; 
515     }
516
517     messageBuffers[bufferIndex] = NULL;
518     numDataItemsBuffered_ -= numBuffered; 
519     hasSentRecently_ = true; 
520
521   }
522   // send if total buffering capacity has been reached
523   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
524     sendLargestBuffer();
525     hasSentRecently_ = true; 
526   }
527
528 }
529 template <class dtype>
530 inline
531 void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
532   const static bool copyIndirectly = true;
533
534   // no data items should be submitted after all local contributors call done 
535   // and staged completion has begun
536   CkAssert(stagedCompletionStarted() == false);
537
538   // produce and consume once per PE
539   if (!useStagedCompletion_) {
540     detectorLocalObj_->produce(CkNumPes());
541   }
542
543   // deliver locally
544   localBroadcast(dataItem);
545
546   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly); 
547 }
548
549 template <class dtype>
550 inline
551 void MeshStreamer<dtype>::broadcast(const void *dataItemHandle, int dimension, 
552                                     bool copyIndirectly) {
553
554   MeshLocation destinationLocation;
555   destinationLocation.dimension = dimension; 
556
557   while (destinationLocation.dimension != -1) {
558     for (int i = 0; 
559          i < individualDimensionSizes_[destinationLocation.dimension]; 
560          i++) {
561
562       if (i != myLocationIndex_[destinationLocation.dimension]) {
563         destinationLocation.bufferIndex = i; 
564         storeMessage(TRAM_BROADCAST, destinationLocation, 
565                      dataItemHandle, copyIndirectly); 
566       }
567       // release control to scheduler if requested by the user, 
568       //   assume caller is threaded entry
569       if (yieldFlag_ && ++yieldCount_ == 1024) {
570         yieldCount_ = 0; 
571         CthYield();
572       }
573     }
574     destinationLocation.dimension--; 
575   }
576
577 }
578
579
580 template <class dtype>
581 inline
582 void MeshStreamer<dtype>::insertData(const void *dataItemHandle, 
583                                      int destinationPe) {
584   const static bool copyIndirectly = true;
585
586   // treat newly inserted items as if they were received along
587   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
588   MeshLocation destinationLocation = determineLocation(destinationPe, 
589                                                        numDimensions_);
590   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
591                copyIndirectly); 
592   // release control to scheduler if requested by the user, 
593   //   assume caller is threaded entry
594   if (yieldFlag_ && ++yieldCount_ == 1024) {
595     yieldCount_ = 0; 
596     CthYield();
597   }
598
599 }
600
601 template <class dtype>
602 inline
603 void MeshStreamer<dtype>::insertData(const dtype& dataItem, int destinationPe) {
604
605   // no data items should be submitted after all local contributors call done 
606   // and staged completion has begun
607   CkAssert(stagedCompletionStarted() == false);
608
609   if (!useStagedCompletion_) {
610     detectorLocalObj_->produce();
611   }
612   if (destinationPe == CkMyPe()) {
613     localDeliver(dataItem);
614     return;
615   }
616
617   insertData((const void *) &dataItem, destinationPe);
618 }
619
620 template <class dtype>
621 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
622                                CkCallback endCb, int prio, 
623                                bool usePeriodicFlushing) {
624   useStagedCompletion_ = true; 
625   // allocate memory on first use
626   if (cntMsgSent_ == NULL) {
627     cntMsgSent_ = new int*[numDimensions_]; 
628     cntMsgReceived_ = new int[numDimensions_];
629     cntMsgExpected_ = new int[numDimensions_];
630     cntFinished_ = new int[numDimensions_]; 
631
632     for (int i = 0; i < numDimensions_; i++) {
633       cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
634     }
635   }
636
637
638   for (int i = 0; i < numDimensions_; i++) {
639     std::fill(cntMsgSent_[i], 
640               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
641     cntMsgReceived_[i] = 0;
642     cntMsgExpected_[i] = 0; 
643     cntFinished_[i] = 0; 
644   }
645   dimensionToFlush_ = numDimensions_ - 1;
646
647   yieldCount_ = 0; 
648   userCallback_ = endCb; 
649   prio_ = prio;
650
651   numLocalDone_ = 0; 
652   numLocalContributors_ = numLocalContributors; 
653   initLocalClients();
654
655   if (numLocalContributors_ == 0) {
656     startStagedCompletion();
657   }
658
659   hasSentRecently_ = false;
660   if (usePeriodicFlushing) {
661     enablePeriodicFlushing();
662   }
663
664   this->contribute(startCb);
665 }
666
667 template <class dtype>
668 void MeshStreamer<dtype>::init(int numContributors,       
669                                CkCallback startCb, CkCallback endCb,
670                                CProxy_CompletionDetector detector, 
671                                int prio, bool usePeriodicFlushing) {
672
673   useStagedCompletion_ = false; 
674   yieldCount_ = 0; 
675   prio_ = prio;
676   userCallback_ = endCb; 
677   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushIfIdle(), 
678                      this->thisProxy);
679   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
680                     this->thisProxy);
681   detector_ = detector;      
682   detectorLocalObj_ = detector_.ckLocalBranch();
683   initLocalClients();
684
685   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
686                                      finish , 0);
687   
688   if (progressPeriodInMs_ <= 0) {
689     CkPrintf("Using completion detection in NDMeshStreamer requires"
690              " setting a valid periodic flush period. Defaulting"
691              " to 10 ms\n");
692     progressPeriodInMs_ = 10;
693   }
694   
695   hasSentRecently_ = false; 
696   if (usePeriodicFlushing) {
697     enablePeriodicFlushing();
698   }
699
700 }
701
702 template <class dtype>
703 void MeshStreamer<dtype>::init(CkArrayID senderArrayID, CkCallback startCb, 
704             CkCallback endCb, int prio, bool usePeriodicFlushing) {
705     CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
706     int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
707     init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing); 
708   }
709
710
711 template <class dtype>
712 void MeshStreamer<dtype>::finish() {
713   isPeriodicFlushEnabled_ = false; 
714
715   if (!userCallback_.isInvalid()) {
716     this->contribute(userCallback_);
717     userCallback_ = CkCallback();      // nullify the current callback
718   }
719
720 }
721
722 template <class dtype>
723 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
724
725   int destinationPe, lastDestinationPe; 
726   MeshLocation destinationLocation;
727
728   lastDestinationPe = -1;
729   for (int i = 0; i < msg->numDataItems; i++) {
730     destinationPe = msg->destinationPes[i];
731     const dtype& dataItem = msg->getDataItem(i);
732     if (destinationPe == CkMyPe()) {
733       localDeliver(dataItem);
734     }
735     else if (destinationPe != TRAM_BROADCAST) {
736       if (destinationPe != lastDestinationPe) {
737         // do this once per sequence of items with the same destination
738         destinationLocation = determineLocation(destinationPe, msg->dimension);
739       }
740       storeMessage(destinationPe, destinationLocation, &dataItem);   
741     }
742     else /* if (destinationPe == TRAM_BROADCAST) */ {
743       localBroadcast(dataItem);       
744       broadcast(&dataItem, msg->dimension - 1, false); 
745     }
746     lastDestinationPe = destinationPe; 
747   }
748
749   if (useStagedCompletion_) {
750     markMessageReceived(msg->dimension, msg->finalMsgCount); 
751   }
752
753   delete msg;
754
755 }
756
757 template <class dtype>
758 void MeshStreamer<dtype>::sendLargestBuffer() {
759
760   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
761   MeshStreamerMessage<dtype> ** messageBuffers; 
762   MeshStreamerMessage<dtype> *destinationBuffer; 
763
764   for (int i = 0; i < numDimensions_; i++) {
765
766     messageBuffers = dataBuffers_[i]; 
767     numBuffers = individualDimensionSizes_[i]; 
768
769     flushDimension = i; 
770     maxSize = 0;    
771     for (int j = 0; j < numBuffers; j++) {
772       if (messageBuffers[j] != NULL && 
773           messageBuffers[j]->numDataItems > maxSize) {
774         maxSize = messageBuffers[j]->numDataItems;
775         flushIndex = j; 
776       }
777     }
778
779     if (maxSize > 0) {
780
781       messageBuffers = dataBuffers_[flushDimension]; 
782       destinationBuffer = messageBuffers[flushIndex];
783       destinationIndex = myIndex_ + 
784         (flushIndex - myLocationIndex_[flushDimension]) * 
785         combinedDimensionSizes_[flushDimension] ;
786
787       // not sending the full buffer, shrink the message size
788       envelope *env = UsrToEnv(destinationBuffer);
789       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
790                         (bufferSize_ - destinationBuffer->numDataItems));
791       *((int *) env->getPrioPtr()) = prio_;
792
793       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
794
795       if (flushDimension == 0) {
796 #ifdef STREAMER_VERBOSE_OUTPUT
797         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
798 #endif
799         this->thisProxy[destinationIndex].
800           receiveAtDestination(destinationBuffer);
801       }
802       else {
803 #ifdef STREAMER_VERBOSE_OUTPUT
804         CkPrintf("[%d] sending intermediate flush to %d\n", 
805                  CkMyPe(), destinationIndex); 
806 #endif
807         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
808       }
809
810       if (useStagedCompletion_) {
811         cntMsgSent_[i][flushIndex]++; 
812       }
813
814       messageBuffers[flushIndex] = NULL;
815
816     }
817
818   }
819 }
820
821 template <class dtype>
822 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
823   
824   for (int i = 0; i < numDimensions_; i++) {
825     flushDimension(i); 
826   }
827 }
828
829 template <class dtype>
830 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
831 #ifdef STREAMER_VERBOSE_OUTPUT
832   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", 
833            CkMyPe(), dimension, sendMsgCounts); 
834 #endif
835   MeshStreamerMessage<dtype> **messageBuffers; 
836   MeshStreamerMessage<dtype> *destinationBuffer; 
837   int destinationIndex, numBuffers; 
838
839   messageBuffers = dataBuffers_[dimension]; 
840   numBuffers = individualDimensionSizes_[dimension]; 
841
842   for (int j = 0; j < numBuffers; j++) {
843
844     if(messageBuffers[j] == NULL) {      
845       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
846         messageBuffers[j] = 
847           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
848         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
849         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
850       }
851       else {
852         continue; 
853       } 
854     }
855
856     destinationBuffer = messageBuffers[j];
857     destinationIndex = myIndex_ + 
858       (j - myLocationIndex_[dimension]) * 
859       combinedDimensionSizes_[dimension] ;
860
861     if (destinationBuffer->numDataItems != 0) {
862       // not sending the full buffer, shrink the message size
863       envelope *env = UsrToEnv(destinationBuffer);
864       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
865                         (bufferSize_ - destinationBuffer->numDataItems));
866       *((int *) env->getPrioPtr()) = prio_;
867     }
868     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
869
870     if (useStagedCompletion_) {
871       cntMsgSent_[dimension][j]++;
872       if (sendMsgCounts) {
873         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
874       }
875     }
876
877     if (dimension == 0) {
878 #ifdef STREAMER_VERBOSE_OUTPUT
879       CkPrintf("[%d] sending dimension flush to %d\n", 
880                CkMyPe(), destinationIndex); 
881 #endif
882       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
883     }
884     else {
885 #ifdef STREAMER_VERBOSE_OUTPUT
886       CkPrintf("[%d] sending intermediate dimension flush to %d\n", 
887                CkMyPe(), destinationIndex); 
888 #endif
889       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
890     }
891     messageBuffers[j] = NULL;
892   }
893   
894 }
895
896
897 template <class dtype>
898 void MeshStreamer<dtype>::flushIfIdle(){
899   // flush if (1) this is not a periodic call or 
900   //          (2) this is a periodic call and no sending took place
901   //              since the last time the function was invoked
902   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
903
904     if (numDataItemsBuffered_ != 0) {
905       flushToIntermediateDestinations();
906     }    
907     CkAssert(numDataItemsBuffered_ == 0); 
908     
909   }
910
911   hasSentRecently_ = false; 
912
913 }
914
915 template <class dtype>
916 void periodicProgressFunction(void *MeshStreamerObj, double time) {
917
918   MeshStreamer<dtype> *properObj = 
919     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
920
921   if (properObj->isPeriodicFlushEnabled()) {
922     properObj->flushIfIdle();
923     properObj->registerPeriodicProgressFunction();
924   }
925 }
926
927 template <class dtype>
928 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
929   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
930                  progressPeriodInMs_); 
931 }
932
933
934 template <class dtype>
935 class GroupMeshStreamer : public MeshStreamer<dtype> {
936 private:
937
938   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
939   MeshStreamerGroupClient<dtype> *clientObj_;
940
941   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
942     for (int i = 0; i < msg->numDataItems; i++) {
943       const dtype& data = msg->getDataItem(i);
944       clientObj_->process(data);
945     }
946
947     if (MeshStreamer<dtype>::useStagedCompletion_) {
948 #ifdef STREAMER_VERBOSE_OUTPUT
949       envelope *env = UsrToEnv(msg);
950       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
951                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
952                msg->finalMsgCount);  
953 #endif
954       markMessageReceived(msg->dimension, msg->finalMsgCount); 
955     }
956     else {
957       this->detectorLocalObj_->consume(msg->numDataItems);    
958     }
959
960     delete msg;
961   }
962
963   inline void localDeliver(const dtype& dataItem) {
964     clientObj_->process(dataItem);
965     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
966       MeshStreamer<dtype>::detectorLocalObj_->consume();
967     }
968   }
969
970   inline void localBroadcast(const dtype& dataItem) {
971     localDeliver(dataItem); 
972   }
973
974   inline int numElementsInClient() {
975     // client is a group - there is one element per PE
976     return CkNumPes();
977   }
978
979   inline int numLocalElementsInClient() {
980     return 1; 
981   }
982
983   inline void initLocalClients() {
984     // no action required
985   }
986
987 public:
988
989   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
990                     int *dimensionSizes, 
991                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
992                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
993    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
994                         0, yieldFlag, progressPeriodInMs) 
995   {
996     clientProxy_ = clientProxy; 
997     clientObj_ = 
998       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
999   }
1000
1001   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
1002                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1003                     int bufferSize, bool yieldFlag = 0, 
1004                     double progressPeriodInMs = -1.0)
1005    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
1006                         yieldFlag, progressPeriodInMs) 
1007   {
1008     clientProxy_ = clientProxy; 
1009     clientObj_ = 
1010       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
1011   }
1012
1013 };
1014
1015 template <class dtype>
1016 class ClientInitializer : public CkLocIterator {
1017
1018 public:
1019   
1020   CompletionDetector *detectorLocalObj_;
1021   CkArray *clientArrMgr_;
1022   ClientInitializer(CompletionDetector *detectorObj, 
1023                              CkArray *clientArrMgr) 
1024     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
1025
1026   // CkLocMgr::iterate will call addLocation on all elements local to this PE
1027   void addLocation(CkLocation& loc) {
1028
1029     MeshStreamerArrayClient<dtype> *clientObj = 
1030       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1031
1032     CkAssert(clientObj != NULL); 
1033     clientObj->setDetector(detectorLocalObj_); 
1034   }
1035
1036 };
1037
1038 template <class dtype>
1039 class LocalBroadcaster : public CkLocIterator {
1040
1041 public:
1042   CkArray *clientArrMgr_;
1043   const dtype *dataItem_; 
1044
1045   LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem) 
1046    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
1047
1048   void addLocation(CkLocation& loc) {
1049     MeshStreamerArrayClient<dtype> *clientObj = 
1050       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1051
1052     CkAssert(clientObj != NULL); 
1053
1054     clientObj->process(*dataItem_); 
1055   }
1056
1057 };
1058
1059 template <class dtype, class itype>
1060 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
1061   
1062 private:
1063   
1064   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
1065   CkArray *clientArrayMgr_;
1066   int numArrayElements_;
1067   int numLocalArrayElements_;
1068 #ifdef CACHE_ARRAY_METADATA
1069   MeshStreamerArrayClient<dtype> **clientObjs_;
1070   int *destinationPes_;
1071   bool *isCachedArrayMetadata_;
1072 #endif
1073
1074   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
1075     itype arrayId = packedDataItem.arrayIndex; 
1076     if (arrayId == itype(TRAM_BROADCAST)) {
1077       localBroadcast(packedDataItem);
1078       return;
1079     }
1080     MeshStreamerArrayClient<dtype> *clientObj;
1081 #ifdef CACHE_ARRAY_METADATA
1082     clientObj = clientObjs_[arrayId];
1083 #else
1084     clientObj = clientProxy_[arrayId].ckLocal();
1085 #endif
1086
1087     if (clientObj != NULL) {
1088       clientObj->process(packedDataItem.dataItem);
1089       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1090            ::useStagedCompletion_ == false) {
1091         MeshStreamer<ArrayDataItem<dtype, itype> >
1092          ::detectorLocalObj_->consume();
1093       }
1094     }
1095     else { 
1096       // array element is no longer present locally - redeliver using proxy
1097       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
1098     }
1099   }
1100
1101   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
1102
1103     LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
1104                                            &packedDataItem.dataItem);
1105     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1106     clientLocMgr->iterate(clientIterator);
1107
1108     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1109          ::useStagedCompletion_ == false) {
1110         MeshStreamer<ArrayDataItem<dtype, itype> >
1111          ::detectorLocalObj_->consume();      
1112     }
1113
1114   }
1115
1116   int numElementsInClient() {
1117     return numArrayElements_;
1118   }
1119
1120   int numLocalElementsInClient() {
1121     return numLocalArrayElements_;
1122   }
1123
1124   void initLocalClients() {
1125     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1126          ::useStagedCompletion_ == false) {
1127 #ifdef CACHE_ARRAY_METADATA
1128       std::fill(isCachedArrayMetadata_, 
1129                 isCachedArrayMetadata_ + numArrayElements_, false);
1130
1131       for (int i = 0; i < numArrayElements_; i++) {
1132         clientObjs_[i] = clientProxy_[i].ckLocal();
1133         if (clientObjs_[i] != NULL) {
1134           clientObjs_[i]->setDetector(
1135            MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1136         }
1137       }
1138 #else
1139       // set completion detector in local elements of the client
1140       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1141       ClientInitializer<dtype> clientIterator(
1142           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1143           clientProxy_.ckLocalBranch());
1144       clientLocMgr->iterate(clientIterator);
1145 #endif    
1146     }
1147     else {
1148       numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1149     }
1150   }
1151
1152   void commonInit() {
1153 #ifdef CACHE_ARRAY_METADATA
1154     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1155     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1156     destinationPes_ = new int[numArrayElements_];
1157     isCachedArrayMetadata_ = new bool[numArrayElements_];
1158     std::fill(isCachedArrayMetadata_, 
1159               isCachedArrayMetadata_ + numArrayElements_, false);
1160 #endif    
1161   }
1162
1163 public:
1164
1165   struct DataItemHandle {
1166     itype arrayIndex; 
1167     const dtype *dataItem;
1168   };
1169
1170   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1171                     int *dimensionSizes, 
1172                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1173                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1174     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1175                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
1176                   0, yieldFlag, progressPeriodInMs) 
1177   {
1178     clientProxy_ = clientProxy; 
1179     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1180     commonInit();
1181   }
1182
1183   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
1184                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1185                     int bufferSize, bool yieldFlag = 0, 
1186                     double progressPeriodInMs = -1.0)
1187     :MeshStreamer<ArrayDataItem<dtype,itype> >(
1188                   0, numDimensions, dimensionSizes, 
1189                   bufferSize, yieldFlag, progressPeriodInMs) 
1190   {
1191     clientProxy_ = clientProxy; 
1192     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1193     commonInit();
1194
1195   }
1196
1197   ~ArrayMeshStreamer() {
1198 #ifdef CACHE_ARRAY_METADATA
1199     delete [] clientObjs_;
1200     delete [] destinationPes_;
1201     delete [] isCachedArrayMetadata_; 
1202 #endif
1203   }
1204
1205   void receiveAtDestination(
1206        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1207
1208     for (int i = 0; i < msg->numDataItems; i++) {
1209       const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
1210       localDeliver(packedData);
1211     }
1212     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1213       markMessageReceived(msg->dimension, msg->finalMsgCount);
1214     }
1215
1216     delete msg;
1217   }
1218
1219   inline void broadcast(const dtype& dataItem) {
1220     const static bool copyIndirectly = true;
1221
1222     // no data items should be submitted after all local contributors call done
1223     // and staged completion has begun
1224     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1225                ::stagedCompletionStarted()) == false);
1226
1227     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1228          ::useStagedCompletion_ == false) {
1229       MeshStreamer<ArrayDataItem<dtype, itype> >
1230         ::detectorLocalObj_->produce(CkNumPes());
1231     }
1232
1233     // deliver locally
1234     ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, dataItem);
1235     localBroadcast(packedDataItem);
1236
1237     DataItemHandle tempHandle; 
1238     tempHandle.dataItem = &dataItem;
1239     tempHandle.arrayIndex = TRAM_BROADCAST;
1240
1241     int numDimensions = 
1242       MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
1243     MeshStreamer<ArrayDataItem<dtype, itype> >::
1244       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
1245   }
1246
1247   inline void insertData(const dtype& dataItem, itype arrayIndex) {
1248
1249     // no data items should be submitted after all local contributors call done
1250     // and staged completion has begun
1251     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1252                ::stagedCompletionStarted()) == false);
1253
1254     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1255          ::useStagedCompletion_ == false) {
1256       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1257     }
1258     int destinationPe; 
1259 #ifdef CACHE_ARRAY_METADATA
1260   if (isCachedArrayMetadata_[arrayIndex]) {    
1261     destinationPe =  destinationPes_[arrayIndex];
1262   }
1263   else {
1264     destinationPe = 
1265       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1266     isCachedArrayMetadata_[arrayIndex] = true;
1267     destinationPes_[arrayIndex] = destinationPe;
1268   }
1269 #else 
1270   destinationPe = 
1271     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1272 #endif
1273
1274     if (destinationPe == CkMyPe()) {
1275       ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, dataItem);
1276       localDeliver(packedDataItem);
1277       return;
1278     }
1279
1280     // this implementation avoids copying an item before transfer into message
1281     DataItemHandle tempHandle; 
1282     tempHandle.arrayIndex = arrayIndex; 
1283     tempHandle.dataItem = &dataItem;
1284
1285     MeshStreamer<ArrayDataItem<dtype, itype> >::
1286      insertData(&tempHandle, destinationPe);
1287
1288   }
1289
1290   inline int copyDataItemIntoMessage(
1291       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1292       const void *dataItemHandle, bool copyIndirectly) {
1293
1294     if (copyIndirectly == true) {
1295       // newly inserted items are passed through a handle to avoid copying
1296       int numDataItems = destinationBuffer->numDataItems;
1297       const DataItemHandle *tempHandle = 
1298         (const DataItemHandle *) dataItemHandle;
1299       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1300         *(tempHandle->dataItem);
1301       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1302         tempHandle->arrayIndex;
1303       return ++destinationBuffer->numDataItems;
1304     }
1305     else {
1306       // this is an item received along the route to destination
1307       // we can copy it from the received message
1308       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1309               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1310     }
1311   }
1312
1313 };
1314
1315 template <class dtype>
1316 class GroupChunkMeshStreamer : 
1317   public MeshStreamer<ChunkDataItem> {
1318
1319 private:
1320   dtype **receiveBuffers;
1321   int *receivedChunks;
1322   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
1323   MeshStreamerGroupClient<dtype> *clientObj_;
1324
1325
1326 public:
1327
1328   GroupChunkMeshStreamer(
1329        int maxNumDataItemsBuffered, int numDimensions,
1330        int *dimensionSizes, 
1331        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1332        bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1333     :MeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
1334                                  numDimensions, dimensionSizes,
1335                                  0, yieldFlag, progressPeriodInMs) {
1336     
1337     receiveBuffers = new dtype*[CkNumPes()];
1338     receivedChunks = new int[CkNumPes()]; 
1339     memset(receivedChunks, 0, CkNumPes() * sizeof(int));
1340     memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
1341
1342     clientProxy_ = clientProxy; 
1343     clientObj_ = 
1344       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
1345
1346   }
1347
1348   GroupChunkMeshStreamer(
1349        int numDimensions, int *dimensionSizes, 
1350        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1351        int bufferSize, bool yieldFlag = 0, 
1352        double progressPeriodInMs = -1.0)
1353     :MeshStreamer<ChunkDataItem>(0, numDimensions, dimensionSizes, 
1354                                  bufferSize, yieldFlag, 
1355                                  progressPeriodInMs) {
1356
1357     receiveBuffers = new dtype*[CkNumPes()];
1358     receivedChunks = new int[CkNumPes()]; 
1359     memset(receivedChunks, 0, CkNumPes() * sizeof(int));
1360     memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
1361
1362     clientProxy_ = clientProxy; 
1363     clientObj_ = 
1364       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
1365
1366   }
1367
1368   inline void insertData(dtype *dataArray, int numElements, int destinationPe) {
1369
1370     char *inputData = (char *) dataArray; 
1371     int arraySizeInBytes = numElements * sizeof(dtype); 
1372     ChunkDataItem chunk;
1373     int chunkNumber = 0; 
1374     chunk.sourcePe = CkMyPe();
1375     chunk.chunkNumber = 0; 
1376     chunk.chunkSize = CHUNK_SIZE;
1377     chunk.numChunks = (int) ceil ( (float) numElements * sizeof(dtype) / CHUNK_SIZE); 
1378     chunk.numItems = numElements; 
1379     for (int offset = 0; offset < arraySizeInBytes; offset += CHUNK_SIZE) {
1380
1381       if (offset + CHUNK_SIZE > arraySizeInBytes) {
1382         chunk.chunkSize = arraySizeInBytes - offset; 
1383         memset(chunk.rawData, 0, CHUNK_SIZE);
1384         memcpy(chunk.rawData, inputData + offset, chunk.chunkSize); 
1385       }
1386       else {
1387         memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
1388       }
1389
1390       MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe); 
1391       chunk.chunkNumber++; 
1392     }
1393
1394   }
1395
1396   inline void processChunk(const ChunkDataItem& chunk) {
1397
1398     if (receiveBuffers[chunk.sourcePe] == NULL) {
1399       receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
1400     }      
1401
1402     char *receiveBuffer = (char *) receiveBuffers[chunk.sourcePe];
1403
1404     memcpy(receiveBuffer + chunk.chunkNumber * CHUNK_SIZE, 
1405            chunk.rawData, chunk.chunkSize);
1406     if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
1407       clientObj_->receiveArray((dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
1408       receivedChunks[chunk.sourcePe] = 0;        
1409       delete [] receiveBuffers[chunk.sourcePe]; 
1410       receiveBuffers[chunk.sourcePe] = NULL;
1411     }
1412
1413   }
1414
1415   inline void localDeliver(const ChunkDataItem& chunk) {
1416     processChunk(chunk);
1417     if (MeshStreamer<ChunkDataItem>::useStagedCompletion_ == false) {
1418       MeshStreamer<ChunkDataItem>::detectorLocalObj_->consume();
1419     }
1420   }
1421
1422   inline void receiveAtDestination(
1423        MeshStreamerMessage<ChunkDataItem> *msg) {
1424
1425     for (int i = 0; i < msg->numDataItems; i++) {
1426       const ChunkDataItem& chunk = msg->getDataItem(i);
1427       processChunk(chunk);             
1428     }
1429
1430     if (MeshStreamer<ChunkDataItem>::useStagedCompletion_) {
1431 #ifdef STREAMER_VERBOSE_OUTPUT
1432       envelope *env = UsrToEnv(msg);
1433       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
1434                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
1435                msg->finalMsgCount);  
1436 #endif
1437       markMessageReceived(msg->dimension, msg->finalMsgCount); 
1438     }
1439     else {
1440       this->detectorLocalObj_->consume(msg->numDataItems);    
1441     }
1442
1443     delete msg;
1444     
1445   }
1446
1447   inline void localBroadcast(const ChunkDataItem& dataItem) {
1448     localDeliver(dataItem); 
1449   }
1450
1451   inline int numElementsInClient() {
1452     // client is a group - there is one element per PE
1453     return CkNumPes();
1454   }
1455
1456   inline int numLocalElementsInClient() {
1457     return 1; 
1458   }
1459
1460   inline void initLocalClients() {
1461     // no action required
1462   }
1463
1464 };
1465
1466
1467
1468 #define CK_TEMPLATES_ONLY
1469 #include "NDMeshStreamer.def.h"
1470 #undef CK_TEMPLATES_ONLY
1471
1472 #endif