NDMeshStreamer: code cleanup.
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define CACHE_LOCATIONS
17 // #define SUPPORT_INCOMPLETE_MESH
18 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
19 // #define STREAMER_VERBOSE_OUTPUT
20
21 #define TRAM_BROADCAST (-100)
22
23 struct MeshLocation {
24   int dimension; 
25   int bufferIndex; 
26 }; 
27
28 template<class dtype>
29 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
30
31 public:
32
33   int finalMsgCount; 
34   int dimension; 
35   int numDataItems;
36   int *destinationPes;
37   dtype *dataItems;
38
39   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
40     finalMsgCount = -1; 
41   }
42
43   inline int addDataItem(const dtype& dataItem) {
44     dataItems[numDataItems] = dataItem;
45     return ++numDataItems; 
46   }
47
48   inline void markDestination(const int index, const int destinationPe) {
49     destinationPes[index] = destinationPe;
50   }
51
52   inline const dtype& getDataItem(const int index) {
53     return dataItems[index];
54   }
55
56 };
57
58 template <class dtype>
59 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
60 private:
61   CompletionDetector *detectorLocalObj_;
62 public:
63   MeshStreamerArrayClient(){
64     detectorLocalObj_ = NULL; 
65   }
66   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
67   // would like to make it pure virtual but charm will try to
68   // instantiate the abstract class, leading to errors
69   virtual void process(const dtype& data) {
70     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
71             "This virtual function should have been defined by the user.\n");
72   }     
73   inline void setDetector(CompletionDetector *detectorLocalObj) {
74     detectorLocalObj_ = detectorLocalObj;
75   }
76   void receiveRedeliveredItem(dtype data) {
77 #ifdef STREAMER_VERBOSE_OUTPUT
78     CkPrintf("[%d] redelivered to index %d\n", 
79              CkMyPe(), this->thisIndex.data[0]);
80 #endif
81     if (detectorLocalObj_ != NULL) {
82       detectorLocalObj_->consume();
83     }
84     process(data);
85   }
86
87   void pup(PUP::er& p) {
88     CBase_MeshStreamerArrayClient<dtype>::pup(p);
89    }  
90
91 };
92
93 template <class dtype>
94 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
95
96 public:
97   virtual void process(const dtype& data) {
98     CkAbort("Error. MeshStreamerGroupClient::process() is being called. "
99             "This virtual function should have been defined by the user.\n");
100   }     
101
102   virtual void receiveArray(dtype *data, int numItems, int sourcePe) {
103     for (int i = 0; i < numItems; i++) {
104       process(data[i]);
105     }
106   }
107 };
108
109 template <class dtype>
110 class MeshStreamer : public CBase_MeshStreamer<dtype> {
111
112 private:
113   int bufferSize_; 
114   int maxNumDataItemsBuffered_;
115   int numDataItemsBuffered_;
116
117   int numMembers_; 
118   int *individualDimensionSizes_;
119   int *combinedDimensionSizes_;
120
121   int *startingIndexAtDimension_; 
122
123   int myIndex_;
124   int *myLocationIndex_;
125
126   CkCallback   userCallback_;
127   bool yieldFlag_;
128
129   double progressPeriodInMs_; 
130   bool isPeriodicFlushEnabled_; 
131   bool hasSentRecently_;
132   MeshStreamerMessage<dtype> ***dataBuffers_;
133
134   CProxy_CompletionDetector detector_;
135   int prio_;
136   int yieldCount_;
137
138 #ifdef CACHE_LOCATIONS
139   MeshLocation *cachedLocations_;
140   bool *isCached_; 
141 #endif
142
143
144   // only used for staged completion
145   int **cntMsgSent_;
146   int *cntMsgReceived_;
147   int *cntMsgExpected_;
148   int *cntFinished_; 
149   int dimensionToFlush_;
150   int numLocalDone_; 
151   int numLocalContributors_;
152
153   void storeMessage(int destinationPe, 
154                     const MeshLocation& destinationCoordinates, 
155                     const void *dataItem, bool copyIndirectly = false);
156   virtual void localDeliver(const dtype& dataItem) = 0; 
157   virtual void localBroadcast(const dtype& dataItem) = 0; 
158   virtual int numElementsInClient() = 0;
159   virtual int numLocalElementsInClient() = 0; 
160
161   virtual void initLocalClients() = 0;
162
163   void sendLargestBuffer();
164   void flushToIntermediateDestinations();
165   void flushDimension(int dimension, bool sendMsgCounts = false); 
166   MeshLocation determineLocation(int destinationPe, 
167                                  int dimensionReceivedAlong);
168
169 protected:
170
171   int numDimensions_;
172   bool useStagedCompletion_;
173   CompletionDetector *detectorLocalObj_;
174   virtual int copyDataItemIntoMessage(
175               MeshStreamerMessage<dtype> *destinationBuffer, 
176               const void *dataItemHandle, bool copyIndirectly = false);
177   void insertData(const void *dataItemHandle, int destinationPe);
178   void broadcast(const void *dataItemHandle, int dimension, 
179                  bool copyIndirectly);
180
181 public:
182
183   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
184                int *dimensionSizes, int bufferSize,
185                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
186   ~MeshStreamer();
187
188   // entry
189   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
190             int prio, bool usePeriodicFlushing);
191   void init(int numContributors, CkCallback startCb, CkCallback endCb, 
192             CProxy_CompletionDetector detector,
193             int prio, bool usePeriodicFlushing);
194   void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, 
195             int prio, bool usePeriodicFlushing);
196
197   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
198   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
199   void flushIfIdle();
200   void finish();
201
202   // non entry
203   inline bool isPeriodicFlushEnabled() {
204     return isPeriodicFlushEnabled_;
205   }
206   virtual void insertData(const dtype& dataItem, int destinationPe); 
207   virtual void broadcast(const dtype& dataItem); 
208   void registerPeriodicProgressFunction();
209
210   // flushing begins only after enablePeriodicFlushing has been invoked
211
212   inline void enablePeriodicFlushing(){
213     isPeriodicFlushEnabled_ = true; 
214     registerPeriodicProgressFunction();
215   }
216
217   inline void done(int numContributorsFinished = 1) {
218
219     if (useStagedCompletion_) {
220       numLocalDone_ += numContributorsFinished; 
221       if (numLocalDone_ == numLocalContributors_) {
222         startStagedCompletion();
223       }
224     }
225     else {
226       detectorLocalObj_->done(numContributorsFinished);
227     }
228   }
229   
230   inline bool stagedCompletionStarted() {
231     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
232   }
233
234   inline void startStagedCompletion() {
235     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
236       flushDimension(dimensionToFlush_, true);
237     }
238     dimensionToFlush_--;
239
240     checkForCompletedStages();
241   }
242
243   inline void markMessageReceived(int dimension, int finalCount) {
244     cntMsgReceived_[dimension]++;
245     if (finalCount != -1) {
246       cntFinished_[dimension]++; 
247       cntMsgExpected_[dimension] += finalCount; 
248 #ifdef STREAMER_VERBOSE_OUTPUT
249       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
250                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
251                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
252                cntMsgReceived_[dimension]); 
253 #endif
254     }  
255     if (dimensionToFlush_ != numDimensions_ - 1) {
256       checkForCompletedStages();
257     }
258   }
259
260   inline void checkForCompletedStages() {
261
262     while (cntFinished_[dimensionToFlush_ + 1] == 
263            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
264            cntMsgExpected_[dimensionToFlush_ + 1] == 
265            cntMsgReceived_[dimensionToFlush_ + 1]) {
266       if (dimensionToFlush_ == -1) {
267 #ifdef STREAMER_VERBOSE_OUTPUT
268         CkPrintf("[%d] contribute\n", CkMyPe()); 
269 #endif
270         CkAssert(numDataItemsBuffered_ == 0); 
271         isPeriodicFlushEnabled_ = false; 
272         if (!userCallback_.isInvalid()) {
273           this->contribute(userCallback_);
274           userCallback_ = CkCallback();
275         }
276         return; 
277       }
278       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
279         flushDimension(dimensionToFlush_, true); 
280       }
281       dimensionToFlush_--; 
282     }
283   }
284
285 };
286
287 template <class dtype>
288 MeshStreamer<dtype>::MeshStreamer(
289                      int maxNumDataItemsBuffered, int numDimensions, 
290                      int *dimensionSizes, 
291                      int bufferSize,
292                      bool yieldFlag, 
293                      double progressPeriodInMs)
294  :numDimensions_(numDimensions), 
295   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
296   yieldFlag_(yieldFlag), 
297   progressPeriodInMs_(progressPeriodInMs), 
298   bufferSize_(bufferSize)
299 {
300
301   int sumAlongAllDimensions = 0;   
302   individualDimensionSizes_ = new int[numDimensions_];
303   combinedDimensionSizes_ = new int[numDimensions_ + 1];
304   myLocationIndex_ = new int[numDimensions_];
305   startingIndexAtDimension_ = new int[numDimensions_ + 1]; 
306   memcpy(individualDimensionSizes_, dimensionSizes, 
307          numDimensions * sizeof(int)); 
308   combinedDimensionSizes_[0] = 1; 
309   for (int i = 0; i < numDimensions; i++) {
310     sumAlongAllDimensions += individualDimensionSizes_[i];
311     combinedDimensionSizes_[i + 1] = 
312       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
313   }
314
315   CkAssert(combinedDimensionSizes_[numDimensions] == CkNumPes()); 
316
317   // a bufferSize input of 0 indicates it should be calculated by the library
318   if (bufferSize_ == 0) {
319     CkAssert(maxNumDataItemsBuffered_ > 0);
320     // buffers for dimensions with the 
321     //   same index as the sender's are not allocated/used
322     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
323       / (sumAlongAllDimensions - numDimensions_ + 1); 
324   }
325   else {
326     maxNumDataItemsBuffered_ = 
327       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1) 
328       / OVERALLOCATION_FACTOR;
329   }
330
331   if (bufferSize_ <= 0) {
332     bufferSize_ = 1; 
333     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
334              "is invalid. Defaulting to a single buffer per destination.\n");
335   }
336   numDataItemsBuffered_ = 0; 
337   numMembers_ = CkNumPes(); 
338
339   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
340   for (int i = 0; i < numDimensions; i++) {
341     int numMembersAlongDimension = individualDimensionSizes_[i]; 
342     dataBuffers_[i] = 
343       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
344     for (int j = 0; j < numMembersAlongDimension; j++) {
345       dataBuffers_[i][j] = NULL;
346     }
347   }
348
349   myIndex_ = CkMyPe();
350   int remainder = myIndex_;
351   startingIndexAtDimension_[numDimensions_] = 0;
352   for (int i = numDimensions_ - 1; i >= 0; i--) {    
353     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
354     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
355     remainder -= dimensionOffset; 
356     startingIndexAtDimension_[i] = 
357       startingIndexAtDimension_[i+1] + dimensionOffset; 
358   }
359
360   isPeriodicFlushEnabled_ = false; 
361   detectorLocalObj_ = NULL;
362
363 #ifdef CACHE_LOCATIONS
364   cachedLocations_ = new MeshLocation[numMembers_];
365   isCached_ = new bool[numMembers_];
366   std::fill(isCached_, isCached_ + numMembers_, false);
367 #endif
368
369   cntMsgSent_ = NULL; 
370   cntMsgReceived_ = NULL; 
371   cntMsgExpected_ = NULL; 
372   cntFinished_ = NULL; 
373
374 }
375
376 template <class dtype>
377 MeshStreamer<dtype>::~MeshStreamer() {
378
379   for (int i = 0; i < numDimensions_; i++) {
380     for (int j=0; j < individualDimensionSizes_[i]; j++) {
381       delete[] dataBuffers_[i][j]; 
382     }
383     delete[] dataBuffers_[i]; 
384   }
385
386   delete[] individualDimensionSizes_;
387   delete[] combinedDimensionSizes_; 
388   delete[] myLocationIndex_;
389   delete[] startingIndexAtDimension_;
390
391 #ifdef CACHE_LOCATIONS
392   delete[] cachedLocations_;
393   delete[] isCached_; 
394 #endif
395
396   if (cntMsgSent_ != NULL) {
397     for (int i = 0; i < numDimensions_; i++) {
398       delete[] cntMsgSent_[i]; 
399     }
400     delete[] cntMsgSent_; 
401     delete[] cntMsgReceived_; 
402     delete[] cntMsgExpected_; 
403     delete[] cntFinished_;
404   }
405
406 }
407
408 template <class dtype>
409 inline
410 MeshLocation MeshStreamer<dtype>::determineLocation(
411                                   int destinationPe, 
412                                   int dimensionReceivedAlong) {
413
414 #ifdef CACHE_LOCATIONS
415   if (isCached_[destinationPe]) {    
416     return cachedLocations_[destinationPe]; 
417   }
418 #endif
419
420   MeshLocation destinationLocation;
421   int remainder = 
422     destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
423   int dimensionIndex; 
424   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
425     dimensionIndex = remainder / combinedDimensionSizes_[i];
426     
427     if (dimensionIndex != myLocationIndex_[i]) {
428       destinationLocation.dimension = i; 
429       destinationLocation.bufferIndex = dimensionIndex; 
430 #ifdef CACHE_LOCATIONS
431       cachedLocations_[destinationPe] = destinationLocation;
432       isCached_[destinationPe] = true; 
433 #endif
434       return destinationLocation;
435     }
436
437     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
438   }
439
440   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
441           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
442   // to prevent warnings
443   return destinationLocation; 
444 }
445
446 template <class dtype>
447 inline 
448 int MeshStreamer<dtype>::copyDataItemIntoMessage(
449                          MeshStreamerMessage<dtype> *destinationBuffer,
450                          const void *dataItemHandle, bool copyIndirectly) {
451   return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle)); 
452 }
453
454 template <class dtype>
455 inline
456 void MeshStreamer<dtype>::storeMessage(
457                           int destinationPe, 
458                           const MeshLocation& destinationLocation,
459                           const void *dataItem, bool copyIndirectly) {
460
461   int dimension = destinationLocation.dimension;
462   int bufferIndex = destinationLocation.bufferIndex; 
463   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
464
465   // allocate new message if necessary
466   if (messageBuffers[bufferIndex] == NULL) {
467     if (dimension == 0) {
468       // personalized messages do not require destination indices
469       messageBuffers[bufferIndex] = 
470         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
471     }
472     else {
473       messageBuffers[bufferIndex] = 
474         new (bufferSize_, bufferSize_, sizeof(int)) 
475         MeshStreamerMessage<dtype>(dimension);
476     }
477     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
478     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
479     CkAssert(messageBuffers[bufferIndex] != NULL);
480   }
481   
482   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
483   int numBuffered = 
484     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
485   if (dimension != 0) {
486     destinationBuffer->markDestination(numBuffered-1, destinationPe);
487   }
488   numDataItemsBuffered_++;
489
490   // send if buffer is full
491   if (numBuffered == bufferSize_) {
492
493     int destinationIndex;
494
495     destinationIndex = myIndex_ + 
496       (bufferIndex - myLocationIndex_[dimension]) * 
497       combinedDimensionSizes_[dimension];
498
499     if (dimension == 0) {
500 #ifdef STREAMER_VERBOSE_OUTPUT
501       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
502 #endif
503       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
504     }
505     else {
506 #ifdef STREAMER_VERBOSE_OUTPUT
507       CkPrintf("[%d] sending intermediate to %d\n", 
508                CkMyPe(), destinationIndex); 
509 #endif
510       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
511     }
512
513     if (useStagedCompletion_) {
514       cntMsgSent_[dimension][bufferIndex]++; 
515     }
516
517     messageBuffers[bufferIndex] = NULL;
518     numDataItemsBuffered_ -= numBuffered; 
519     hasSentRecently_ = true; 
520
521   }
522   // send if total buffering capacity has been reached
523   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
524     sendLargestBuffer();
525     hasSentRecently_ = true; 
526   }
527
528 }
529 template <class dtype>
530 inline
531 void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
532   const static bool copyIndirectly = true;
533
534   // no data items should be submitted after all local contributors call done 
535   // and staged completion has begun
536   CkAssert(stagedCompletionStarted() == false);
537
538   // produce and consume once per PE
539   if (!useStagedCompletion_) {
540     detectorLocalObj_->produce(CkNumPes());
541   }
542
543   // deliver locally
544   localBroadcast(dataItem);
545
546   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly); 
547 }
548
549 template <class dtype>
550 inline
551 void MeshStreamer<dtype>::broadcast(const void *dataItemHandle, int dimension, 
552                                     bool copyIndirectly) {
553
554   MeshLocation destinationLocation;
555   destinationLocation.dimension = dimension; 
556
557   while (destinationLocation.dimension != -1) {
558     for (int i = 0; 
559          i < individualDimensionSizes_[destinationLocation.dimension]; 
560          i++) {
561
562       if (i != myLocationIndex_[destinationLocation.dimension]) {
563         destinationLocation.bufferIndex = i; 
564         storeMessage(TRAM_BROADCAST, destinationLocation, 
565                      dataItemHandle, copyIndirectly); 
566       }
567       // release control to scheduler if requested by the user, 
568       //   assume caller is threaded entry
569       if (yieldFlag_ && ++yieldCount_ == 1024) {
570         yieldCount_ = 0; 
571         CthYield();
572       }
573     }
574     destinationLocation.dimension--; 
575   }
576
577 }
578
579
580 template <class dtype>
581 inline
582 void MeshStreamer<dtype>::insertData(const void *dataItemHandle, 
583                                      int destinationPe) {
584   const static bool copyIndirectly = true;
585
586   // treat newly inserted items as if they were received along
587   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
588   MeshLocation destinationLocation = determineLocation(destinationPe, 
589                                                        numDimensions_);
590   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
591                copyIndirectly); 
592   // release control to scheduler if requested by the user, 
593   //   assume caller is threaded entry
594   if (yieldFlag_ && ++yieldCount_ == 1024) {
595     yieldCount_ = 0; 
596     CthYield();
597   }
598
599 }
600
601 template <class dtype>
602 inline
603 void MeshStreamer<dtype>::insertData(const dtype& dataItem, int destinationPe) {
604
605   // no data items should be submitted after all local contributors call done 
606   // and staged completion has begun
607   CkAssert(stagedCompletionStarted() == false);
608
609   if (!useStagedCompletion_) {
610     detectorLocalObj_->produce();
611   }
612   if (destinationPe == CkMyPe()) {
613     localDeliver(dataItem);
614     return;
615   }
616
617   insertData((const void *) &dataItem, destinationPe);
618 }
619
620 template <class dtype>
621 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
622                                CkCallback endCb, int prio, 
623                                bool usePeriodicFlushing) {
624   useStagedCompletion_ = true; 
625   // allocate memory on first use
626   if (cntMsgSent_ == NULL) {
627     cntMsgSent_ = new int*[numDimensions_]; 
628     cntMsgReceived_ = new int[numDimensions_];
629     cntMsgExpected_ = new int[numDimensions_];
630     cntFinished_ = new int[numDimensions_]; 
631
632     for (int i = 0; i < numDimensions_; i++) {
633       cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
634     }
635   }
636
637
638   for (int i = 0; i < numDimensions_; i++) {
639     std::fill(cntMsgSent_[i], 
640               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
641     cntMsgReceived_[i] = 0;
642     cntMsgExpected_[i] = 0; 
643     cntFinished_[i] = 0; 
644   }
645   dimensionToFlush_ = numDimensions_ - 1;
646
647   yieldCount_ = 0; 
648   userCallback_ = endCb; 
649   prio_ = prio;
650
651   numLocalDone_ = 0; 
652   numLocalContributors_ = numLocalContributors; 
653   initLocalClients();
654
655   if (numLocalContributors_ == 0) {
656     startStagedCompletion();
657   }
658
659   hasSentRecently_ = false;
660   if (usePeriodicFlushing) {
661     enablePeriodicFlushing();
662   }
663
664   this->contribute(startCb);
665 }
666
667 template <class dtype>
668 void MeshStreamer<dtype>::init(int numContributors,       
669                                CkCallback startCb, CkCallback endCb,
670                                CProxy_CompletionDetector detector, 
671                                int prio, bool usePeriodicFlushing) {
672
673   useStagedCompletion_ = false; 
674   yieldCount_ = 0; 
675   prio_ = prio;
676   userCallback_ = endCb; 
677   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushIfIdle(), 
678                      this->thisProxy);
679   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
680                     this->thisProxy);
681   detector_ = detector;      
682   detectorLocalObj_ = detector_.ckLocalBranch();
683   initLocalClients();
684
685   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
686                                      finish , 0);
687   
688   if (progressPeriodInMs_ <= 0) {
689     CkPrintf("Using completion detection in NDMeshStreamer requires"
690              " setting a valid periodic flush period. Defaulting"
691              " to 10 ms\n");
692     progressPeriodInMs_ = 10;
693   }
694   
695   hasSentRecently_ = false; 
696   if (usePeriodicFlushing) {
697     enablePeriodicFlushing();
698   }
699
700 }
701
702 template <class dtype>
703 void MeshStreamer<dtype>::init(CkArrayID senderArrayID, CkCallback startCb, 
704             CkCallback endCb, int prio, bool usePeriodicFlushing) {
705     CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
706     int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
707     init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing); 
708   }
709
710
711 template <class dtype>
712 void MeshStreamer<dtype>::finish() {
713   isPeriodicFlushEnabled_ = false; 
714
715   if (!userCallback_.isInvalid()) {
716     this->contribute(userCallback_);
717     userCallback_ = CkCallback();      // nullify the current callback
718   }
719
720 }
721
722 template <class dtype>
723 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
724
725   int destinationPe, lastDestinationPe; 
726   MeshLocation destinationLocation;
727
728   lastDestinationPe = -1;
729   for (int i = 0; i < msg->numDataItems; i++) {
730     destinationPe = msg->destinationPes[i];
731     const dtype& dataItem = msg->getDataItem(i);
732     if (destinationPe == CkMyPe()) {
733       localDeliver(dataItem);
734     }
735     else if (destinationPe != TRAM_BROADCAST) {
736       if (destinationPe != lastDestinationPe) {
737         // do this once per sequence of items with the same destination
738         destinationLocation = determineLocation(destinationPe, msg->dimension);
739       }
740       storeMessage(destinationPe, destinationLocation, &dataItem);   
741     }
742     else /* if (destinationPe == TRAM_BROADCAST) */ {
743       localBroadcast(dataItem);       
744       broadcast(&dataItem, msg->dimension - 1, false); 
745     }
746     lastDestinationPe = destinationPe; 
747   }
748
749   if (useStagedCompletion_) {
750     markMessageReceived(msg->dimension, msg->finalMsgCount); 
751   }
752
753   delete msg;
754
755 }
756
757 template <class dtype>
758 inline
759 void MeshStreamer<dtype>::sendLargestBuffer() {
760
761   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
762   MeshStreamerMessage<dtype> ** messageBuffers; 
763   MeshStreamerMessage<dtype> *destinationBuffer; 
764
765   for (int i = 0; i < numDimensions_; i++) {
766
767     messageBuffers = dataBuffers_[i]; 
768     numBuffers = individualDimensionSizes_[i]; 
769
770     flushDimension = i; 
771     maxSize = 0;    
772     for (int j = 0; j < numBuffers; j++) {
773       if (messageBuffers[j] != NULL && 
774           messageBuffers[j]->numDataItems > maxSize) {
775         maxSize = messageBuffers[j]->numDataItems;
776         flushIndex = j; 
777       }
778     }
779
780     if (maxSize > 0) {
781
782       messageBuffers = dataBuffers_[flushDimension]; 
783       destinationBuffer = messageBuffers[flushIndex];
784       destinationIndex = myIndex_ + 
785         (flushIndex - myLocationIndex_[flushDimension]) * 
786         combinedDimensionSizes_[flushDimension] ;
787
788       // not sending the full buffer, shrink the message size
789       envelope *env = UsrToEnv(destinationBuffer);
790       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
791                         (bufferSize_ - destinationBuffer->numDataItems));
792       *((int *) env->getPrioPtr()) = prio_;
793
794       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
795
796       if (flushDimension == 0) {
797 #ifdef STREAMER_VERBOSE_OUTPUT
798         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
799 #endif
800         this->thisProxy[destinationIndex].
801           receiveAtDestination(destinationBuffer);
802       }
803       else {
804 #ifdef STREAMER_VERBOSE_OUTPUT
805         CkPrintf("[%d] sending intermediate flush to %d\n", 
806                  CkMyPe(), destinationIndex); 
807 #endif
808         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
809       }
810
811       if (useStagedCompletion_) {
812         cntMsgSent_[i][flushIndex]++; 
813       }
814
815       messageBuffers[flushIndex] = NULL;
816
817     }
818
819   }
820 }
821
822 template <class dtype>
823 inline
824 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
825   
826   for (int i = 0; i < numDimensions_; i++) {
827     flushDimension(i); 
828   }
829 }
830
831 template <class dtype>
832 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
833 #ifdef STREAMER_VERBOSE_OUTPUT
834   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", 
835            CkMyPe(), dimension, sendMsgCounts); 
836 #endif
837   MeshStreamerMessage<dtype> **messageBuffers; 
838   MeshStreamerMessage<dtype> *destinationBuffer; 
839   int destinationIndex, numBuffers; 
840
841   messageBuffers = dataBuffers_[dimension]; 
842   numBuffers = individualDimensionSizes_[dimension]; 
843
844   for (int j = 0; j < numBuffers; j++) {
845
846     if(messageBuffers[j] == NULL) {      
847       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
848         messageBuffers[j] = 
849           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
850         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
851         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
852       }
853       else {
854         continue; 
855       } 
856     }
857
858     destinationBuffer = messageBuffers[j];
859     destinationIndex = myIndex_ + 
860       (j - myLocationIndex_[dimension]) * 
861       combinedDimensionSizes_[dimension] ;
862
863     if (destinationBuffer->numDataItems != 0) {
864       // not sending the full buffer, shrink the message size
865       envelope *env = UsrToEnv(destinationBuffer);
866       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
867                         (bufferSize_ - destinationBuffer->numDataItems));
868       *((int *) env->getPrioPtr()) = prio_;
869     }
870     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
871
872     if (useStagedCompletion_) {
873       cntMsgSent_[dimension][j]++;
874       if (sendMsgCounts) {
875         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
876       }
877     }
878
879     if (dimension == 0) {
880 #ifdef STREAMER_VERBOSE_OUTPUT
881       CkPrintf("[%d] sending dimension flush to %d\n", 
882                CkMyPe(), destinationIndex); 
883 #endif
884       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
885     }
886     else {
887 #ifdef STREAMER_VERBOSE_OUTPUT
888       CkPrintf("[%d] sending intermediate dimension flush to %d\n", 
889                CkMyPe(), destinationIndex); 
890 #endif
891       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
892     }
893     messageBuffers[j] = NULL;
894   }
895   
896 }
897
898
899 template <class dtype>
900 void MeshStreamer<dtype>::flushIfIdle(){
901   // flush if (1) this is not a periodic call or 
902   //          (2) this is a periodic call and no sending took place
903   //              since the last time the function was invoked
904   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
905
906     if (numDataItemsBuffered_ != 0) {
907       flushToIntermediateDestinations();
908     }    
909     CkAssert(numDataItemsBuffered_ == 0); 
910     
911   }
912
913   hasSentRecently_ = false; 
914
915 }
916
917 template <class dtype>
918 void periodicProgressFunction(void *MeshStreamerObj, double time) {
919
920   MeshStreamer<dtype> *properObj = 
921     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
922
923   if (properObj->isPeriodicFlushEnabled()) {
924     properObj->flushIfIdle();
925     properObj->registerPeriodicProgressFunction();
926   }
927 }
928
929 template <class dtype>
930 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
931   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
932                  progressPeriodInMs_); 
933 }
934
935
936 template <class dtype>
937 class GroupMeshStreamer : public MeshStreamer<dtype> {
938 private:
939
940   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
941   MeshStreamerGroupClient<dtype> *clientObj_;
942
943   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
944     for (int i = 0; i < msg->numDataItems; i++) {
945       const dtype& data = msg->getDataItem(i);
946       clientObj_->process(data);
947     }
948
949     if (MeshStreamer<dtype>::useStagedCompletion_) {
950 #ifdef STREAMER_VERBOSE_OUTPUT
951       envelope *env = UsrToEnv(msg);
952       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
953                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
954                msg->finalMsgCount);  
955 #endif
956       markMessageReceived(msg->dimension, msg->finalMsgCount); 
957     }
958     else {
959       this->detectorLocalObj_->consume(msg->numDataItems);    
960     }
961
962     delete msg;
963   }
964
965   inline void localDeliver(const dtype& dataItem) {
966     clientObj_->process(dataItem);
967     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
968       MeshStreamer<dtype>::detectorLocalObj_->consume();
969     }
970   }
971
972   inline void localBroadcast(const dtype& dataItem) {
973     localDeliver(dataItem); 
974   }
975
976   inline int numElementsInClient() {
977     // client is a group - there is one element per PE
978     return CkNumPes();
979   }
980
981   inline int numLocalElementsInClient() {
982     return 1; 
983   }
984
985   inline void initLocalClients() {
986     // no action required
987   }
988
989 public:
990
991   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
992                     int *dimensionSizes, 
993                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
994                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
995    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
996                         0, yieldFlag, progressPeriodInMs) 
997   {
998     clientProxy_ = clientProxy; 
999     clientObj_ = clientProxy_.ckLocalBranch();
1000   }
1001
1002   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
1003                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1004                     int bufferSize, bool yieldFlag = 0, 
1005                     double progressPeriodInMs = -1.0)
1006    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
1007                         yieldFlag, progressPeriodInMs) 
1008   {
1009     clientProxy_ = clientProxy; 
1010     clientObj_ = clientProxy_.ckLocalBranch();
1011   }
1012
1013 };
1014
1015 template <class dtype>
1016 class ClientInitializer : public CkLocIterator {
1017
1018 public:
1019   
1020   CompletionDetector *detectorLocalObj_;
1021   CkArray *clientArrMgr_;
1022   ClientInitializer(CompletionDetector *detectorObj, 
1023                              CkArray *clientArrMgr) 
1024     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
1025
1026   // CkLocMgr::iterate will call addLocation on all elements local to this PE
1027   inline void addLocation(CkLocation& loc) {
1028
1029     MeshStreamerArrayClient<dtype> *clientObj = 
1030       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1031
1032     CkAssert(clientObj != NULL); 
1033     clientObj->setDetector(detectorLocalObj_); 
1034   }
1035
1036 };
1037
1038 template <class dtype>
1039 class LocalBroadcaster : public CkLocIterator {
1040
1041 public:
1042   CkArray *clientArrMgr_;
1043   const dtype *dataItem_; 
1044
1045   LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem) 
1046    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
1047
1048   void addLocation(CkLocation& loc) {
1049     MeshStreamerArrayClient<dtype> *clientObj = 
1050       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1051
1052     CkAssert(clientObj != NULL); 
1053
1054     clientObj->process(*dataItem_); 
1055   }
1056
1057 };
1058
1059 template <class dtype, class itype>
1060 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
1061   
1062 private:
1063   
1064   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
1065   CkArray *clientArrayMgr_;
1066   int numArrayElements_;
1067   int numLocalArrayElements_;
1068 #ifdef CACHE_ARRAY_METADATA
1069   MeshStreamerArrayClient<dtype> **clientObjs_;
1070   int *destinationPes_;
1071   bool *isCachedArrayMetadata_;
1072 #endif
1073
1074   inline 
1075   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
1076     itype arrayId = packedDataItem.arrayIndex; 
1077     if (arrayId == itype(TRAM_BROADCAST)) {
1078       localBroadcast(packedDataItem);
1079       return;
1080     }
1081     MeshStreamerArrayClient<dtype> *clientObj;
1082 #ifdef CACHE_ARRAY_METADATA
1083     clientObj = clientObjs_[arrayId];
1084 #else
1085     clientObj = clientProxy_[arrayId].ckLocal();
1086 #endif
1087
1088     if (clientObj != NULL) {
1089       clientObj->process(packedDataItem.dataItem);
1090       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1091            ::useStagedCompletion_ == false) {
1092         MeshStreamer<ArrayDataItem<dtype, itype> >
1093          ::detectorLocalObj_->consume();
1094       }
1095     }
1096     else { 
1097       // array element is no longer present locally - redeliver using proxy
1098       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
1099     }
1100   }
1101
1102   inline 
1103   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
1104
1105     LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
1106                                            &packedDataItem.dataItem);
1107     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1108     clientLocMgr->iterate(clientIterator);
1109
1110     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1111          ::useStagedCompletion_ == false) {
1112         MeshStreamer<ArrayDataItem<dtype, itype> >
1113          ::detectorLocalObj_->consume();      
1114     }
1115
1116   }
1117
1118   inline int numElementsInClient() {
1119     return numArrayElements_;
1120   }
1121
1122   inline int numLocalElementsInClient() {
1123     return numLocalArrayElements_;
1124   }
1125
1126   inline void initLocalClients() {
1127     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1128          ::useStagedCompletion_ == false) {
1129 #ifdef CACHE_ARRAY_METADATA
1130       std::fill(isCachedArrayMetadata_, 
1131                 isCachedArrayMetadata_ + numArrayElements_, false);
1132
1133       for (int i = 0; i < numArrayElements_; i++) {
1134         clientObjs_[i] = clientProxy_[i].ckLocal();
1135         if (clientObjs_[i] != NULL) {
1136           clientObjs_[i]->setDetector(
1137            MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1138         }
1139       }
1140 #else
1141       // set completion detector in local elements of the client
1142       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1143       ClientInitializer<dtype> clientIterator(
1144           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1145           clientProxy_.ckLocalBranch());
1146       clientLocMgr->iterate(clientIterator);
1147 #endif    
1148     }
1149     else {
1150       numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1151     }
1152   }
1153
1154   inline void commonInit() {
1155 #ifdef CACHE_ARRAY_METADATA
1156     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1157     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1158     destinationPes_ = new int[numArrayElements_];
1159     isCachedArrayMetadata_ = new bool[numArrayElements_];
1160     std::fill(isCachedArrayMetadata_, 
1161               isCachedArrayMetadata_ + numArrayElements_, false);
1162 #endif    
1163   }
1164
1165 public:
1166
1167   struct DataItemHandle {
1168     itype arrayIndex; 
1169     const dtype *dataItem;
1170   };
1171
1172   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1173                     int *dimensionSizes, 
1174                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1175                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1176     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1177                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
1178                   0, yieldFlag, progressPeriodInMs) 
1179   {
1180     clientProxy_ = clientProxy; 
1181     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1182     commonInit();
1183   }
1184
1185   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
1186                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1187                     int bufferSize, bool yieldFlag = 0, 
1188                     double progressPeriodInMs = -1.0)
1189     :MeshStreamer<ArrayDataItem<dtype,itype> >(
1190                   0, numDimensions, dimensionSizes, 
1191                   bufferSize, yieldFlag, progressPeriodInMs) 
1192   {
1193     clientProxy_ = clientProxy; 
1194     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1195     commonInit();
1196
1197   }
1198
1199   ~ArrayMeshStreamer() {
1200 #ifdef CACHE_ARRAY_METADATA
1201     delete [] clientObjs_;
1202     delete [] destinationPes_;
1203     delete [] isCachedArrayMetadata_; 
1204 #endif
1205   }
1206
1207   void receiveAtDestination(
1208        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1209
1210     for (int i = 0; i < msg->numDataItems; i++) {
1211       const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
1212       localDeliver(packedData);
1213     }
1214     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1215       markMessageReceived(msg->dimension, msg->finalMsgCount);
1216     }
1217
1218     delete msg;
1219   }
1220
1221   inline void broadcast(const dtype& dataItem) {
1222     const static bool copyIndirectly = true;
1223
1224     // no data items should be submitted after all local contributors call done
1225     // and staged completion has begun
1226     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1227                ::stagedCompletionStarted()) == false);
1228
1229     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1230          ::useStagedCompletion_ == false) {
1231       MeshStreamer<ArrayDataItem<dtype, itype> >
1232         ::detectorLocalObj_->produce(CkNumPes());
1233     }
1234
1235     // deliver locally
1236     ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, dataItem);
1237     localBroadcast(packedDataItem);
1238
1239     DataItemHandle tempHandle; 
1240     tempHandle.dataItem = &dataItem;
1241     tempHandle.arrayIndex = TRAM_BROADCAST;
1242
1243     int numDimensions = 
1244       MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
1245     MeshStreamer<ArrayDataItem<dtype, itype> >::
1246       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
1247   }
1248
1249   inline void insertData(const dtype& dataItem, itype arrayIndex) {
1250
1251     // no data items should be submitted after all local contributors call done
1252     // and staged completion has begun
1253     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1254                ::stagedCompletionStarted()) == false);
1255
1256     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1257          ::useStagedCompletion_ == false) {
1258       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1259     }
1260     int destinationPe; 
1261 #ifdef CACHE_ARRAY_METADATA
1262   if (isCachedArrayMetadata_[arrayIndex]) {    
1263     destinationPe =  destinationPes_[arrayIndex];
1264   }
1265   else {
1266     destinationPe = 
1267       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1268     isCachedArrayMetadata_[arrayIndex] = true;
1269     destinationPes_[arrayIndex] = destinationPe;
1270   }
1271 #else 
1272   destinationPe = 
1273     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1274 #endif
1275
1276     if (destinationPe == CkMyPe()) {
1277       ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, dataItem);
1278       localDeliver(packedDataItem);
1279       return;
1280     }
1281
1282     // this implementation avoids copying an item before transfer into message
1283     DataItemHandle tempHandle; 
1284     tempHandle.arrayIndex = arrayIndex; 
1285     tempHandle.dataItem = &dataItem;
1286
1287     MeshStreamer<ArrayDataItem<dtype, itype> >::
1288      insertData(&tempHandle, destinationPe);
1289
1290   }
1291
1292   inline int copyDataItemIntoMessage(
1293       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1294       const void *dataItemHandle, bool copyIndirectly) {
1295
1296     if (copyIndirectly == true) {
1297       // newly inserted items are passed through a handle to avoid copying
1298       int numDataItems = destinationBuffer->numDataItems;
1299       const DataItemHandle *tempHandle = 
1300         (const DataItemHandle *) dataItemHandle;
1301       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1302         *(tempHandle->dataItem);
1303       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1304         tempHandle->arrayIndex;
1305       return ++destinationBuffer->numDataItems;
1306     }
1307     else {
1308       // this is an item received along the route to destination
1309       // we can copy it from the received message
1310       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1311               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1312     }
1313   }
1314
1315 };
1316
1317 template <class dtype>
1318 class GroupChunkMeshStreamer : 
1319   public MeshStreamer<ChunkDataItem> {
1320
1321 private:
1322   dtype **receiveBuffers;
1323   int *receivedChunks;
1324   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
1325   MeshStreamerGroupClient<dtype> *clientObj_;
1326
1327
1328 public:
1329
1330   GroupChunkMeshStreamer(
1331        int maxNumDataItemsBuffered, int numDimensions,
1332        int *dimensionSizes, 
1333        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1334        bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1335     :MeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
1336                                  numDimensions, dimensionSizes,
1337                                  0, yieldFlag, progressPeriodInMs) {
1338
1339     clientProxy_ = clientProxy; 
1340     clientObj_ = clientProxy_.ckLocalBranch();    
1341     commonInit();
1342   }
1343
1344   GroupChunkMeshStreamer(
1345        int numDimensions, int *dimensionSizes, 
1346        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1347        int bufferSize, bool yieldFlag = 0, 
1348        double progressPeriodInMs = -1.0)
1349     :MeshStreamer<ChunkDataItem>(0, numDimensions, dimensionSizes, 
1350                                  bufferSize, yieldFlag, 
1351                                  progressPeriodInMs) {
1352
1353     clientProxy_ = clientProxy; 
1354     clientObj_ = clientProxy_.ckLocalBranch();
1355     commonInit();
1356   }
1357
1358   inline void commonInit() {
1359
1360     receiveBuffers = new dtype*[CkNumPes()];
1361     receivedChunks = new int[CkNumPes()]; 
1362     memset(receivedChunks, 0, CkNumPes() * sizeof(int));
1363     memset(receiveBuffers, 0, CkNumPes() * sizeof(dtype*)); 
1364
1365   }
1366
1367   inline void insertData(dtype *dataArray, int numElements, int destinationPe) {
1368
1369     char *inputData = (char *) dataArray; 
1370     int arraySizeInBytes = numElements * sizeof(dtype); 
1371     ChunkDataItem chunk;
1372     int chunkNumber = 0; 
1373     chunk.sourcePe = CkMyPe();
1374     chunk.chunkNumber = 0; 
1375     chunk.chunkSize = CHUNK_SIZE;
1376     chunk.numChunks = 
1377       (int) ceil ( (float) numElements * sizeof(dtype) / CHUNK_SIZE); 
1378     chunk.numItems = numElements; 
1379     for (int offset = 0; offset < arraySizeInBytes; offset += CHUNK_SIZE) {
1380
1381       if (offset + CHUNK_SIZE > arraySizeInBytes) {
1382         chunk.chunkSize = arraySizeInBytes - offset; 
1383         memset(chunk.rawData, 0, CHUNK_SIZE);
1384         memcpy(chunk.rawData, inputData + offset, chunk.chunkSize); 
1385       }
1386       else {
1387         memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
1388       }
1389
1390       MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe); 
1391       chunk.chunkNumber++; 
1392     }
1393
1394   }
1395
1396   inline void processChunk(const ChunkDataItem& chunk) {
1397
1398     if (receiveBuffers[chunk.sourcePe] == NULL) {
1399       receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
1400     }      
1401
1402     char *receiveBuffer = (char *) receiveBuffers[chunk.sourcePe];
1403
1404     memcpy(receiveBuffer + chunk.chunkNumber * CHUNK_SIZE, 
1405            chunk.rawData, chunk.chunkSize);
1406     if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
1407       clientObj_->receiveArray(
1408                   (dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
1409       receivedChunks[chunk.sourcePe] = 0;        
1410       delete [] receiveBuffers[chunk.sourcePe]; 
1411       receiveBuffers[chunk.sourcePe] = NULL;
1412     }
1413
1414   }
1415
1416   inline void localDeliver(const ChunkDataItem& chunk) {
1417     processChunk(chunk);
1418     if (MeshStreamer<ChunkDataItem>::useStagedCompletion_ == false) {
1419       MeshStreamer<ChunkDataItem>::detectorLocalObj_->consume();
1420     }
1421   }
1422
1423   inline void receiveAtDestination(
1424        MeshStreamerMessage<ChunkDataItem> *msg) {
1425
1426     for (int i = 0; i < msg->numDataItems; i++) {
1427       const ChunkDataItem& chunk = msg->getDataItem(i);
1428       processChunk(chunk);             
1429     }
1430
1431     if (MeshStreamer<ChunkDataItem>::useStagedCompletion_) {
1432 #ifdef STREAMER_VERBOSE_OUTPUT
1433       envelope *env = UsrToEnv(msg);
1434       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
1435                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
1436                msg->finalMsgCount);  
1437 #endif
1438       markMessageReceived(msg->dimension, msg->finalMsgCount); 
1439     }
1440     else {
1441       this->detectorLocalObj_->consume(msg->numDataItems);    
1442     }
1443
1444     delete msg;
1445     
1446   }
1447
1448   inline void localBroadcast(const ChunkDataItem& dataItem) {
1449     localDeliver(dataItem); 
1450   }
1451
1452   inline int numElementsInClient() {
1453     // client is a group - there is one element per PE
1454     return CkNumPes();
1455   }
1456
1457   inline int numLocalElementsInClient() {
1458     return 1; 
1459   }
1460
1461   inline void initLocalClients() {
1462     // no action required
1463   }
1464
1465 };
1466
1467
1468
1469 #define CK_TEMPLATES_ONLY
1470 #include "NDMeshStreamer.def.h"
1471 #undef CK_TEMPLATES_ONLY
1472
1473 #endif