NDMeshStreamer: aggregate within the lowest dimension first.
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include <list>
6 #include <map>
7 #include "NDMeshStreamer.decl.h"
8 #include "DataItemTypes.h"
9 #include "completion.h"
10 #include "ckarray.h"
11
12 // limit total number of buffered data items to
13 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
14 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
15 // take advantage of nonuniform filling of buffers
16 #define OVERALLOCATION_FACTOR 4
17
18 // #define CACHE_LOCATIONS
19 // #define SUPPORT_INCOMPLETE_MESH
20 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
21 // #define STREAMER_VERBOSE_OUTPUT
22
23 #define TRAM_BROADCAST (-100)
24
25 extern void QdCreate(int n);
26 extern void QdProcess(int n);
27
28 struct MeshLocation {
29   int dimension;
30   int bufferIndex;
31 };
32
33 template<class dtype>
34 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
35
36 public:
37
38   int finalMsgCount;
39   int dimension;
40   int numDataItems;
41   int *destinationPes;
42   dtype *dataItems;
43
44   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
45     finalMsgCount = -1;
46   }
47
48   inline int addDataItem(const dtype& dataItem) {
49     dataItems[numDataItems] = dataItem;
50     return ++numDataItems;
51   }
52
53   inline void markDestination(const int index, const int destinationPe) {
54     destinationPes[index] = destinationPe;
55   }
56
57   inline const dtype& getDataItem(const int index) {
58     return dataItems[index];
59   }
60 };
61
62 template <class dtype>
63 class MeshStreamer : public CBase_MeshStreamer<dtype> {
64
65 private:
66   int bufferSize_;
67   int maxNumDataItemsBuffered_;
68   int numDataItemsBuffered_;
69
70   int numMembers_;
71   int *individualDimensionSizes_;
72   int *combinedDimensionSizes_;
73
74   int myIndex_;
75   int *myLocationIndex_;
76
77   CkCallback   userCallback_;
78   bool yieldFlag_;
79
80   double progressPeriodInMs_;
81   bool isPeriodicFlushEnabled_;
82   bool hasSentRecently_;
83   MeshStreamerMessage<dtype> ***dataBuffers_;
84
85   CProxy_CompletionDetector detector_;
86   int prio_;
87   int yieldCount_;
88
89 #ifdef CACHE_LOCATIONS
90   MeshLocation *cachedLocations_;
91   bool *isCached_;
92 #endif
93
94
95   // only used for staged completion
96   int **cntMsgSent_;
97   int *cntMsgReceived_;
98   int *cntMsgExpected_;
99   int *cntFinished_;
100   int dimensionToFlush_;
101   int numLocalDone_;
102   int numLocalContributors_;
103
104   virtual void localDeliver(const dtype& dataItem) = 0;
105   virtual void localBroadcast(const dtype& dataItem) = 0;
106
107   virtual void initLocalClients() = 0;
108
109   void sendLargestBuffer();
110   void flushToIntermediateDestinations();
111   void flushDimension(int dimension, bool sendMsgCounts = false);
112
113 protected:
114
115   int numDimensions_;
116   bool useStagedCompletion_;
117   bool useCompletionDetection_;
118   CompletionDetector *detectorLocalObj_;
119   virtual int copyDataItemIntoMessage(
120               MeshStreamerMessage<dtype> *destinationBuffer,
121               const void *dataItemHandle, bool copyIndirectly = false);
122   void insertData(const void *dataItemHandle, int destinationPe);
123   void broadcast(const void *dataItemHandle, int dimension,
124                  bool copyIndirectly);
125   MeshLocation determineLocation(int destinationPe,
126                                  int dimensionReceivedAlong);
127   void storeMessage(int destinationPe,
128                     const MeshLocation& destinationCoordinates,
129                     const void *dataItem, bool copyIndirectly = false);
130
131   void ctorHelper(int maxNumDataItemsBuffered, int numDimensions,
132                   int *dimensionSizes, int bufferSize,
133                   bool yieldFlag, double progressPeriodInMs);
134
135 public:
136
137   MeshStreamer() {}
138   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
139                int *dimensionSizes, int bufferSize,
140                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
141   ~MeshStreamer();
142
143   // entry
144   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb,
145             int prio, bool usePeriodicFlushing);
146   void init(int numContributors, CkCallback startCb, CkCallback endCb,
147             CProxy_CompletionDetector detector,
148             int prio, bool usePeriodicFlushing);
149   void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb,
150             int prio, bool usePeriodicFlushing);
151   void init(CkCallback endCb, int prio);
152
153   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
154   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
155   void flushIfIdle();
156   void finish();
157
158   // non entry
159   inline bool isPeriodicFlushEnabled() {
160     return isPeriodicFlushEnabled_;
161   }
162   virtual void insertData(const dtype& dataItem, int destinationPe);
163   virtual void broadcast(const dtype& dataItem);
164
165   void sendMeshStreamerMessage(MeshStreamerMessage<dtype> *destinationBuffer,
166                                int dimension, int destinationIndex);
167
168   void registerPeriodicProgressFunction();
169   // flushing begins only after enablePeriodicFlushing has been invoked
170   inline void enablePeriodicFlushing(){
171     isPeriodicFlushEnabled_ = true;
172     registerPeriodicProgressFunction();
173   }
174
175   inline void done(int numContributorsFinished = 1) {
176
177     if (useStagedCompletion_) {
178       numLocalDone_ += numContributorsFinished;
179       CkAssert(numLocalDone_ <= numLocalContributors_);
180       if (numLocalDone_ == numLocalContributors_) {
181         startStagedCompletion();
182       }
183     }
184     else if (useCompletionDetection_){
185       detectorLocalObj_->done(numContributorsFinished);
186     }
187   }
188
189   inline bool stagedCompletionStarted() {
190     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1);
191   }
192
193   inline void startStagedCompletion() {
194
195     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
196       flushDimension(dimensionToFlush_, true);
197     }
198     dimensionToFlush_--;
199
200     checkForCompletedStages();
201   }
202
203   inline void markMessageReceived(int dimension, int finalCount) {
204
205     cntMsgReceived_[dimension]++;
206     if (finalCount != -1) {
207       cntFinished_[dimension]++;
208       cntMsgExpected_[dimension] += finalCount;
209 #ifdef STREAMER_VERBOSE_OUTPUT
210       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
211                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension,
212                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
213                cntMsgReceived_[dimension]);
214 #endif
215     }
216     if (dimensionToFlush_ != numDimensions_ - 1) {
217       checkForCompletedStages();
218     }
219   }
220
221   inline void checkForCompletedStages() {
222
223     while (cntFinished_[dimensionToFlush_ + 1] ==
224            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
225            cntMsgExpected_[dimensionToFlush_ + 1] ==
226            cntMsgReceived_[dimensionToFlush_ + 1]) {
227       if (dimensionToFlush_ == -1) {
228 #ifdef STREAMER_VERBOSE_OUTPUT
229         CkPrintf("[%d] contribute\n", CkMyPe());
230 #endif
231         CkAssert(numDataItemsBuffered_ == 0);
232         isPeriodicFlushEnabled_ = false;
233         if (!userCallback_.isInvalid()) {
234           this->contribute(userCallback_);
235           userCallback_ = CkCallback();
236         }
237         return;
238       }
239       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
240         flushDimension(dimensionToFlush_, true);
241       }
242       dimensionToFlush_--;
243     }
244   }
245 };
246
247 template <class dtype>
248 MeshStreamer<dtype>::
249 MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
250              int *dimensionSizes, int bufferSize, bool yieldFlag,
251              double progressPeriodInMs) {
252   ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
253              bufferSize, yieldFlag, progressPeriodInMs);
254 }
255
256 template <class dtype>
257 void MeshStreamer<dtype>::
258 ctorHelper(int maxNumDataItemsBuffered, int numDimensions,
259            int *dimensionSizes, int bufferSize,
260            bool yieldFlag, double progressPeriodInMs) {
261
262   numDimensions_ = numDimensions;
263   maxNumDataItemsBuffered_ = maxNumDataItemsBuffered;
264   yieldFlag_ = yieldFlag;
265   progressPeriodInMs_ = progressPeriodInMs;
266   bufferSize_ = bufferSize;
267
268   int sumAlongAllDimensions = 0;
269   individualDimensionSizes_ = new int[numDimensions_];
270   combinedDimensionSizes_ = new int[numDimensions_];
271   myLocationIndex_ = new int[numDimensions_];
272   memcpy(individualDimensionSizes_, dimensionSizes,
273          numDimensions_ * sizeof(int));
274   combinedDimensionSizes_[numDimensions - 1] = 1;
275   sumAlongAllDimensions += individualDimensionSizes_[numDimensions_ - 1];
276   for (int i = numDimensions_ - 2; i >= 0; i--) {
277     sumAlongAllDimensions += individualDimensionSizes_[i];
278     combinedDimensionSizes_[i] =
279       combinedDimensionSizes_[i + 1] * individualDimensionSizes_[i + 1];
280   }
281   CkAssert(combinedDimensionSizes_[0] * individualDimensionSizes_[0]== CkNumPes());
282
283   // a bufferSize input of 0 indicates it should be calculated by the library
284   if (bufferSize_ == 0) {
285     CkAssert(maxNumDataItemsBuffered_ > 0);
286     // buffers for dimensions with the
287     //   same index as the sender's are not allocated/used
288     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_
289       / (sumAlongAllDimensions - numDimensions_ + 1);
290   }
291   else {
292     maxNumDataItemsBuffered_ =
293       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1)
294       / OVERALLOCATION_FACTOR;
295   }
296
297   if (bufferSize_ <= 0) {
298     bufferSize_ = 1;
299     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
300              "is invalid. Defaulting to a single buffer per destination.\n");
301   }
302   numDataItemsBuffered_ = 0;
303   numMembers_ = CkNumPes();
304
305   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_];
306   for (int i = 0; i < numDimensions; i++) {
307     int numMembersAlongDimension = individualDimensionSizes_[i];
308     dataBuffers_[i] =
309       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
310     for (int j = 0; j < numMembersAlongDimension; j++) {
311       dataBuffers_[i][j] = NULL;
312     }
313   }
314
315   myIndex_ = CkMyPe();
316   int remainder = myIndex_;
317   for (int i = 0; i < numDimensions_; i++) {
318     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
319     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
320   }
321
322   isPeriodicFlushEnabled_ = false;
323   detectorLocalObj_ = NULL;
324
325 #ifdef CACHE_LOCATIONS
326   cachedLocations_ = new MeshLocation[numMembers_];
327   isCached_ = new bool[numMembers_];
328   std::fill(isCached_, isCached_ + numMembers_, false);
329 #endif
330
331   cntMsgSent_ = NULL;
332   cntMsgReceived_ = NULL;
333   cntMsgExpected_ = NULL;
334   cntFinished_ = NULL;
335 }
336
337 template <class dtype>
338 MeshStreamer<dtype>::~MeshStreamer() {
339
340   for (int i = 0; i < numDimensions_; i++) {
341     for (int j=0; j < individualDimensionSizes_[i]; j++) {
342       delete[] dataBuffers_[i][j];
343     }
344     delete[] dataBuffers_[i];
345   }
346
347   delete[] individualDimensionSizes_;
348   delete[] combinedDimensionSizes_;
349   delete[] myLocationIndex_;
350
351 #ifdef CACHE_LOCATIONS
352   delete[] cachedLocations_;
353   delete[] isCached_;
354 #endif
355
356   if (cntMsgSent_ != NULL) {
357     for (int i = 0; i < numDimensions_; i++) {
358       delete[] cntMsgSent_[i];
359     }
360     delete[] cntMsgSent_;
361     delete[] cntMsgReceived_;
362     delete[] cntMsgExpected_;
363     delete[] cntFinished_;
364   }
365 }
366
367 template <class dtype>
368 inline MeshLocation MeshStreamer<dtype>::
369 determineLocation(int destinationPe, int dimensionReceivedAlong) {
370
371 #ifdef CACHE_LOCATIONS
372   if (isCached_[destinationPe]) {
373     return cachedLocations_[destinationPe];
374   }
375 #endif
376
377   MeshLocation destinationLocation;
378   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {
379     int blockIndex = destinationPe / combinedDimensionSizes_[i];
380
381     int dimensionIndex =
382       blockIndex - blockIndex / individualDimensionSizes_[i]
383       * individualDimensionSizes_[i];
384
385     if (dimensionIndex != myLocationIndex_[i]) {
386       destinationLocation.dimension = i;
387       destinationLocation.bufferIndex = dimensionIndex;
388 #ifdef CACHE_LOCATIONS
389       cachedLocations_[destinationPe] = destinationLocation;
390       isCached_[destinationPe] = true;
391 #endif
392       return destinationLocation;
393     }
394   }
395
396   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
397           "equal to sender's PE. This is unexpected and may cause errors.\n");
398   // to prevent warnings
399   return destinationLocation;
400 }
401
402 template <class dtype>
403 inline int MeshStreamer<dtype>::
404 copyDataItemIntoMessage(MeshStreamerMessage<dtype> *destinationBuffer,
405                         const void *dataItemHandle, bool copyIndirectly) {
406   return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle));
407 }
408
409 template <class dtype>
410 inline void MeshStreamer<dtype>::
411 sendMeshStreamerMessage(MeshStreamerMessage<dtype> *destinationBuffer,
412                         int dimension, int destinationIndex) {
413
414     if (dimension == 0) {
415 #ifdef STREAMER_VERBOSE_OUTPUT
416       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex);
417 #endif
418       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
419     }
420     else {
421 #ifdef STREAMER_VERBOSE_OUTPUT
422       CkPrintf("[%d] sending intermediate to %d\n",
423                CkMyPe(), destinationIndex);
424 #endif
425       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
426     }
427 }
428
429 template <class dtype>
430 inline void MeshStreamer<dtype>::
431 storeMessage(int destinationPe, const MeshLocation& destinationLocation,
432              const void *dataItem, bool copyIndirectly) {
433
434   int dimension = destinationLocation.dimension;
435   int bufferIndex = destinationLocation.bufferIndex;
436   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];
437
438   // allocate new message if necessary
439   if (messageBuffers[bufferIndex] == NULL) {
440     if (dimension == 0) {
441       // personalized messages do not require destination indices
442       messageBuffers[bufferIndex] =
443         new (0, bufferSize_, 8 * sizeof(int))
444          MeshStreamerMessage<dtype>(dimension);
445     }
446     else {
447       messageBuffers[bufferIndex] =
448         new (bufferSize_, bufferSize_, 8 * sizeof(int))
449          MeshStreamerMessage<dtype>(dimension);
450     }
451     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
452     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
453     CkAssert(messageBuffers[bufferIndex] != NULL);
454   }
455
456   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
457   int numBuffered =
458     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
459   if (dimension != 0) {
460     destinationBuffer->markDestination(numBuffered-1, destinationPe);
461   }
462   numDataItemsBuffered_++;
463
464   // send if buffer is full
465   if (numBuffered == bufferSize_) {
466
467     int destinationIndex;
468
469     destinationIndex = myIndex_ +
470       (bufferIndex - myLocationIndex_[dimension]) *
471       combinedDimensionSizes_[dimension];
472
473     sendMeshStreamerMessage(destinationBuffer, dimension, destinationIndex);
474
475     if (useStagedCompletion_) {
476       cntMsgSent_[dimension][bufferIndex]++;
477     }
478
479     messageBuffers[bufferIndex] = NULL;
480     numDataItemsBuffered_ -= numBuffered;
481     hasSentRecently_ = true;
482
483   }
484   // send if total buffering capacity has been reached
485   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
486     sendLargestBuffer();
487     hasSentRecently_ = true;
488   }
489 }
490
491 template <class dtype>
492 inline void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
493
494   const static bool copyIndirectly = true;
495
496   // no data items should be submitted after all local contributors call done
497   // and staged completion has begun
498   CkAssert(stagedCompletionStarted() == false);
499
500   // produce and consume once per PE
501   if (useCompletionDetection_) {
502     detectorLocalObj_->produce(CkNumPes());
503   }
504   QdCreate(CkNumPes());
505
506   // deliver locally
507   localBroadcast(dataItem);
508
509   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly);
510 }
511
512 template <class dtype>
513 inline void MeshStreamer<dtype>::
514 broadcast(const void *dataItemHandle, int dimension, bool copyIndirectly) {
515
516   MeshLocation destinationLocation;
517   destinationLocation.dimension = dimension;
518
519   while (destinationLocation.dimension != -1) {
520     for (int i = 0;
521          i < individualDimensionSizes_[destinationLocation.dimension];
522          i++) {
523
524       if (i != myLocationIndex_[destinationLocation.dimension]) {
525         destinationLocation.bufferIndex = i;
526         storeMessage(TRAM_BROADCAST, destinationLocation,
527                      dataItemHandle, copyIndirectly);
528       }
529       // release control to scheduler if requested by the user,
530       //   assume caller is threaded entry
531       if (yieldFlag_ && ++yieldCount_ == 1024) {
532         yieldCount_ = 0;
533         CthYield();
534       }
535     }
536     destinationLocation.dimension--;
537   }
538 }
539
540 template <class dtype>
541 inline void MeshStreamer<dtype>::
542 insertData(const void *dataItemHandle, int destinationPe) {
543
544   const static bool copyIndirectly = true;
545
546   // treat newly inserted items as if they were received along
547   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
548   MeshLocation destinationLocation = determineLocation(destinationPe,
549                                                        numDimensions_);
550   storeMessage(destinationPe, destinationLocation, dataItemHandle,
551                copyIndirectly);
552   // release control to scheduler if requested by the user,
553   //   assume caller is threaded entry
554   if (yieldFlag_ && ++yieldCount_ == 1024) {
555     yieldCount_ = 0;
556     CthYield();
557   }
558 }
559
560 template <class dtype>
561 inline void MeshStreamer<dtype>::
562 insertData(const dtype& dataItem, int destinationPe) {
563
564   // no data items should be submitted after all local contributors call done
565   // and staged completion has begun
566   CkAssert(stagedCompletionStarted() == false);
567
568   if (useCompletionDetection_) {
569     detectorLocalObj_->produce();
570   }
571   QdCreate(1);
572   if (destinationPe == CkMyPe()) {
573     localDeliver(dataItem);
574     return;
575   }
576
577   insertData((const void *) &dataItem, destinationPe);
578 }
579
580 template <class dtype>
581 void MeshStreamer<dtype>::init(CkCallback endCb, int prio) {
582
583   useStagedCompletion_ = false;
584   useCompletionDetection_ = false;
585
586   yieldCount_ = 0;
587   userCallback_ = endCb;
588   prio_ = prio;
589
590   initLocalClients();
591
592   hasSentRecently_ = false;
593   enablePeriodicFlushing();
594 }
595
596 template <class dtype>
597 void MeshStreamer<dtype>::
598 init(int numLocalContributors, CkCallback startCb, CkCallback endCb, int prio,
599      bool usePeriodicFlushing) {
600
601   useStagedCompletion_ = true;
602   useCompletionDetection_ = false;
603   // allocate memory on first use
604   if (cntMsgSent_ == NULL) {
605     cntMsgSent_ = new int*[numDimensions_];
606     cntMsgReceived_ = new int[numDimensions_];
607     cntMsgExpected_ = new int[numDimensions_];
608     cntFinished_ = new int[numDimensions_];
609
610     for (int i = 0; i < numDimensions_; i++) {
611       cntMsgSent_[i] = new int[individualDimensionSizes_[i]];
612     }
613   }
614
615
616   for (int i = 0; i < numDimensions_; i++) {
617     std::fill(cntMsgSent_[i],
618               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
619     cntMsgReceived_[i] = 0;
620     cntMsgExpected_[i] = 0;
621     cntFinished_[i] = 0;
622   }
623   dimensionToFlush_ = numDimensions_ - 1;
624
625   yieldCount_ = 0;
626   userCallback_ = endCb;
627   prio_ = prio;
628
629   numLocalDone_ = 0;
630   numLocalContributors_ = numLocalContributors;
631   initLocalClients();
632
633   if (numLocalContributors_ == 0) {
634     startStagedCompletion();
635   }
636
637   hasSentRecently_ = false;
638   if (usePeriodicFlushing) {
639     enablePeriodicFlushing();
640   }
641
642   this->contribute(startCb);
643 }
644
645 template <class dtype>
646 void MeshStreamer<dtype>::
647 init(int numContributors, CkCallback startCb, CkCallback endCb,
648      CProxy_CompletionDetector detector,
649      int prio, bool usePeriodicFlushing) {
650
651   useStagedCompletion_ = false;
652   useCompletionDetection_ = true;
653   yieldCount_ = 0;
654   prio_ = prio;
655   userCallback_ = endCb;
656
657   // to facilitate completion, enable flushing after all contributors
658   //  have finished submitting items
659   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::enablePeriodicFlushing(),
660                      this->thisProxy);
661   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(),
662                     this->thisProxy);
663   detector_ = detector;
664   detectorLocalObj_ = detector_.ckLocalBranch();
665   initLocalClients();
666
667   detectorLocalObj_->start_detection(numContributors, startCb, flushCb,
668                                      finish , 0);
669
670   if (progressPeriodInMs_ <= 0) {
671     CkPrintf("Using completion detection in NDMeshStreamer requires"
672              " setting a valid periodic flush period. Defaulting"
673              " to 10 ms\n");
674     progressPeriodInMs_ = 10;
675   }
676
677   hasSentRecently_ = false;
678   if (usePeriodicFlushing) {
679     enablePeriodicFlushing();
680   }
681 }
682
683 template <class dtype>
684 void MeshStreamer<dtype>::
685 init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, int prio,
686      bool usePeriodicFlushing) {
687
688   CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
689   int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
690   init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing);
691 }
692
693
694 template <class dtype>
695 void MeshStreamer<dtype>::finish() {
696
697   isPeriodicFlushEnabled_ = false;
698
699   if (!userCallback_.isInvalid()) {
700     this->contribute(userCallback_);
701     userCallback_ = CkCallback();      // nullify the current callback
702   }
703 }
704
705 template <class dtype>
706 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
707
708   int destinationPe, lastDestinationPe;
709   MeshLocation destinationLocation;
710
711   lastDestinationPe = -1;
712   for (int i = 0; i < msg->numDataItems; i++) {
713     destinationPe = msg->destinationPes[i];
714     const dtype& dataItem = msg->getDataItem(i);
715     if (destinationPe == CkMyPe()) {
716       localDeliver(dataItem);
717     }
718     else if (destinationPe != TRAM_BROADCAST) {
719       if (destinationPe != lastDestinationPe) {
720         // do this once per sequence of items with the same destination
721         destinationLocation = determineLocation(destinationPe, msg->dimension);
722       }
723       storeMessage(destinationPe, destinationLocation, &dataItem);
724     }
725     else /* if (destinationPe == TRAM_BROADCAST) */ {
726       localBroadcast(dataItem);
727       broadcast(&dataItem, msg->dimension - 1, false);
728     }
729     lastDestinationPe = destinationPe;
730   }
731
732   if (useStagedCompletion_) {
733     markMessageReceived(msg->dimension, msg->finalMsgCount);
734   }
735
736   delete msg;
737 }
738
739 template <class dtype>
740 inline void MeshStreamer<dtype>::sendLargestBuffer() {
741
742   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
743   MeshStreamerMessage<dtype> ** messageBuffers;
744   MeshStreamerMessage<dtype> *destinationBuffer;
745
746   for (int i = 0; i < numDimensions_; i++) {
747
748     messageBuffers = dataBuffers_[i];
749     numBuffers = individualDimensionSizes_[i];
750
751     flushDimension = i;
752     maxSize = 0;
753     for (int j = 0; j < numBuffers; j++) {
754       if (messageBuffers[j] != NULL &&
755           messageBuffers[j]->numDataItems > maxSize) {
756         maxSize = messageBuffers[j]->numDataItems;
757         flushIndex = j;
758       }
759     }
760
761     if (maxSize > 0) {
762
763       messageBuffers = dataBuffers_[flushDimension];
764       destinationBuffer = messageBuffers[flushIndex];
765       destinationIndex = myIndex_ +
766         (flushIndex - myLocationIndex_[flushDimension]) *
767         combinedDimensionSizes_[flushDimension] ;
768
769       // not sending the full buffer, shrink the message size
770       envelope *env = UsrToEnv(destinationBuffer);
771       env->shrinkUsersize((bufferSize_ - destinationBuffer->numDataItems)
772                         * sizeof(dtype));
773       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
774       sendMeshStreamerMessage(destinationBuffer, flushDimension,
775                               destinationIndex);
776
777       if (useStagedCompletion_) {
778         cntMsgSent_[i][flushIndex]++;
779       }
780
781       messageBuffers[flushIndex] = NULL;
782     }
783   }
784 }
785
786 template <class dtype>
787 inline void MeshStreamer<dtype>::flushToIntermediateDestinations() {
788   for (int i = 0; i < numDimensions_; i++) {
789     flushDimension(i);
790   }
791 }
792
793 template <class dtype>
794 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
795
796 #ifdef STREAMER_VERBOSE_OUTPUT
797   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n",
798            CkMyPe(), dimension, sendMsgCounts);
799 #endif
800   MeshStreamerMessage<dtype> **messageBuffers;
801   MeshStreamerMessage<dtype> *destinationBuffer;
802   int destinationIndex, numBuffers;
803
804   messageBuffers = dataBuffers_[dimension];
805   numBuffers = individualDimensionSizes_[dimension];
806
807   for (int j = 0; j < numBuffers; j++) {
808
809     if(messageBuffers[j] == NULL) {
810       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
811         messageBuffers[j] =
812           new (0, 0, 8 * sizeof(int)) MeshStreamerMessage<dtype>(dimension);
813         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
814         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
815       }
816       else {
817         continue;
818       }
819     }
820
821     destinationBuffer = messageBuffers[j];
822     destinationIndex = myIndex_ +
823       (j - myLocationIndex_[dimension]) *
824       combinedDimensionSizes_[dimension] ;
825
826     if (destinationBuffer->numDataItems != 0) {
827       // not sending the full buffer, shrink the message size
828       envelope *env = UsrToEnv(destinationBuffer);
829       env->shrinkUsersize((bufferSize_ - destinationBuffer->numDataItems)
830                           * sizeof(dtype));
831     }
832     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
833
834     if (useStagedCompletion_) {
835       cntMsgSent_[dimension][j]++;
836       if (sendMsgCounts) {
837         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
838       }
839     }
840
841     sendMeshStreamerMessage(destinationBuffer, dimension,
842                             destinationIndex);
843     messageBuffers[j] = NULL;
844   }
845 }
846
847 template <class dtype>
848 void MeshStreamer<dtype>::flushIfIdle(){
849
850   // flush if (1) this is not a periodic call or
851   //          (2) this is a periodic call and no sending took place
852   //              since the last time the function was invoked
853   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
854
855     if (numDataItemsBuffered_ != 0) {
856       flushToIntermediateDestinations();
857     }
858     CkAssert(numDataItemsBuffered_ == 0);
859
860   }
861
862   hasSentRecently_ = false;
863 }
864
865 template <class dtype>
866 void periodicProgressFunction(void *MeshStreamerObj, double time) {
867
868   MeshStreamer<dtype> *properObj =
869     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj);
870
871   if (properObj->isPeriodicFlushEnabled()) {
872     properObj->flushIfIdle();
873     properObj->registerPeriodicProgressFunction();
874   }
875 }
876
877 template <class dtype>
878 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
879   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this,
880                  progressPeriodInMs_);
881 }
882
883
884 template <class dtype, class ClientType>
885 class GroupMeshStreamer : public CBase_GroupMeshStreamer<dtype, ClientType> {
886 private:
887
888   CkGroupID clientGID_;
889   ClientType *clientObj_;
890
891   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
892     for (int i = 0; i < msg->numDataItems; i++) {
893       const dtype& data = msg->getDataItem(i);
894       clientObj_->process(data);
895     }
896
897     if (MeshStreamer<dtype>::useStagedCompletion_) {
898 #ifdef STREAMER_VERBOSE_OUTPUT
899       envelope *env = UsrToEnv(msg);
900       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n",
901                CkMyPe(), env->getSrcPe(), msg->numDataItems,
902                msg->finalMsgCount);
903 #endif
904       this->markMessageReceived(msg->dimension, msg->finalMsgCount);
905     }
906     else if (MeshStreamer<dtype>::useCompletionDetection_){
907       this->detectorLocalObj_->consume(msg->numDataItems);
908     }
909     QdProcess(msg->numDataItems);
910     delete msg;
911   }
912
913   inline void localDeliver(const dtype& dataItem) {
914     clientObj_->process(dataItem);
915     if (MeshStreamer<dtype>::useCompletionDetection_) {
916       MeshStreamer<dtype>::detectorLocalObj_->consume();
917     }
918     QdProcess(1);
919   }
920
921   inline void localBroadcast(const dtype& dataItem) {
922     localDeliver(dataItem);
923   }
924
925   inline void initLocalClients() {
926     // no action required
927   }
928
929 public:
930
931   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
932                     int *dimensionSizes,
933                     CkGroupID clientGID,
934                     bool yieldFlag = 0, double progressPeriodInMs = -1.0) {
935     this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
936                0, yieldFlag, progressPeriodInMs);
937     clientGID_ = clientGID;
938     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
939
940   }
941
942   GroupMeshStreamer(int numDimensions, int *dimensionSizes,
943                     CkGroupID clientGID,
944                     int bufferSize, bool yieldFlag = 0,
945                     double progressPeriodInMs = -1.0) {
946     this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize,
947                yieldFlag, progressPeriodInMs);
948     clientGID_ = clientGID;
949     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
950
951   }
952 };
953
954 template <class dtype, class ClientType>
955 class LocalBroadcaster : public CkLocIterator {
956
957 public:
958   CkArray *clientArrMgr_;
959   const dtype *dataItem_;
960
961   LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem)
962    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
963
964   void addLocation(CkLocation& loc) {
965     ClientType *clientObj =
966       (ClientType *) clientArrMgr_->lookup(loc.getIndex());
967     CkAssert(clientObj != NULL);
968     clientObj->process(*dataItem_);
969   }
970
971 };
972
973 template <class dtype, class itype, class ClientType>
974 class ArrayMeshStreamer : public CBase_ArrayMeshStreamer<dtype, itype,
975                                                          ClientType> {
976
977 private:
978
979   CkArrayID clientAID_;
980   CkArray *clientArrayMgr_;
981   CkLocMgr *clientLocMgr_;
982   int numArrayElements_;
983   int numLocalArrayElements_;
984   std::map<itype, std::vector<ArrayDataItem<dtype, itype> > > misdeliveredItems;
985 #ifdef CACHE_ARRAY_METADATA
986   ClientType **clientObjs_;
987   int *destinationPes_;
988   bool *isCachedArrayMetadata_;
989 #endif
990
991   inline
992   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
993
994     itype arrayId = packedDataItem.arrayIndex;
995     if (arrayId == itype(TRAM_BROADCAST)) {
996       localBroadcast(packedDataItem);
997       return;
998     }
999     ClientType *clientObj;
1000 #ifdef CACHE_ARRAY_METADATA
1001     clientObj = clientObjs_[arrayId];
1002 #else
1003     clientObj = (ClientType *) clientArrayMgr_->lookup(arrayId);
1004 #endif
1005
1006     if (clientObj != NULL) {
1007       clientObj->process(packedDataItem.dataItem);
1008       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1009            ::useCompletionDetection_) {
1010         MeshStreamer<ArrayDataItem<dtype, itype> >
1011          ::detectorLocalObj_->consume();
1012       }
1013       QdProcess(1);
1014     }
1015     else {
1016       // array element arrayId is no longer present locally:
1017       //  buffer the data item and request updated PE index
1018       //  to be sent to the source and this PE
1019       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1020           ::useStagedCompletion_) {
1021         CkAbort("Using staged completion when array locations"
1022                 " are not guaranteed to be correct is currently"
1023                 " not supported.");
1024       }
1025       misdeliveredItems[arrayId].push_back(packedDataItem);
1026       if (misdeliveredItems[arrayId].size() == 1) {
1027         int homePe = clientLocMgr_->homePe(arrayId);
1028         this->thisProxy[homePe].
1029           processLocationRequest(arrayId, CkMyPe(), packedDataItem.sourcePe);
1030       }
1031     }
1032   }
1033
1034   inline
1035   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
1036
1037     LocalBroadcaster<dtype, ClientType>
1038       clientIterator(clientArrayMgr_, &packedDataItem.dataItem);
1039     clientLocMgr_->iterate(clientIterator);
1040
1041     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1042          ::useCompletionDetection_) {
1043         MeshStreamer<ArrayDataItem<dtype, itype> >
1044          ::detectorLocalObj_->consume();
1045     }
1046     QdProcess(1);
1047   }
1048
1049   inline void initLocalClients() {
1050
1051     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1052          ::useCompletionDetection_) {
1053 #ifdef CACHE_ARRAY_METADATA
1054       std::fill(isCachedArrayMetadata_,
1055                 isCachedArrayMetadata_ + numArrayElements_, false);
1056
1057       for (int i = 0; i < numArrayElements_; i++) {
1058         clientObjs_[i] =
1059           (ClientType*) ( clientArrayMgr_->lookup(CkArrayIndex1D(i)) );
1060       }
1061 #endif
1062     }
1063   }
1064
1065   inline void commonInit() {
1066 #ifdef CACHE_ARRAY_METADATA
1067     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1068     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1069     destinationPes_ = new int[numArrayElements_];
1070     isCachedArrayMetadata_ = new bool[numArrayElements_];
1071     std::fill(isCachedArrayMetadata_,
1072               isCachedArrayMetadata_ + numArrayElements_, false);
1073 #endif
1074   }
1075
1076 public:
1077
1078   struct DataItemHandle {
1079     itype arrayIndex;
1080     const dtype *dataItem;
1081   };
1082
1083   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1084                     int *dimensionSizes, CkArrayID clientAID,
1085                     bool yieldFlag = 0, double progressPeriodInMs = -1.0) {
1086
1087     this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 0,
1088                      yieldFlag, progressPeriodInMs);
1089     clientAID_ = clientAID;
1090     clientArrayMgr_ = clientAID_.ckLocalBranch();
1091     clientLocMgr_ = clientArrayMgr_->getLocMgr();
1092     commonInit();
1093   }
1094
1095   ArrayMeshStreamer(int numDimensions, int *dimensionSizes,
1096                     CkArrayID clientAID, int bufferSize, bool yieldFlag = 0,
1097                     double progressPeriodInMs = -1.0) {
1098
1099     this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize, yieldFlag,
1100                      progressPeriodInMs);
1101     clientAID_ = clientAID;
1102     clientArrayMgr_ = clientAID_.ckLocalBranch();
1103     clientLocMgr_ = clientArrayMgr_->getLocMgr();
1104     commonInit();
1105   }
1106
1107   ~ArrayMeshStreamer() {
1108 #ifdef CACHE_ARRAY_METADATA
1109     delete [] clientObjs_;
1110     delete [] destinationPes_;
1111     delete [] isCachedArrayMetadata_;
1112 #endif
1113   }
1114
1115   void receiveAtDestination(
1116        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1117
1118     for (int i = 0; i < msg->numDataItems; i++) {
1119       const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
1120       localDeliver(packedData);
1121     }
1122     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1123       this->markMessageReceived(msg->dimension, msg->finalMsgCount);
1124     }
1125
1126     delete msg;
1127   }
1128
1129   inline void broadcast(const dtype& dataItem) {
1130     const static bool copyIndirectly = true;
1131
1132     // no data items should be submitted after all local contributors call done
1133     // and staged completion has begun
1134     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1135                ::stagedCompletionStarted()) == false);
1136
1137     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1138          ::useCompletionDetection_) {
1139       MeshStreamer<ArrayDataItem<dtype, itype> >
1140         ::detectorLocalObj_->produce(CkNumPes());
1141     }
1142     QdCreate(CkNumPes());
1143
1144     // deliver locally
1145     ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, CkMyPe(),
1146                                                 dataItem);
1147     localBroadcast(packedDataItem);
1148
1149     DataItemHandle tempHandle;
1150     tempHandle.dataItem = &dataItem;
1151     tempHandle.arrayIndex = TRAM_BROADCAST;
1152
1153     int numDimensions =
1154       MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
1155     MeshStreamer<ArrayDataItem<dtype, itype> >::
1156       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
1157   }
1158
1159   inline void insertData(const dtype& dataItem, itype arrayIndex) {
1160
1161     // no data items should be submitted after all local contributors call done
1162     // and staged completion has begun
1163     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1164                ::stagedCompletionStarted()) == false);
1165
1166     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1167          ::useCompletionDetection_) {
1168       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1169     }
1170     QdCreate(1);
1171     int destinationPe;
1172 #ifdef CACHE_ARRAY_METADATA
1173     if (isCachedArrayMetadata_[arrayIndex]) {
1174       destinationPe =  destinationPes_[arrayIndex];
1175     }
1176     else {
1177       destinationPe =
1178         clientArrayMgr_->lastKnown(arrayIndex);
1179       isCachedArrayMetadata_[arrayIndex] = true;
1180       destinationPes_[arrayIndex] = destinationPe;
1181     }
1182 #else
1183     destinationPe =
1184       clientArrayMgr_->lastKnown(arrayIndex);
1185 #endif
1186
1187     if (destinationPe == CkMyPe()) {
1188       ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, CkMyPe(), dataItem);
1189       localDeliver(packedDataItem);
1190       return;
1191     }
1192
1193     // this implementation avoids copying an item before transfer into message
1194     DataItemHandle tempHandle;
1195     tempHandle.arrayIndex = arrayIndex;
1196     tempHandle.dataItem = &dataItem;
1197
1198     MeshStreamer<ArrayDataItem<dtype, itype> >::
1199       insertData(&tempHandle, destinationPe);
1200
1201   }
1202
1203   inline int copyDataItemIntoMessage(
1204
1205       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer,
1206       const void *dataItemHandle, bool copyIndirectly) {
1207
1208     if (copyIndirectly == true) {
1209       // newly inserted items are passed through a handle to avoid copying
1210       int numDataItems = destinationBuffer->numDataItems;
1211       const DataItemHandle *tempHandle =
1212         (const DataItemHandle *) dataItemHandle;
1213       (destinationBuffer->dataItems)[numDataItems].arrayIndex =
1214         tempHandle->arrayIndex;
1215       (destinationBuffer->dataItems)[numDataItems].sourcePe = CkMyPe();
1216       (destinationBuffer->dataItems)[numDataItems].dataItem =
1217         *(tempHandle->dataItem);
1218       return ++destinationBuffer->numDataItems;
1219     }
1220     else {
1221       // this is an item received along the route to destination
1222       // we can copy it from the received message
1223       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1224               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1225     }
1226   }
1227
1228   // always called on homePE for array element arrayId
1229   void processLocationRequest(itype arrayId, int deliveredToPe, int sourcePe) {
1230     int ownerPe = clientArrayMgr_->lastKnown(arrayId);
1231     this->thisProxy[deliveredToPe].resendMisdeliveredItems(arrayId, ownerPe);
1232     this->thisProxy[sourcePe].updateLocationAtSource(arrayId, sourcePe);
1233   }
1234
1235   void resendMisdeliveredItems(itype arrayId, int destinationPe) {
1236
1237     clientLocMgr_->updateLocation(arrayId, destinationPe);
1238
1239     std::vector<ArrayDataItem<dtype, itype> > &bufferedItems
1240       = misdeliveredItems[arrayId];
1241
1242     MeshLocation destinationLocation =
1243       determineLocation(destinationPe, MeshStreamer
1244                         <ArrayDataItem<dtype, itype> >::numDimensions_);
1245     for (int i = 0; i < bufferedItems.size(); i++) {
1246       storeMessage(destinationPe, destinationLocation, &bufferedItems[i]);
1247     }
1248
1249     bufferedItems.clear();
1250   }
1251
1252   void updateLocationAtSource(itype arrayId, int destinationPe) {
1253
1254     int prevOwner = clientArrayMgr_->lastKnown(arrayId);
1255
1256     if (prevOwner != destinationPe) {
1257       clientLocMgr_->updateLocation(arrayId, destinationPe);
1258
1259 //    // could also try to correct destinations of items buffered for arrayId,
1260 //    // but it would take significant additional computation, so leaving it out;
1261 //    // the items will get forwarded after being delivered to the previous owner
1262 //    MeshLocation oldLocation = determineLocation(prevOwner, numDimensions_);
1263
1264 //    MeshStreamerMessage<dtype> *messageBuffer = dataBuffers_
1265 //     [oldLocation.dimension][oldLocation.bufferIndex];
1266
1267 //    if (messageBuffer != NULL) {
1268 //      // TODO: find items for arrayId, move them to buffer for destinationPe
1269 //      // do not leave holes in messageBuffer
1270 //    }
1271     }
1272   }
1273
1274 };
1275
1276 struct ChunkReceiveBuffer {
1277   int bufferNumber;
1278   int receivedChunks;
1279   char *buffer;
1280 };
1281
1282 struct ChunkOutOfOrderBuffer {
1283   int bufferNumber;
1284   int receivedChunks;
1285   int sourcePe;
1286   char *buffer;
1287
1288   ChunkOutOfOrderBuffer(int b, int r, int s, char *buf)
1289     : bufferNumber(b), receivedChunks(r), sourcePe(s), buffer(buf) {}
1290
1291   bool operator==(const ChunkDataItem &chunk) {
1292     return ( (chunk.bufferNumber == bufferNumber) &&
1293              (chunk.sourcePe == sourcePe) );
1294   }
1295
1296 };
1297
1298 template <class dtype, class ClientType >
1299 class GroupChunkMeshStreamer
1300   : public CBase_GroupChunkMeshStreamer<dtype, ClientType> {
1301
1302 private:
1303   // implementation assumes very few buffers will be received out of order
1304   // if this is not the case a different data structure may be preferable
1305   std::list<ChunkOutOfOrderBuffer> outOfOrderBuffers_;
1306   ChunkReceiveBuffer *lastReceived_;
1307   int *currentBufferNumbers_;
1308
1309   CkGroupID clientGID_;
1310   ClientType *clientObj_;
1311
1312   bool userHandlesFreeing_;
1313 public:
1314
1315   GroupChunkMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1316                          int *dimensionSizes, CkGroupID clientGID,
1317                          bool yieldFlag = 0, double progressPeriodInMs = -1.0,
1318                          bool userHandlesFreeing = false) {
1319
1320     this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
1321                      0, yieldFlag, progressPeriodInMs);
1322     clientGID_ = clientGID;
1323     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
1324     userHandlesFreeing_ = userHandlesFreeing;
1325     commonInit();
1326   }
1327
1328   GroupChunkMeshStreamer(int numDimensions, int *dimensionSizes,
1329                          CkGroupID clientGID, int bufferSize,
1330                          bool yieldFlag = 0, double progressPeriodInMs = -1.0,
1331                          bool userHandlesFreeing = false) {
1332
1333     this->ctorHelper(0, numDimensions, dimensionSizes,  bufferSize, yieldFlag,
1334                progressPeriodInMs);
1335     clientGID_ = clientGID;
1336     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
1337     userHandlesFreeing_ = userHandlesFreeing;
1338     commonInit();
1339   }
1340
1341   inline void commonInit() {
1342     lastReceived_ = new ChunkReceiveBuffer[CkNumPes()];
1343     currentBufferNumbers_ = new int[CkNumPes()];
1344     memset(lastReceived_, 0, CkNumPes() * sizeof(ChunkReceiveBuffer));
1345     memset(currentBufferNumbers_, 0, CkNumPes() * sizeof(int));
1346   }
1347
1348   inline void insertData(dtype *dataArray, int numElements, int destinationPe,
1349                          void *extraData = NULL, int extraDataSize = 0) {
1350
1351     char *inputData = (char *) dataArray;
1352     int arraySizeInBytes = numElements * sizeof(dtype);
1353     int totalSizeInBytes = arraySizeInBytes + extraDataSize;
1354     ChunkDataItem chunk;
1355     int offset;
1356     int chunkNumber = 0;
1357     chunk.bufferNumber = currentBufferNumbers_[destinationPe]++;
1358     chunk.sourcePe = CkMyPe();
1359     chunk.chunkNumber = 0;
1360     chunk.chunkSize = CHUNK_SIZE;
1361     chunk.numChunks =  (int) ceil ( (float) totalSizeInBytes / CHUNK_SIZE);
1362     chunk.numItems = numElements;
1363
1364     // loop over full chunks - handle leftovers and extra data later
1365     for (offset = 0; offset < arraySizeInBytes - CHUNK_SIZE;
1366          offset += CHUNK_SIZE) {
1367         memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
1368         MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe);
1369         chunk.chunkNumber++;
1370     }
1371
1372     // final (possibly incomplete) array chunk
1373     chunk.chunkSize = arraySizeInBytes - offset;
1374     memset(chunk.rawData, 0, CHUNK_SIZE);
1375     memcpy(chunk.rawData, inputData + offset, chunk.chunkSize);
1376
1377     // extra data (place in last chunk if possible)
1378     int remainingToSend = extraDataSize;
1379     int tempOffset = chunk.chunkSize;
1380     int extraOffset = 0;
1381     do {
1382       chunk.chunkSize = std::min(tempOffset + remainingToSend, CHUNK_SIZE);
1383       memcpy(chunk.rawData + tempOffset, (char *) extraData + extraOffset,
1384              chunk.chunkSize - tempOffset);
1385
1386       MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe);
1387       chunk.chunkNumber++;
1388       offset += CHUNK_SIZE;
1389       extraOffset += (chunk.chunkSize - tempOffset);
1390       remainingToSend -= (chunk.chunkSize - tempOffset);
1391       tempOffset = 0;
1392     } while (offset < totalSizeInBytes);
1393
1394   }
1395
1396   inline void processChunk(const ChunkDataItem& chunk) {
1397
1398     ChunkReceiveBuffer &last = lastReceived_[chunk.sourcePe];
1399
1400     if (last.buffer == NULL) {
1401       if (outOfOrderBuffers_.size() == 0) {
1402         // make common case fast
1403         last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
1404         last.receivedChunks = 0;
1405       }
1406       else {
1407         // check if chunks for this buffer have been received previously
1408         std::list<ChunkOutOfOrderBuffer>::iterator storedBuffer =
1409           find(outOfOrderBuffers_.begin(), outOfOrderBuffers_.end(), chunk);
1410         if (storedBuffer != outOfOrderBuffers_.end()) {
1411           last.buffer = storedBuffer->buffer;
1412           last.receivedChunks = storedBuffer->receivedChunks;
1413           outOfOrderBuffers_.erase(storedBuffer);
1414         }
1415         else {
1416           last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
1417           last.receivedChunks = 0;
1418         }
1419       }
1420       last.bufferNumber = chunk.bufferNumber;
1421     }
1422     else if (last.bufferNumber != chunk.bufferNumber) {
1423       // add last to list of out of order buffers
1424       ChunkOutOfOrderBuffer lastOutOfOrderBuffer(last.bufferNumber,
1425                                                  last.receivedChunks,
1426                                                  chunk.sourcePe, last.buffer);
1427       outOfOrderBuffers_.push_front(lastOutOfOrderBuffer);
1428
1429       //search through list of out of order buffers for this chunk's buffer
1430       std::list<ChunkOutOfOrderBuffer >::iterator storedBuffer =
1431         find(outOfOrderBuffers_.begin(), outOfOrderBuffers_.end(), chunk);
1432
1433       if (storedBuffer == outOfOrderBuffers_.end() ) {
1434         // allocate new buffer
1435         last.bufferNumber = chunk.bufferNumber;
1436         last.receivedChunks = 0;
1437         last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
1438       }
1439       else {
1440         // use existing buffer
1441         last.bufferNumber = storedBuffer->bufferNumber;
1442         last.receivedChunks = storedBuffer->receivedChunks;
1443         last.buffer = storedBuffer->buffer;
1444         outOfOrderBuffers_.erase(storedBuffer);
1445       }
1446     }
1447
1448     char *receiveBuffer = last.buffer;
1449
1450     memcpy(receiveBuffer + chunk.chunkNumber * CHUNK_SIZE,
1451            chunk.rawData, chunk.chunkSize);
1452     if (++last.receivedChunks == chunk.numChunks) {
1453       clientObj_->receiveArray(
1454                   (dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
1455       last.receivedChunks = 0;
1456       if (!userHandlesFreeing_) {
1457         delete [] last.buffer;
1458       }
1459       last.buffer = NULL;
1460     }
1461
1462   }
1463
1464   inline void localDeliver(const ChunkDataItem& chunk) {
1465     processChunk(chunk);
1466     if (this->useCompletionDetection_) {
1467       this->detectorLocalObj_->consume();
1468     }
1469     QdProcess(1);
1470   }
1471
1472   void receiveAtDestination(
1473        MeshStreamerMessage<ChunkDataItem> *msg) {
1474
1475     for (int i = 0; i < msg->numDataItems; i++) {
1476       const ChunkDataItem& chunk = msg->getDataItem(i);
1477       processChunk(chunk);
1478     }
1479
1480     if (this->useStagedCompletion_) {
1481 #ifdef STREAMER_VERBOSE_OUTPUT
1482       envelope *env = UsrToEnv(msg);
1483       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n",
1484                CkMyPe(), env->getSrcPe(), msg->numDataItems,
1485                msg->finalMsgCount);
1486 #endif
1487       this->markMessageReceived(msg->dimension, msg->finalMsgCount);
1488     }
1489     else if (this->useCompletionDetection_){
1490       this->detectorLocalObj_->consume(msg->numDataItems);
1491     }
1492     QdProcess(msg->numDataItems);
1493     delete msg;
1494
1495   }
1496
1497   inline void localBroadcast(const ChunkDataItem& dataItem) {
1498     localDeliver(dataItem);
1499   }
1500
1501   inline void initLocalClients() {
1502     // no action required
1503   }
1504
1505 };
1506
1507
1508
1509 #define CK_TEMPLATES_ONLY
1510 #include "NDMeshStreamer.def.h"
1511 #undef CK_TEMPLATES_ONLY
1512
1513 #endif