3c2d38c54fa0b14f8a25ff33055da9c99977e12b
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include <list>
6 #include <map>
7 #include "NDMeshStreamer.decl.h"
8 #include "DataItemTypes.h"
9 #include "completion.h"
10 #include "ckarray.h"
11
12 // limit total number of buffered data items to
13 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
14 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
15 // take advantage of nonuniform filling of buffers
16 #define OVERALLOCATION_FACTOR 4
17
18 // #define CACHE_LOCATIONS
19 // #define SUPPORT_INCOMPLETE_MESH
20 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
21 // #define STREAMER_VERBOSE_OUTPUT
22
23 #define TRAM_BROADCAST (-100)
24
25 extern void QdCreate(int n);
26 extern void QdProcess(int n);
27
28 struct MeshLocation {
29   int dimension;
30   int bufferIndex;
31 };
32
33 template<class dtype>
34 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
35
36 public:
37
38   int finalMsgCount;
39   int dimension;
40   int numDataItems;
41   int *destinationPes;
42   dtype *dataItems;
43
44   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
45     finalMsgCount = -1;
46   }
47
48   inline int addDataItem(const dtype& dataItem) {
49     dataItems[numDataItems] = dataItem;
50     return ++numDataItems;
51   }
52
53   inline void markDestination(const int index, const int destinationPe) {
54     destinationPes[index] = destinationPe;
55   }
56
57   inline const dtype& getDataItem(const int index) {
58     return dataItems[index];
59   }
60 };
61
62 template <class dtype>
63 class MeshStreamer : public CBase_MeshStreamer<dtype> {
64
65 private:
66   int bufferSize_;
67   int maxNumDataItemsBuffered_;
68   int numDataItemsBuffered_;
69
70   int numMembers_;
71   int *individualDimensionSizes_;
72   int *combinedDimensionSizes_;
73
74   int *startingIndexAtDimension_;
75
76   int myIndex_;
77   int *myLocationIndex_;
78
79   CkCallback   userCallback_;
80   bool yieldFlag_;
81
82   double progressPeriodInMs_;
83   bool isPeriodicFlushEnabled_;
84   bool hasSentRecently_;
85   MeshStreamerMessage<dtype> ***dataBuffers_;
86
87   CProxy_CompletionDetector detector_;
88   int prio_;
89   int yieldCount_;
90
91 #ifdef CACHE_LOCATIONS
92   MeshLocation *cachedLocations_;
93   bool *isCached_;
94 #endif
95
96
97   // only used for staged completion
98   int **cntMsgSent_;
99   int *cntMsgReceived_;
100   int *cntMsgExpected_;
101   int *cntFinished_;
102   int dimensionToFlush_;
103   int numLocalDone_;
104   int numLocalContributors_;
105
106   virtual void localDeliver(const dtype& dataItem) = 0;
107   virtual void localBroadcast(const dtype& dataItem) = 0;
108
109   virtual void initLocalClients() = 0;
110
111   void sendLargestBuffer();
112   void flushToIntermediateDestinations();
113   void flushDimension(int dimension, bool sendMsgCounts = false);
114
115 protected:
116
117   int numDimensions_;
118   bool useStagedCompletion_;
119   bool useCompletionDetection_;
120   CompletionDetector *detectorLocalObj_;
121   virtual int copyDataItemIntoMessage(
122               MeshStreamerMessage<dtype> *destinationBuffer,
123               const void *dataItemHandle, bool copyIndirectly = false);
124   void insertData(const void *dataItemHandle, int destinationPe);
125   void broadcast(const void *dataItemHandle, int dimension,
126                  bool copyIndirectly);
127   MeshLocation determineLocation(int destinationPe,
128                                  int dimensionReceivedAlong);
129   void storeMessage(int destinationPe,
130                     const MeshLocation& destinationCoordinates,
131                     const void *dataItem, bool copyIndirectly = false);
132
133   void ctorHelper(int maxNumDataItemsBuffered, int numDimensions,
134                   int *dimensionSizes, int bufferSize,
135                   bool yieldFlag, double progressPeriodInMs);
136
137 public:
138
139   MeshStreamer() {}
140   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
141                int *dimensionSizes, int bufferSize,
142                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
143   ~MeshStreamer();
144
145   // entry
146   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb,
147             int prio, bool usePeriodicFlushing);
148   void init(int numContributors, CkCallback startCb, CkCallback endCb,
149             CProxy_CompletionDetector detector,
150             int prio, bool usePeriodicFlushing);
151   void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb,
152             int prio, bool usePeriodicFlushing);
153   void init(CkCallback endCb, int prio);
154
155   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
156   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
157   void flushIfIdle();
158   void finish();
159
160   // non entry
161   inline bool isPeriodicFlushEnabled() {
162     return isPeriodicFlushEnabled_;
163   }
164   virtual void insertData(const dtype& dataItem, int destinationPe);
165   virtual void broadcast(const dtype& dataItem);
166
167   void sendMeshStreamerMessage(MeshStreamerMessage<dtype> *destinationBuffer,
168                                int dimension, int destinationIndex);
169
170   void registerPeriodicProgressFunction();
171   // flushing begins only after enablePeriodicFlushing has been invoked
172   inline void enablePeriodicFlushing(){
173     isPeriodicFlushEnabled_ = true;
174     registerPeriodicProgressFunction();
175   }
176
177   inline void done(int numContributorsFinished = 1) {
178
179     if (useStagedCompletion_) {
180       numLocalDone_ += numContributorsFinished;
181       CkAssert(numLocalDone_ <= numLocalContributors_);
182       if (numLocalDone_ == numLocalContributors_) {
183         startStagedCompletion();
184       }
185     }
186     else if (useCompletionDetection_){
187       detectorLocalObj_->done(numContributorsFinished);
188     }
189   }
190
191   inline bool stagedCompletionStarted() {
192     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1);
193   }
194
195   inline void startStagedCompletion() {
196
197     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
198       flushDimension(dimensionToFlush_, true);
199     }
200     dimensionToFlush_--;
201
202     checkForCompletedStages();
203   }
204
205   inline void markMessageReceived(int dimension, int finalCount) {
206
207     cntMsgReceived_[dimension]++;
208     if (finalCount != -1) {
209       cntFinished_[dimension]++;
210       cntMsgExpected_[dimension] += finalCount;
211 #ifdef STREAMER_VERBOSE_OUTPUT
212       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
213                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension,
214                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
215                cntMsgReceived_[dimension]);
216 #endif
217     }
218     if (dimensionToFlush_ != numDimensions_ - 1) {
219       checkForCompletedStages();
220     }
221   }
222
223   inline void checkForCompletedStages() {
224
225     while (cntFinished_[dimensionToFlush_ + 1] ==
226            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
227            cntMsgExpected_[dimensionToFlush_ + 1] ==
228            cntMsgReceived_[dimensionToFlush_ + 1]) {
229       if (dimensionToFlush_ == -1) {
230 #ifdef STREAMER_VERBOSE_OUTPUT
231         CkPrintf("[%d] contribute\n", CkMyPe());
232 #endif
233         CkAssert(numDataItemsBuffered_ == 0);
234         isPeriodicFlushEnabled_ = false;
235         if (!userCallback_.isInvalid()) {
236           this->contribute(userCallback_);
237           userCallback_ = CkCallback();
238         }
239         return;
240       }
241       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
242         flushDimension(dimensionToFlush_, true);
243       }
244       dimensionToFlush_--;
245     }
246   }
247 };
248
249 template <class dtype>
250 MeshStreamer<dtype>::
251 MeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
252              int *dimensionSizes, int bufferSize, bool yieldFlag,
253              double progressPeriodInMs) {
254   ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
255              bufferSize, yieldFlag, progressPeriodInMs);
256 }
257
258 template <class dtype>
259 void MeshStreamer<dtype>::
260 ctorHelper(int maxNumDataItemsBuffered, int numDimensions,
261            int *dimensionSizes, int bufferSize,
262            bool yieldFlag, double progressPeriodInMs) {
263
264   numDimensions_ = numDimensions;
265   maxNumDataItemsBuffered_ = maxNumDataItemsBuffered;
266   yieldFlag_ = yieldFlag;
267   progressPeriodInMs_ = progressPeriodInMs;
268   bufferSize_ = bufferSize;
269
270   int sumAlongAllDimensions = 0;
271   individualDimensionSizes_ = new int[numDimensions_];
272   combinedDimensionSizes_ = new int[numDimensions_ + 1];
273   myLocationIndex_ = new int[numDimensions_];
274   startingIndexAtDimension_ = new int[numDimensions_ + 1];
275   memcpy(individualDimensionSizes_, dimensionSizes,
276          numDimensions * sizeof(int));
277   combinedDimensionSizes_[0] = 1;
278   for (int i = 0; i < numDimensions; i++) {
279     sumAlongAllDimensions += individualDimensionSizes_[i];
280     combinedDimensionSizes_[i + 1] =
281       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
282   }
283
284   CkAssert(combinedDimensionSizes_[numDimensions] == CkNumPes());
285
286   // a bufferSize input of 0 indicates it should be calculated by the library
287   if (bufferSize_ == 0) {
288     CkAssert(maxNumDataItemsBuffered_ > 0);
289     // buffers for dimensions with the
290     //   same index as the sender's are not allocated/used
291     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_
292       / (sumAlongAllDimensions - numDimensions_ + 1);
293   }
294   else {
295     maxNumDataItemsBuffered_ =
296       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1)
297       / OVERALLOCATION_FACTOR;
298   }
299
300   if (bufferSize_ <= 0) {
301     bufferSize_ = 1;
302     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
303              "is invalid. Defaulting to a single buffer per destination.\n");
304   }
305   numDataItemsBuffered_ = 0;
306   numMembers_ = CkNumPes();
307
308   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_];
309   for (int i = 0; i < numDimensions; i++) {
310     int numMembersAlongDimension = individualDimensionSizes_[i];
311     dataBuffers_[i] =
312       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
313     for (int j = 0; j < numMembersAlongDimension; j++) {
314       dataBuffers_[i][j] = NULL;
315     }
316   }
317
318   myIndex_ = CkMyPe();
319   int remainder = myIndex_;
320   startingIndexAtDimension_[numDimensions_] = 0;
321   for (int i = numDimensions_ - 1; i >= 0; i--) {
322     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
323     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
324     remainder -= dimensionOffset;
325     startingIndexAtDimension_[i] =
326       startingIndexAtDimension_[i+1] + dimensionOffset;
327   }
328
329   isPeriodicFlushEnabled_ = false;
330   detectorLocalObj_ = NULL;
331
332 #ifdef CACHE_LOCATIONS
333   cachedLocations_ = new MeshLocation[numMembers_];
334   isCached_ = new bool[numMembers_];
335   std::fill(isCached_, isCached_ + numMembers_, false);
336 #endif
337
338   cntMsgSent_ = NULL;
339   cntMsgReceived_ = NULL;
340   cntMsgExpected_ = NULL;
341   cntFinished_ = NULL;
342 }
343
344 template <class dtype>
345 MeshStreamer<dtype>::~MeshStreamer() {
346
347   for (int i = 0; i < numDimensions_; i++) {
348     for (int j=0; j < individualDimensionSizes_[i]; j++) {
349       delete[] dataBuffers_[i][j];
350     }
351     delete[] dataBuffers_[i];
352   }
353
354   delete[] individualDimensionSizes_;
355   delete[] combinedDimensionSizes_;
356   delete[] myLocationIndex_;
357   delete[] startingIndexAtDimension_;
358
359 #ifdef CACHE_LOCATIONS
360   delete[] cachedLocations_;
361   delete[] isCached_;
362 #endif
363
364   if (cntMsgSent_ != NULL) {
365     for (int i = 0; i < numDimensions_; i++) {
366       delete[] cntMsgSent_[i];
367     }
368     delete[] cntMsgSent_;
369     delete[] cntMsgReceived_;
370     delete[] cntMsgExpected_;
371     delete[] cntFinished_;
372   }
373 }
374
375 template <class dtype>
376 inline MeshLocation MeshStreamer<dtype>::
377 determineLocation(int destinationPe, int dimensionReceivedAlong) {
378
379 #ifdef CACHE_LOCATIONS
380   if (isCached_[destinationPe]) {
381     return cachedLocations_[destinationPe];
382   }
383 #endif
384
385   MeshLocation destinationLocation;
386   int remainder =
387     destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
388   int dimensionIndex;
389   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {
390     dimensionIndex = remainder / combinedDimensionSizes_[i];
391
392     if (dimensionIndex != myLocationIndex_[i]) {
393       destinationLocation.dimension = i;
394       destinationLocation.bufferIndex = dimensionIndex;
395 #ifdef CACHE_LOCATIONS
396       cachedLocations_[destinationPe] = destinationLocation;
397       isCached_[destinationPe] = true;
398 #endif
399       return destinationLocation;
400     }
401
402     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
403   }
404
405   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
406           "equal to sender's PE. This is unexpected and may cause errors.\n");
407   // to prevent warnings
408   return destinationLocation;
409 }
410
411 template <class dtype>
412 inline int MeshStreamer<dtype>::
413 copyDataItemIntoMessage(MeshStreamerMessage<dtype> *destinationBuffer,
414                         const void *dataItemHandle, bool copyIndirectly) {
415   return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle));
416 }
417
418 template <class dtype>
419 inline void MeshStreamer<dtype>::
420 sendMeshStreamerMessage(MeshStreamerMessage<dtype> *destinationBuffer,
421                         int dimension, int destinationIndex) {
422
423     if (dimension == 0) {
424 #ifdef STREAMER_VERBOSE_OUTPUT
425       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex);
426 #endif
427       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
428     }
429     else {
430 #ifdef STREAMER_VERBOSE_OUTPUT
431       CkPrintf("[%d] sending intermediate to %d\n",
432                CkMyPe(), destinationIndex);
433 #endif
434       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
435     }
436 }
437
438 template <class dtype>
439 inline void MeshStreamer<dtype>::
440 storeMessage(int destinationPe, const MeshLocation& destinationLocation,
441              const void *dataItem, bool copyIndirectly) {
442
443   int dimension = destinationLocation.dimension;
444   int bufferIndex = destinationLocation.bufferIndex;
445   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];
446
447   // allocate new message if necessary
448   if (messageBuffers[bufferIndex] == NULL) {
449     if (dimension == 0) {
450       // personalized messages do not require destination indices
451       messageBuffers[bufferIndex] =
452         new (0, bufferSize_, 8 * sizeof(int))
453          MeshStreamerMessage<dtype>(dimension);
454     }
455     else {
456       messageBuffers[bufferIndex] =
457         new (bufferSize_, bufferSize_, 8 * sizeof(int))
458          MeshStreamerMessage<dtype>(dimension);
459     }
460     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
461     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
462     CkAssert(messageBuffers[bufferIndex] != NULL);
463   }
464
465   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
466   int numBuffered =
467     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
468   if (dimension != 0) {
469     destinationBuffer->markDestination(numBuffered-1, destinationPe);
470   }
471   numDataItemsBuffered_++;
472
473   // send if buffer is full
474   if (numBuffered == bufferSize_) {
475
476     int destinationIndex;
477
478     destinationIndex = myIndex_ +
479       (bufferIndex - myLocationIndex_[dimension]) *
480       combinedDimensionSizes_[dimension];
481
482     sendMeshStreamerMessage(destinationBuffer, dimension, destinationIndex);
483
484     if (useStagedCompletion_) {
485       cntMsgSent_[dimension][bufferIndex]++;
486     }
487
488     messageBuffers[bufferIndex] = NULL;
489     numDataItemsBuffered_ -= numBuffered;
490     hasSentRecently_ = true;
491
492   }
493   // send if total buffering capacity has been reached
494   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
495     sendLargestBuffer();
496     hasSentRecently_ = true;
497   }
498 }
499
500 template <class dtype>
501 inline void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
502
503   const static bool copyIndirectly = true;
504
505   // no data items should be submitted after all local contributors call done
506   // and staged completion has begun
507   CkAssert(stagedCompletionStarted() == false);
508
509   // produce and consume once per PE
510   if (useCompletionDetection_) {
511     detectorLocalObj_->produce(CkNumPes());
512   }
513   QdCreate(CkNumPes());
514
515   // deliver locally
516   localBroadcast(dataItem);
517
518   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly);
519 }
520
521 template <class dtype>
522 inline void MeshStreamer<dtype>::
523 broadcast(const void *dataItemHandle, int dimension, bool copyIndirectly) {
524
525   MeshLocation destinationLocation;
526   destinationLocation.dimension = dimension;
527
528   while (destinationLocation.dimension != -1) {
529     for (int i = 0;
530          i < individualDimensionSizes_[destinationLocation.dimension];
531          i++) {
532
533       if (i != myLocationIndex_[destinationLocation.dimension]) {
534         destinationLocation.bufferIndex = i;
535         storeMessage(TRAM_BROADCAST, destinationLocation,
536                      dataItemHandle, copyIndirectly);
537       }
538       // release control to scheduler if requested by the user,
539       //   assume caller is threaded entry
540       if (yieldFlag_ && ++yieldCount_ == 1024) {
541         yieldCount_ = 0;
542         CthYield();
543       }
544     }
545     destinationLocation.dimension--;
546   }
547 }
548
549 template <class dtype>
550 inline void MeshStreamer<dtype>::
551 insertData(const void *dataItemHandle, int destinationPe) {
552
553   const static bool copyIndirectly = true;
554
555   // treat newly inserted items as if they were received along
556   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
557   MeshLocation destinationLocation = determineLocation(destinationPe,
558                                                        numDimensions_);
559   storeMessage(destinationPe, destinationLocation, dataItemHandle,
560                copyIndirectly);
561   // release control to scheduler if requested by the user,
562   //   assume caller is threaded entry
563   if (yieldFlag_ && ++yieldCount_ == 1024) {
564     yieldCount_ = 0;
565     CthYield();
566   }
567 }
568
569 template <class dtype>
570 inline void MeshStreamer<dtype>::
571 insertData(const dtype& dataItem, int destinationPe) {
572
573   // no data items should be submitted after all local contributors call done
574   // and staged completion has begun
575   CkAssert(stagedCompletionStarted() == false);
576
577   if (useCompletionDetection_) {
578     detectorLocalObj_->produce();
579   }
580   QdCreate(1);
581   if (destinationPe == CkMyPe()) {
582     localDeliver(dataItem);
583     return;
584   }
585
586   insertData((const void *) &dataItem, destinationPe);
587 }
588
589 template <class dtype>
590 void MeshStreamer<dtype>::init(CkCallback endCb, int prio) {
591
592   useStagedCompletion_ = false;
593   useCompletionDetection_ = false;
594
595   yieldCount_ = 0;
596   userCallback_ = endCb;
597   prio_ = prio;
598
599   initLocalClients();
600
601   hasSentRecently_ = false;
602   enablePeriodicFlushing();
603 }
604
605 template <class dtype>
606 void MeshStreamer<dtype>::
607 init(int numLocalContributors, CkCallback startCb, CkCallback endCb, int prio,
608      bool usePeriodicFlushing) {
609
610   useStagedCompletion_ = true;
611   useCompletionDetection_ = false;
612   // allocate memory on first use
613   if (cntMsgSent_ == NULL) {
614     cntMsgSent_ = new int*[numDimensions_];
615     cntMsgReceived_ = new int[numDimensions_];
616     cntMsgExpected_ = new int[numDimensions_];
617     cntFinished_ = new int[numDimensions_];
618
619     for (int i = 0; i < numDimensions_; i++) {
620       cntMsgSent_[i] = new int[individualDimensionSizes_[i]];
621     }
622   }
623
624
625   for (int i = 0; i < numDimensions_; i++) {
626     std::fill(cntMsgSent_[i],
627               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
628     cntMsgReceived_[i] = 0;
629     cntMsgExpected_[i] = 0;
630     cntFinished_[i] = 0;
631   }
632   dimensionToFlush_ = numDimensions_ - 1;
633
634   yieldCount_ = 0;
635   userCallback_ = endCb;
636   prio_ = prio;
637
638   numLocalDone_ = 0;
639   numLocalContributors_ = numLocalContributors;
640   initLocalClients();
641
642   if (numLocalContributors_ == 0) {
643     startStagedCompletion();
644   }
645
646   hasSentRecently_ = false;
647   if (usePeriodicFlushing) {
648     enablePeriodicFlushing();
649   }
650
651   this->contribute(startCb);
652 }
653
654 template <class dtype>
655 void MeshStreamer<dtype>::
656 init(int numContributors, CkCallback startCb, CkCallback endCb,
657      CProxy_CompletionDetector detector,
658      int prio, bool usePeriodicFlushing) {
659
660   useStagedCompletion_ = false;
661   useCompletionDetection_ = true;
662   yieldCount_ = 0;
663   prio_ = prio;
664   userCallback_ = endCb;
665
666   // to facilitate completion, enable flushing after all contributors
667   //  have finished submitting items
668   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::enablePeriodicFlushing(),
669                      this->thisProxy);
670   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(),
671                     this->thisProxy);
672   detector_ = detector;
673   detectorLocalObj_ = detector_.ckLocalBranch();
674   initLocalClients();
675
676   detectorLocalObj_->start_detection(numContributors, startCb, flushCb,
677                                      finish , 0);
678
679   if (progressPeriodInMs_ <= 0) {
680     CkPrintf("Using completion detection in NDMeshStreamer requires"
681              " setting a valid periodic flush period. Defaulting"
682              " to 10 ms\n");
683     progressPeriodInMs_ = 10;
684   }
685
686   hasSentRecently_ = false;
687   if (usePeriodicFlushing) {
688     enablePeriodicFlushing();
689   }
690 }
691
692 template <class dtype>
693 void MeshStreamer<dtype>::
694 init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, int prio,
695      bool usePeriodicFlushing) {
696
697   CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
698   int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
699   init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing);
700 }
701
702
703 template <class dtype>
704 void MeshStreamer<dtype>::finish() {
705
706   isPeriodicFlushEnabled_ = false;
707
708   if (!userCallback_.isInvalid()) {
709     this->contribute(userCallback_);
710     userCallback_ = CkCallback();      // nullify the current callback
711   }
712 }
713
714 template <class dtype>
715 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
716
717   int destinationPe, lastDestinationPe;
718   MeshLocation destinationLocation;
719
720   lastDestinationPe = -1;
721   for (int i = 0; i < msg->numDataItems; i++) {
722     destinationPe = msg->destinationPes[i];
723     const dtype& dataItem = msg->getDataItem(i);
724     if (destinationPe == CkMyPe()) {
725       localDeliver(dataItem);
726     }
727     else if (destinationPe != TRAM_BROADCAST) {
728       if (destinationPe != lastDestinationPe) {
729         // do this once per sequence of items with the same destination
730         destinationLocation = determineLocation(destinationPe, msg->dimension);
731       }
732       storeMessage(destinationPe, destinationLocation, &dataItem);
733     }
734     else /* if (destinationPe == TRAM_BROADCAST) */ {
735       localBroadcast(dataItem);
736       broadcast(&dataItem, msg->dimension - 1, false);
737     }
738     lastDestinationPe = destinationPe;
739   }
740
741   if (useStagedCompletion_) {
742     markMessageReceived(msg->dimension, msg->finalMsgCount);
743   }
744
745   delete msg;
746 }
747
748 template <class dtype>
749 inline void MeshStreamer<dtype>::sendLargestBuffer() {
750
751   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
752   MeshStreamerMessage<dtype> ** messageBuffers;
753   MeshStreamerMessage<dtype> *destinationBuffer;
754
755   for (int i = 0; i < numDimensions_; i++) {
756
757     messageBuffers = dataBuffers_[i];
758     numBuffers = individualDimensionSizes_[i];
759
760     flushDimension = i;
761     maxSize = 0;
762     for (int j = 0; j < numBuffers; j++) {
763       if (messageBuffers[j] != NULL &&
764           messageBuffers[j]->numDataItems > maxSize) {
765         maxSize = messageBuffers[j]->numDataItems;
766         flushIndex = j;
767       }
768     }
769
770     if (maxSize > 0) {
771
772       messageBuffers = dataBuffers_[flushDimension];
773       destinationBuffer = messageBuffers[flushIndex];
774       destinationIndex = myIndex_ +
775         (flushIndex - myLocationIndex_[flushDimension]) *
776         combinedDimensionSizes_[flushDimension] ;
777
778       // not sending the full buffer, shrink the message size
779       envelope *env = UsrToEnv(destinationBuffer);
780       env->shrinkUsersize((bufferSize_ - destinationBuffer->numDataItems)
781                         * sizeof(dtype));
782       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
783       sendMeshStreamerMessage(destinationBuffer, flushDimension,
784                               destinationIndex);
785
786       if (useStagedCompletion_) {
787         cntMsgSent_[i][flushIndex]++;
788       }
789
790       messageBuffers[flushIndex] = NULL;
791     }
792   }
793 }
794
795 template <class dtype>
796 inline void MeshStreamer<dtype>::flushToIntermediateDestinations() {
797   for (int i = 0; i < numDimensions_; i++) {
798     flushDimension(i);
799   }
800 }
801
802 template <class dtype>
803 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
804
805 #ifdef STREAMER_VERBOSE_OUTPUT
806   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n",
807            CkMyPe(), dimension, sendMsgCounts);
808 #endif
809   MeshStreamerMessage<dtype> **messageBuffers;
810   MeshStreamerMessage<dtype> *destinationBuffer;
811   int destinationIndex, numBuffers;
812
813   messageBuffers = dataBuffers_[dimension];
814   numBuffers = individualDimensionSizes_[dimension];
815
816   for (int j = 0; j < numBuffers; j++) {
817
818     if(messageBuffers[j] == NULL) {
819       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
820         messageBuffers[j] =
821           new (0, 0, 8 * sizeof(int)) MeshStreamerMessage<dtype>(dimension);
822         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
823         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
824       }
825       else {
826         continue;
827       }
828     }
829
830     destinationBuffer = messageBuffers[j];
831     destinationIndex = myIndex_ +
832       (j - myLocationIndex_[dimension]) *
833       combinedDimensionSizes_[dimension] ;
834
835     if (destinationBuffer->numDataItems != 0) {
836       // not sending the full buffer, shrink the message size
837       envelope *env = UsrToEnv(destinationBuffer);
838       env->shrinkUsersize((bufferSize_ - destinationBuffer->numDataItems)
839                           * sizeof(dtype));
840     }
841     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
842
843     if (useStagedCompletion_) {
844       cntMsgSent_[dimension][j]++;
845       if (sendMsgCounts) {
846         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
847       }
848     }
849
850     sendMeshStreamerMessage(destinationBuffer, dimension,
851                             destinationIndex);
852     messageBuffers[j] = NULL;
853   }
854 }
855
856 template <class dtype>
857 void MeshStreamer<dtype>::flushIfIdle(){
858
859   // flush if (1) this is not a periodic call or
860   //          (2) this is a periodic call and no sending took place
861   //              since the last time the function was invoked
862   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
863
864     if (numDataItemsBuffered_ != 0) {
865       flushToIntermediateDestinations();
866     }
867     CkAssert(numDataItemsBuffered_ == 0);
868
869   }
870
871   hasSentRecently_ = false;
872 }
873
874 template <class dtype>
875 void periodicProgressFunction(void *MeshStreamerObj, double time) {
876
877   MeshStreamer<dtype> *properObj =
878     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj);
879
880   if (properObj->isPeriodicFlushEnabled()) {
881     properObj->flushIfIdle();
882     properObj->registerPeriodicProgressFunction();
883   }
884 }
885
886 template <class dtype>
887 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
888   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this,
889                  progressPeriodInMs_);
890 }
891
892
893 template <class dtype, class ClientType>
894 class GroupMeshStreamer : public CBase_GroupMeshStreamer<dtype, ClientType> {
895 private:
896
897   CkGroupID clientGID_;
898   ClientType *clientObj_;
899
900   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
901     for (int i = 0; i < msg->numDataItems; i++) {
902       const dtype& data = msg->getDataItem(i);
903       clientObj_->process(data);
904     }
905
906     if (MeshStreamer<dtype>::useStagedCompletion_) {
907 #ifdef STREAMER_VERBOSE_OUTPUT
908       envelope *env = UsrToEnv(msg);
909       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n",
910                CkMyPe(), env->getSrcPe(), msg->numDataItems,
911                msg->finalMsgCount);
912 #endif
913       this->markMessageReceived(msg->dimension, msg->finalMsgCount);
914     }
915     else if (MeshStreamer<dtype>::useCompletionDetection_){
916       this->detectorLocalObj_->consume(msg->numDataItems);
917     }
918     QdProcess(msg->numDataItems);
919     delete msg;
920   }
921
922   inline void localDeliver(const dtype& dataItem) {
923     clientObj_->process(dataItem);
924     if (MeshStreamer<dtype>::useCompletionDetection_) {
925       MeshStreamer<dtype>::detectorLocalObj_->consume();
926     }
927     QdProcess(1);
928   }
929
930   inline void localBroadcast(const dtype& dataItem) {
931     localDeliver(dataItem);
932   }
933
934   inline void initLocalClients() {
935     // no action required
936   }
937
938 public:
939
940   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
941                     int *dimensionSizes,
942                     CkGroupID clientGID,
943                     bool yieldFlag = 0, double progressPeriodInMs = -1.0) {
944     this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
945                0, yieldFlag, progressPeriodInMs);
946     clientGID_ = clientGID;
947     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
948
949   }
950
951   GroupMeshStreamer(int numDimensions, int *dimensionSizes,
952                     CkGroupID clientGID,
953                     int bufferSize, bool yieldFlag = 0,
954                     double progressPeriodInMs = -1.0) {
955     this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize,
956                yieldFlag, progressPeriodInMs);
957     clientGID_ = clientGID;
958     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
959
960   }
961 };
962
963 template <class dtype, class ClientType>
964 class LocalBroadcaster : public CkLocIterator {
965
966 public:
967   CkArray *clientArrMgr_;
968   const dtype *dataItem_;
969
970   LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem)
971    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
972
973   void addLocation(CkLocation& loc) {
974     ClientType *clientObj =
975       (ClientType *) clientArrMgr_->lookup(loc.getIndex());
976     CkAssert(clientObj != NULL);
977     clientObj->process(*dataItem_);
978   }
979
980 };
981
982 template <class dtype, class itype, class ClientType>
983 class ArrayMeshStreamer : public CBase_ArrayMeshStreamer<dtype, itype,
984                                                          ClientType> {
985
986 private:
987
988   CkArrayID clientAID_;
989   CkArray *clientArrayMgr_;
990   CkLocMgr *clientLocMgr_;
991   int numArrayElements_;
992   int numLocalArrayElements_;
993   std::map<itype, std::vector<ArrayDataItem<dtype, itype> > > misdeliveredItems;
994 #ifdef CACHE_ARRAY_METADATA
995   ClientType **clientObjs_;
996   int *destinationPes_;
997   bool *isCachedArrayMetadata_;
998 #endif
999
1000   inline
1001   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
1002
1003     itype arrayId = packedDataItem.arrayIndex;
1004     if (arrayId == itype(TRAM_BROADCAST)) {
1005       localBroadcast(packedDataItem);
1006       return;
1007     }
1008     ClientType *clientObj;
1009 #ifdef CACHE_ARRAY_METADATA
1010     clientObj = clientObjs_[arrayId];
1011 #else
1012     clientObj = (ClientType *) clientArrayMgr_->lookup(arrayId);
1013 #endif
1014
1015     if (clientObj != NULL) {
1016       clientObj->process(packedDataItem.dataItem);
1017       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1018            ::useCompletionDetection_) {
1019         MeshStreamer<ArrayDataItem<dtype, itype> >
1020          ::detectorLocalObj_->consume();
1021       }
1022       QdProcess(1);
1023     }
1024     else {
1025       // array element arrayId is no longer present locally:
1026       //  buffer the data item and request updated PE index
1027       //  to be sent to the source and this PE
1028       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1029           ::useStagedCompletion_) {
1030         CkAbort("Using staged completion when array locations"
1031                 " are not guaranteed to be correct is currently"
1032                 " not supported.");
1033       }
1034       misdeliveredItems[arrayId].push_back(packedDataItem);
1035       if (misdeliveredItems[arrayId].size() == 1) {
1036         int homePe = clientLocMgr_->homePe(arrayId);
1037         this->thisProxy[homePe].
1038           processLocationRequest(arrayId, CkMyPe(), packedDataItem.sourcePe);
1039       }
1040     }
1041   }
1042
1043   inline
1044   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
1045
1046     LocalBroadcaster<dtype, ClientType>
1047       clientIterator(clientArrayMgr_, &packedDataItem.dataItem);
1048     clientLocMgr_->iterate(clientIterator);
1049
1050     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1051          ::useCompletionDetection_) {
1052         MeshStreamer<ArrayDataItem<dtype, itype> >
1053          ::detectorLocalObj_->consume();
1054     }
1055     QdProcess(1);
1056   }
1057
1058   inline void initLocalClients() {
1059
1060     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1061          ::useCompletionDetection_) {
1062 #ifdef CACHE_ARRAY_METADATA
1063       std::fill(isCachedArrayMetadata_,
1064                 isCachedArrayMetadata_ + numArrayElements_, false);
1065
1066       for (int i = 0; i < numArrayElements_; i++) {
1067         clientObjs_[i] =
1068           (ClientType*) ( clientArrayMgr_->lookup(CkArrayIndex1D(i)) );
1069       }
1070 #endif
1071     }
1072   }
1073
1074   inline void commonInit() {
1075 #ifdef CACHE_ARRAY_METADATA
1076     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1077     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1078     destinationPes_ = new int[numArrayElements_];
1079     isCachedArrayMetadata_ = new bool[numArrayElements_];
1080     std::fill(isCachedArrayMetadata_,
1081               isCachedArrayMetadata_ + numArrayElements_, false);
1082 #endif
1083   }
1084
1085 public:
1086
1087   struct DataItemHandle {
1088     itype arrayIndex;
1089     const dtype *dataItem;
1090   };
1091
1092   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1093                     int *dimensionSizes, CkArrayID clientAID,
1094                     bool yieldFlag = 0, double progressPeriodInMs = -1.0) {
1095
1096     this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 0,
1097                      yieldFlag, progressPeriodInMs);
1098     clientAID_ = clientAID;
1099     clientArrayMgr_ = clientAID_.ckLocalBranch();
1100     clientLocMgr_ = clientArrayMgr_->getLocMgr();
1101     commonInit();
1102   }
1103
1104   ArrayMeshStreamer(int numDimensions, int *dimensionSizes,
1105                     CkArrayID clientAID, int bufferSize, bool yieldFlag = 0,
1106                     double progressPeriodInMs = -1.0) {
1107
1108     this->ctorHelper(0, numDimensions, dimensionSizes, bufferSize, yieldFlag,
1109                      progressPeriodInMs);
1110     clientAID_ = clientAID;
1111     clientArrayMgr_ = clientAID_.ckLocalBranch();
1112     clientLocMgr_ = clientArrayMgr_->getLocMgr();
1113     commonInit();
1114   }
1115
1116   ~ArrayMeshStreamer() {
1117 #ifdef CACHE_ARRAY_METADATA
1118     delete [] clientObjs_;
1119     delete [] destinationPes_;
1120     delete [] isCachedArrayMetadata_;
1121 #endif
1122   }
1123
1124   void receiveAtDestination(
1125        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1126
1127     for (int i = 0; i < msg->numDataItems; i++) {
1128       const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
1129       localDeliver(packedData);
1130     }
1131     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1132       this->markMessageReceived(msg->dimension, msg->finalMsgCount);
1133     }
1134
1135     delete msg;
1136   }
1137
1138   inline void broadcast(const dtype& dataItem) {
1139     const static bool copyIndirectly = true;
1140
1141     // no data items should be submitted after all local contributors call done
1142     // and staged completion has begun
1143     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1144                ::stagedCompletionStarted()) == false);
1145
1146     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1147          ::useCompletionDetection_) {
1148       MeshStreamer<ArrayDataItem<dtype, itype> >
1149         ::detectorLocalObj_->produce(CkNumPes());
1150     }
1151     QdCreate(CkNumPes());
1152
1153     // deliver locally
1154     ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, CkMyPe(),
1155                                                 dataItem);
1156     localBroadcast(packedDataItem);
1157
1158     DataItemHandle tempHandle;
1159     tempHandle.dataItem = &dataItem;
1160     tempHandle.arrayIndex = TRAM_BROADCAST;
1161
1162     int numDimensions =
1163       MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
1164     MeshStreamer<ArrayDataItem<dtype, itype> >::
1165       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
1166   }
1167
1168   inline void insertData(const dtype& dataItem, itype arrayIndex) {
1169
1170     // no data items should be submitted after all local contributors call done
1171     // and staged completion has begun
1172     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1173                ::stagedCompletionStarted()) == false);
1174
1175     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1176          ::useCompletionDetection_) {
1177       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1178     }
1179     QdCreate(1);
1180     int destinationPe;
1181 #ifdef CACHE_ARRAY_METADATA
1182     if (isCachedArrayMetadata_[arrayIndex]) {
1183       destinationPe =  destinationPes_[arrayIndex];
1184     }
1185     else {
1186       destinationPe =
1187         clientArrayMgr_->lastKnown(arrayIndex);
1188       isCachedArrayMetadata_[arrayIndex] = true;
1189       destinationPes_[arrayIndex] = destinationPe;
1190     }
1191 #else
1192     destinationPe =
1193       clientArrayMgr_->lastKnown(arrayIndex);
1194 #endif
1195
1196     if (destinationPe == CkMyPe()) {
1197       ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, CkMyPe(), dataItem);
1198       localDeliver(packedDataItem);
1199       return;
1200     }
1201
1202     // this implementation avoids copying an item before transfer into message
1203     DataItemHandle tempHandle;
1204     tempHandle.arrayIndex = arrayIndex;
1205     tempHandle.dataItem = &dataItem;
1206
1207     MeshStreamer<ArrayDataItem<dtype, itype> >::
1208       insertData(&tempHandle, destinationPe);
1209
1210   }
1211
1212   inline int copyDataItemIntoMessage(
1213
1214       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer,
1215       const void *dataItemHandle, bool copyIndirectly) {
1216
1217     if (copyIndirectly == true) {
1218       // newly inserted items are passed through a handle to avoid copying
1219       int numDataItems = destinationBuffer->numDataItems;
1220       const DataItemHandle *tempHandle =
1221         (const DataItemHandle *) dataItemHandle;
1222       (destinationBuffer->dataItems)[numDataItems].arrayIndex =
1223         tempHandle->arrayIndex;
1224       (destinationBuffer->dataItems)[numDataItems].sourcePe = CkMyPe();
1225       (destinationBuffer->dataItems)[numDataItems].dataItem =
1226         *(tempHandle->dataItem);
1227       return ++destinationBuffer->numDataItems;
1228     }
1229     else {
1230       // this is an item received along the route to destination
1231       // we can copy it from the received message
1232       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1233               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1234     }
1235   }
1236
1237   // always called on homePE for array element arrayId
1238   void processLocationRequest(itype arrayId, int deliveredToPe, int sourcePe) {
1239     int ownerPe = clientArrayMgr_->lastKnown(arrayId);
1240     this->thisProxy[deliveredToPe].resendMisdeliveredItems(arrayId, ownerPe);
1241     this->thisProxy[sourcePe].updateLocationAtSource(arrayId, sourcePe);
1242   }
1243
1244   void resendMisdeliveredItems(itype arrayId, int destinationPe) {
1245
1246     clientLocMgr_->updateLocation(arrayId, destinationPe);
1247
1248     std::vector<ArrayDataItem<dtype, itype> > &bufferedItems
1249       = misdeliveredItems[arrayId];
1250
1251     MeshLocation destinationLocation =
1252       determineLocation(destinationPe, MeshStreamer
1253                         <ArrayDataItem<dtype, itype> >::numDimensions_);
1254     for (int i = 0; i < bufferedItems.size(); i++) {
1255       storeMessage(destinationPe, destinationLocation, &bufferedItems[i]);
1256     }
1257
1258     bufferedItems.clear();
1259   }
1260
1261   void updateLocationAtSource(itype arrayId, int destinationPe) {
1262
1263     int prevOwner = clientArrayMgr_->lastKnown(arrayId);
1264
1265     if (prevOwner != destinationPe) {
1266       clientLocMgr_->updateLocation(arrayId, destinationPe);
1267
1268 //    // could also try to correct destinations of items buffered for arrayId,
1269 //    // but it would take significant additional computation, so leaving it out;
1270 //    // the items will get forwarded after being delivered to the previous owner
1271 //    MeshLocation oldLocation = determineLocation(prevOwner, numDimensions_);
1272
1273 //    MeshStreamerMessage<dtype> *messageBuffer = dataBuffers_
1274 //     [oldLocation.dimension][oldLocation.bufferIndex];
1275
1276 //    if (messageBuffer != NULL) {
1277 //      // TODO: find items for arrayId, move them to buffer for destinationPe
1278 //      // do not leave holes in messageBuffer
1279 //    }
1280     }
1281   }
1282
1283 };
1284
1285 struct ChunkReceiveBuffer {
1286   int bufferNumber;
1287   int receivedChunks;
1288   char *buffer;
1289 };
1290
1291 struct ChunkOutOfOrderBuffer {
1292   int bufferNumber;
1293   int receivedChunks;
1294   int sourcePe;
1295   char *buffer;
1296
1297   ChunkOutOfOrderBuffer(int b, int r, int s, char *buf)
1298     : bufferNumber(b), receivedChunks(r), sourcePe(s), buffer(buf) {}
1299
1300   bool operator==(const ChunkDataItem &chunk) {
1301     return ( (chunk.bufferNumber == bufferNumber) &&
1302              (chunk.sourcePe == sourcePe) );
1303   }
1304
1305 };
1306
1307 template <class dtype, class ClientType >
1308 class GroupChunkMeshStreamer
1309   : public CBase_GroupChunkMeshStreamer<dtype, ClientType> {
1310
1311 private:
1312   // implementation assumes very few buffers will be received out of order
1313   // if this is not the case a different data structure may be preferable
1314   std::list<ChunkOutOfOrderBuffer> outOfOrderBuffers_;
1315   ChunkReceiveBuffer *lastReceived_;
1316   int *currentBufferNumbers_;
1317
1318   CkGroupID clientGID_;
1319   ClientType *clientObj_;
1320
1321   bool userHandlesFreeing_;
1322 public:
1323
1324   GroupChunkMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1325                          int *dimensionSizes, CkGroupID clientGID,
1326                          bool yieldFlag = 0, double progressPeriodInMs = -1.0,
1327                          bool userHandlesFreeing = false) {
1328
1329     this->ctorHelper(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
1330                      0, yieldFlag, progressPeriodInMs);
1331     clientGID_ = clientGID;
1332     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
1333     userHandlesFreeing_ = userHandlesFreeing;
1334     commonInit();
1335   }
1336
1337   GroupChunkMeshStreamer(int numDimensions, int *dimensionSizes,
1338                          CkGroupID clientGID, int bufferSize,
1339                          bool yieldFlag = 0, double progressPeriodInMs = -1.0,
1340                          bool userHandlesFreeing = false) {
1341
1342     this->ctorHelper(0, numDimensions, dimensionSizes,  bufferSize, yieldFlag,
1343                progressPeriodInMs);
1344     clientGID_ = clientGID;
1345     clientObj_ = (ClientType *) CkLocalBranch(clientGID_);
1346     userHandlesFreeing_ = userHandlesFreeing;
1347     commonInit();
1348   }
1349
1350   inline void commonInit() {
1351     lastReceived_ = new ChunkReceiveBuffer[CkNumPes()];
1352     currentBufferNumbers_ = new int[CkNumPes()];
1353     memset(lastReceived_, 0, CkNumPes() * sizeof(ChunkReceiveBuffer));
1354     memset(currentBufferNumbers_, 0, CkNumPes() * sizeof(int));
1355   }
1356
1357   inline void insertData(dtype *dataArray, int numElements, int destinationPe,
1358                          void *extraData = NULL, int extraDataSize = 0) {
1359
1360     char *inputData = (char *) dataArray;
1361     int arraySizeInBytes = numElements * sizeof(dtype);
1362     int totalSizeInBytes = arraySizeInBytes + extraDataSize;
1363     ChunkDataItem chunk;
1364     int offset;
1365     int chunkNumber = 0;
1366     chunk.bufferNumber = currentBufferNumbers_[destinationPe]++;
1367     chunk.sourcePe = CkMyPe();
1368     chunk.chunkNumber = 0;
1369     chunk.chunkSize = CHUNK_SIZE;
1370     chunk.numChunks =  (int) ceil ( (float) totalSizeInBytes / CHUNK_SIZE);
1371     chunk.numItems = numElements;
1372
1373     // loop over full chunks - handle leftovers and extra data later
1374     for (offset = 0; offset < arraySizeInBytes - CHUNK_SIZE;
1375          offset += CHUNK_SIZE) {
1376         memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
1377         MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe);
1378         chunk.chunkNumber++;
1379     }
1380
1381     // final (possibly incomplete) array chunk
1382     chunk.chunkSize = arraySizeInBytes - offset;
1383     memset(chunk.rawData, 0, CHUNK_SIZE);
1384     memcpy(chunk.rawData, inputData + offset, chunk.chunkSize);
1385
1386     // extra data (place in last chunk if possible)
1387     int remainingToSend = extraDataSize;
1388     int tempOffset = chunk.chunkSize;
1389     int extraOffset = 0;
1390     do {
1391       chunk.chunkSize = std::min(tempOffset + remainingToSend, CHUNK_SIZE);
1392       memcpy(chunk.rawData + tempOffset, (char *) extraData + extraOffset,
1393              chunk.chunkSize - tempOffset);
1394
1395       MeshStreamer<ChunkDataItem>::insertData(chunk, destinationPe);
1396       chunk.chunkNumber++;
1397       offset += CHUNK_SIZE;
1398       extraOffset += (chunk.chunkSize - tempOffset);
1399       remainingToSend -= (chunk.chunkSize - tempOffset);
1400       tempOffset = 0;
1401     } while (offset < totalSizeInBytes);
1402
1403   }
1404
1405   inline void processChunk(const ChunkDataItem& chunk) {
1406
1407     ChunkReceiveBuffer &last = lastReceived_[chunk.sourcePe];
1408
1409     if (last.buffer == NULL) {
1410       if (outOfOrderBuffers_.size() == 0) {
1411         // make common case fast
1412         last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
1413         last.receivedChunks = 0;
1414       }
1415       else {
1416         // check if chunks for this buffer have been received previously
1417         std::list<ChunkOutOfOrderBuffer>::iterator storedBuffer =
1418           find(outOfOrderBuffers_.begin(), outOfOrderBuffers_.end(), chunk);
1419         if (storedBuffer != outOfOrderBuffers_.end()) {
1420           last.buffer = storedBuffer->buffer;
1421           last.receivedChunks = storedBuffer->receivedChunks;
1422           outOfOrderBuffers_.erase(storedBuffer);
1423         }
1424         else {
1425           last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
1426           last.receivedChunks = 0;
1427         }
1428       }
1429       last.bufferNumber = chunk.bufferNumber;
1430     }
1431     else if (last.bufferNumber != chunk.bufferNumber) {
1432       // add last to list of out of order buffers
1433       ChunkOutOfOrderBuffer lastOutOfOrderBuffer(last.bufferNumber,
1434                                                  last.receivedChunks,
1435                                                  chunk.sourcePe, last.buffer);
1436       outOfOrderBuffers_.push_front(lastOutOfOrderBuffer);
1437
1438       //search through list of out of order buffers for this chunk's buffer
1439       std::list<ChunkOutOfOrderBuffer >::iterator storedBuffer =
1440         find(outOfOrderBuffers_.begin(), outOfOrderBuffers_.end(), chunk);
1441
1442       if (storedBuffer == outOfOrderBuffers_.end() ) {
1443         // allocate new buffer
1444         last.bufferNumber = chunk.bufferNumber;
1445         last.receivedChunks = 0;
1446         last.buffer = new char[chunk.numChunks * CHUNK_SIZE];
1447       }
1448       else {
1449         // use existing buffer
1450         last.bufferNumber = storedBuffer->bufferNumber;
1451         last.receivedChunks = storedBuffer->receivedChunks;
1452         last.buffer = storedBuffer->buffer;
1453         outOfOrderBuffers_.erase(storedBuffer);
1454       }
1455     }
1456
1457     char *receiveBuffer = last.buffer;
1458
1459     memcpy(receiveBuffer + chunk.chunkNumber * CHUNK_SIZE,
1460            chunk.rawData, chunk.chunkSize);
1461     if (++last.receivedChunks == chunk.numChunks) {
1462       clientObj_->receiveArray(
1463                   (dtype *) receiveBuffer, chunk.numItems, chunk.sourcePe);
1464       last.receivedChunks = 0;
1465       if (!userHandlesFreeing_) {
1466         delete [] last.buffer;
1467       }
1468       last.buffer = NULL;
1469     }
1470
1471   }
1472
1473   inline void localDeliver(const ChunkDataItem& chunk) {
1474     processChunk(chunk);
1475     if (this->useCompletionDetection_) {
1476       this->detectorLocalObj_->consume();
1477     }
1478     QdProcess(1);
1479   }
1480
1481   void receiveAtDestination(
1482        MeshStreamerMessage<ChunkDataItem> *msg) {
1483
1484     for (int i = 0; i < msg->numDataItems; i++) {
1485       const ChunkDataItem& chunk = msg->getDataItem(i);
1486       processChunk(chunk);
1487     }
1488
1489     if (this->useStagedCompletion_) {
1490 #ifdef STREAMER_VERBOSE_OUTPUT
1491       envelope *env = UsrToEnv(msg);
1492       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n",
1493                CkMyPe(), env->getSrcPe(), msg->numDataItems,
1494                msg->finalMsgCount);
1495 #endif
1496       this->markMessageReceived(msg->dimension, msg->finalMsgCount);
1497     }
1498     else if (this->useCompletionDetection_){
1499       this->detectorLocalObj_->consume(msg->numDataItems);
1500     }
1501     QdProcess(msg->numDataItems);
1502     delete msg;
1503
1504   }
1505
1506   inline void localBroadcast(const ChunkDataItem& dataItem) {
1507     localDeliver(dataItem);
1508   }
1509
1510   inline void initLocalClients() {
1511     // no action required
1512   }
1513
1514 };
1515
1516
1517
1518 #define CK_TEMPLATES_ONLY
1519 #include "NDMeshStreamer.def.h"
1520 #undef CK_TEMPLATES_ONLY
1521
1522 #endif