57fb05b3989efd065ee9d64a93332a476b065a8c
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define CACHE_LOCATIONS
17 // #define SUPPORT_INCOMPLETE_MESH
18 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
19 // #define STREAMER_VERBOSE_OUTPUT
20
21 #define TRAM_BROADCAST (-100)
22
23 struct MeshLocation {
24   int dimension; 
25   int bufferIndex; 
26 }; 
27
28 template<class dtype>
29 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
30
31 public:
32
33   int finalMsgCount; 
34   int dimension; 
35   int numDataItems;
36   int *destinationPes;
37   dtype *dataItems;
38
39   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
40     finalMsgCount = -1; 
41   }
42
43   int addDataItem(const dtype& dataItem) {
44     dataItems[numDataItems] = dataItem;
45     return ++numDataItems; 
46   }
47
48   void markDestination(const int index, const int destinationPe) {
49     destinationPes[index] = destinationPe;
50   }
51
52   const dtype& getDataItem(const int index) {
53     return dataItems[index];
54   }
55
56 };
57
58 template <class dtype>
59 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
60 private:
61   CompletionDetector *detectorLocalObj_;
62 public:
63   MeshStreamerArrayClient(){
64     detectorLocalObj_ = NULL; 
65   }
66   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
67   // would like to make it pure virtual but charm will try to
68   // instantiate the abstract class, leading to errors
69   virtual void process(const dtype& data) {
70     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
71             "This virtual function should have been defined by the user.\n");
72   };     
73   void setDetector(CompletionDetector *detectorLocalObj) {
74     detectorLocalObj_ = detectorLocalObj;
75   }
76   void receiveRedeliveredItem(dtype data) {
77 #ifdef STREAMER_VERBOSE_OUTPUT
78     CkPrintf("[%d] redelivered to index %d\n", 
79              CkMyPe(), this->thisIndex.data[0]);
80 #endif
81     if (detectorLocalObj_ != NULL) {
82       detectorLocalObj_->consume();
83     }
84     process(data);
85   }
86
87   void pup(PUP::er& p) {
88     CBase_MeshStreamerArrayClient<dtype>::pup(p);
89    }  
90
91 };
92
93 template <class dtype>
94 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
95
96 public:
97   virtual void process(const dtype& data) = 0;
98   virtual void receiveArray(dtype *data, int numItems) {
99     for (int i = 0; i < numItems; i++) {
100       process(data[i]);
101     }
102   }
103 };
104
105 template <class dtype>
106 class MeshStreamer : public CBase_MeshStreamer<dtype> {
107
108 private:
109   int bufferSize_; 
110   int maxNumDataItemsBuffered_;
111   int numDataItemsBuffered_;
112
113   int numMembers_; 
114   int *individualDimensionSizes_;
115   int *combinedDimensionSizes_;
116
117   int *startingIndexAtDimension_; 
118
119   int myIndex_;
120   int *myLocationIndex_;
121
122   CkCallback   userCallback_;
123   bool yieldFlag_;
124
125   double progressPeriodInMs_; 
126   bool isPeriodicFlushEnabled_; 
127   bool hasSentRecently_;
128   MeshStreamerMessage<dtype> ***dataBuffers_;
129
130   CProxy_CompletionDetector detector_;
131   int prio_;
132   int yieldCount_;
133
134 #ifdef CACHE_LOCATIONS
135   MeshLocation *cachedLocations_;
136   bool *isCached_; 
137 #endif
138
139
140   // only used for staged completion
141   int **cntMsgSent_;
142   int *cntMsgReceived_;
143   int *cntMsgExpected_;
144   int *cntFinished_; 
145   int dimensionToFlush_;
146   int numLocalDone_; 
147   int numLocalContributors_;
148
149   void storeMessage(int destinationPe, 
150                     const MeshLocation& destinationCoordinates, 
151                     const void *dataItem, bool copyIndirectly = false);
152   virtual void localDeliver(const dtype& dataItem) = 0; 
153   virtual void localBroadcast(const dtype& dataItem) = 0; 
154   virtual int numElementsInClient() = 0;
155   virtual int numLocalElementsInClient() = 0; 
156
157   virtual void initLocalClients() = 0;
158
159   void sendLargestBuffer();
160   void flushToIntermediateDestinations();
161   void flushDimension(int dimension, bool sendMsgCounts = false); 
162   MeshLocation determineLocation(int destinationPe, 
163                                  int dimensionReceivedAlong);
164
165 protected:
166
167   int numDimensions_;
168   bool useStagedCompletion_;
169   CompletionDetector *detectorLocalObj_;
170   virtual int copyDataItemIntoMessage(
171               MeshStreamerMessage<dtype> *destinationBuffer, 
172               const void *dataItemHandle, bool copyIndirectly = false);
173   void insertData(const void *dataItemHandle, int destinationPe);
174   void broadcast(const void *dataItemHandle, int dimension, 
175                  bool copyIndirectly);
176
177 public:
178
179   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
180                int *dimensionSizes, int bufferSize,
181                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
182   ~MeshStreamer();
183
184   // entry
185   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
186             int prio, bool usePeriodicFlushing);
187   void init(int numContributors, CkCallback startCb, CkCallback endCb, 
188             CProxy_CompletionDetector detector,
189             int prio, bool usePeriodicFlushing);
190   void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, 
191             int prio, bool usePeriodicFlushing);
192
193   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
194   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
195   void flushIfIdle();
196   void finish();
197
198   // non entry
199   bool isPeriodicFlushEnabled() {
200     return isPeriodicFlushEnabled_;
201   }
202   virtual void insertData(const dtype& dataItem, int destinationPe); 
203   virtual void broadcast(const dtype& dataItem); 
204   void registerPeriodicProgressFunction();
205
206   // flushing begins only after enablePeriodicFlushing has been invoked
207
208   void enablePeriodicFlushing(){
209     isPeriodicFlushEnabled_ = true; 
210     registerPeriodicProgressFunction();
211   }
212
213   void done(int numContributorsFinished = 1) {
214
215     if (useStagedCompletion_) {
216       numLocalDone_ += numContributorsFinished; 
217       if (numLocalDone_ == numLocalContributors_) {
218         startStagedCompletion();
219       }
220     }
221     else {
222       detectorLocalObj_->done(numContributorsFinished);
223     }
224   }
225   
226   bool stagedCompletionStarted() {    
227     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
228   }
229
230   void startStagedCompletion() {          
231     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
232       flushDimension(dimensionToFlush_, true);
233     }
234     dimensionToFlush_--;
235
236     checkForCompletedStages();
237   }
238
239   void markMessageReceived(int dimension, int finalCount) {
240     cntMsgReceived_[dimension]++;
241     if (finalCount != -1) {
242       cntFinished_[dimension]++; 
243       cntMsgExpected_[dimension] += finalCount; 
244 #ifdef STREAMER_VERBOSE_OUTPUT
245       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
246                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
247                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
248                cntMsgReceived_[dimension]); 
249 #endif
250     }  
251     if (dimensionToFlush_ != numDimensions_ - 1) {
252       checkForCompletedStages();
253     }
254   }
255
256   void checkForCompletedStages() {
257
258     while (cntFinished_[dimensionToFlush_ + 1] == 
259            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
260            cntMsgExpected_[dimensionToFlush_ + 1] == 
261            cntMsgReceived_[dimensionToFlush_ + 1]) {
262       if (dimensionToFlush_ == -1) {
263 #ifdef STREAMER_VERBOSE_OUTPUT
264         CkPrintf("[%d] contribute\n", CkMyPe()); 
265 #endif
266         CkAssert(numDataItemsBuffered_ == 0); 
267         isPeriodicFlushEnabled_ = false; 
268         if (!userCallback_.isInvalid()) {
269           this->contribute(userCallback_);
270           userCallback_ = CkCallback();
271         }
272         return; 
273       }
274       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
275         flushDimension(dimensionToFlush_, true); 
276       }
277       dimensionToFlush_--; 
278     }
279   }
280
281 };
282
283 template <class dtype>
284 MeshStreamer<dtype>::MeshStreamer(
285                      int maxNumDataItemsBuffered, int numDimensions, 
286                      int *dimensionSizes, 
287                      int bufferSize,
288                      bool yieldFlag, 
289                      double progressPeriodInMs)
290  :numDimensions_(numDimensions), 
291   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
292   yieldFlag_(yieldFlag), 
293   progressPeriodInMs_(progressPeriodInMs), 
294   bufferSize_(bufferSize)
295 {
296
297   int sumAlongAllDimensions = 0;   
298   individualDimensionSizes_ = new int[numDimensions_];
299   combinedDimensionSizes_ = new int[numDimensions_ + 1];
300   myLocationIndex_ = new int[numDimensions_];
301   startingIndexAtDimension_ = new int[numDimensions_ + 1]; 
302   memcpy(individualDimensionSizes_, dimensionSizes, 
303          numDimensions * sizeof(int)); 
304   combinedDimensionSizes_[0] = 1; 
305   for (int i = 0; i < numDimensions; i++) {
306     sumAlongAllDimensions += individualDimensionSizes_[i];
307     combinedDimensionSizes_[i + 1] = 
308       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
309   }
310
311   CkAssert(combinedDimensionSizes_[numDimensions] == CkNumPes()); 
312
313   // a bufferSize input of 0 indicates it should be calculated by the library
314   if (bufferSize_ == 0) {
315     CkAssert(maxNumDataItemsBuffered_ > 0);
316     // except for personalized messages, the buffers for dimensions with the 
317     //   same index as the sender's are not used
318     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
319       / (sumAlongAllDimensions - numDimensions_ + 1); 
320   }
321   else {
322     maxNumDataItemsBuffered_ = 
323       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1);
324   }
325
326   if (bufferSize_ <= 0) {
327     bufferSize_ = 1; 
328     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
329              "is invalid. Defaulting to a single buffer per destination.\n");
330   }
331   numDataItemsBuffered_ = 0; 
332   numMembers_ = CkNumPes(); 
333
334   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
335   for (int i = 0; i < numDimensions; i++) {
336     int numMembersAlongDimension = individualDimensionSizes_[i]; 
337     dataBuffers_[i] = 
338       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
339     for (int j = 0; j < numMembersAlongDimension; j++) {
340       dataBuffers_[i][j] = NULL;
341     }
342   }
343
344   myIndex_ = CkMyPe();
345   int remainder = myIndex_;
346   startingIndexAtDimension_[numDimensions_] = 0;
347   for (int i = numDimensions_ - 1; i >= 0; i--) {    
348     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
349     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
350     remainder -= dimensionOffset; 
351     startingIndexAtDimension_[i] = 
352       startingIndexAtDimension_[i+1] + dimensionOffset; 
353   }
354
355   isPeriodicFlushEnabled_ = false; 
356   detectorLocalObj_ = NULL;
357
358 #ifdef CACHE_LOCATIONS
359   cachedLocations_ = new MeshLocation[numMembers_];
360   isCached_ = new bool[numMembers_];
361   std::fill(isCached_, isCached_ + numMembers_, false);
362 #endif
363
364   cntMsgSent_ = NULL; 
365   cntMsgReceived_ = NULL; 
366   cntMsgExpected_ = NULL; 
367   cntFinished_ = NULL; 
368
369 }
370
371 template <class dtype>
372 MeshStreamer<dtype>::~MeshStreamer() {
373
374   for (int i = 0; i < numDimensions_; i++) {
375     for (int j=0; j < individualDimensionSizes_[i]; j++) {
376       delete[] dataBuffers_[i][j]; 
377     }
378     delete[] dataBuffers_[i]; 
379   }
380
381   delete[] individualDimensionSizes_;
382   delete[] combinedDimensionSizes_; 
383   delete[] myLocationIndex_;
384   delete[] startingIndexAtDimension_;
385
386 #ifdef CACHE_LOCATIONS
387   delete[] cachedLocations_;
388   delete[] isCached_; 
389 #endif
390
391   if (cntMsgSent_ != NULL) {
392     for (int i = 0; i < numDimensions_; i++) {
393       delete[] cntMsgSent_[i]; 
394     }
395     delete[] cntMsgSent_; 
396     delete[] cntMsgReceived_; 
397     delete[] cntMsgExpected_; 
398     delete[] cntFinished_;
399   }
400
401 }
402
403 template <class dtype>
404 inline
405 MeshLocation MeshStreamer<dtype>::determineLocation(
406                                   int destinationPe, 
407                                   int dimensionReceivedAlong) {
408
409 #ifdef CACHE_LOCATIONS
410   if (isCached_[destinationPe]) {    
411     return cachedLocations_[destinationPe]; 
412   }
413 #endif
414
415   MeshLocation destinationLocation;
416   int remainder = 
417     destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
418   int dimensionIndex; 
419   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
420     dimensionIndex = remainder / combinedDimensionSizes_[i];
421     
422     if (dimensionIndex != myLocationIndex_[i]) {
423       destinationLocation.dimension = i; 
424       destinationLocation.bufferIndex = dimensionIndex; 
425 #ifdef CACHE_LOCATIONS
426       cachedLocations_[destinationPe] = destinationLocation;
427       isCached_[destinationPe] = true; 
428 #endif
429       return destinationLocation;
430     }
431
432     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
433   }
434
435   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
436           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
437   // to prevent warnings
438   return destinationLocation; 
439 }
440
441 template <class dtype>
442 inline 
443 int MeshStreamer<dtype>::copyDataItemIntoMessage(
444                          MeshStreamerMessage<dtype> *destinationBuffer,
445                          const void *dataItemHandle, bool copyIndirectly) {
446   return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle)); 
447 }
448
449 template <class dtype>
450 inline
451 void MeshStreamer<dtype>::storeMessage(
452                           int destinationPe, 
453                           const MeshLocation& destinationLocation,
454                           const void *dataItem, bool copyIndirectly) {
455
456   int dimension = destinationLocation.dimension;
457   int bufferIndex = destinationLocation.bufferIndex; 
458   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
459
460   // allocate new message if necessary
461   if (messageBuffers[bufferIndex] == NULL) {
462     if (dimension == 0) {
463       // personalized messages do not require destination indices
464       messageBuffers[bufferIndex] = 
465         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
466     }
467     else {
468       messageBuffers[bufferIndex] = 
469         new (bufferSize_, bufferSize_, sizeof(int)) 
470         MeshStreamerMessage<dtype>(dimension);
471     }
472     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
473     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
474     CkAssert(messageBuffers[bufferIndex] != NULL);
475   }
476   
477   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
478   int numBuffered = 
479     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
480   if (dimension != 0) {
481     destinationBuffer->markDestination(numBuffered-1, destinationPe);
482   }
483   numDataItemsBuffered_++;
484
485   // send if buffer is full
486   if (numBuffered == bufferSize_) {
487
488     int destinationIndex;
489
490     destinationIndex = myIndex_ + 
491       (bufferIndex - myLocationIndex_[dimension]) * 
492       combinedDimensionSizes_[dimension];
493
494     if (dimension == 0) {
495 #ifdef STREAMER_VERBOSE_OUTPUT
496       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
497 #endif
498       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
499     }
500     else {
501 #ifdef STREAMER_VERBOSE_OUTPUT
502       CkPrintf("[%d] sending intermediate to %d\n", 
503                CkMyPe(), destinationIndex); 
504 #endif
505       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
506     }
507
508     if (useStagedCompletion_) {
509       cntMsgSent_[dimension][bufferIndex]++; 
510     }
511
512     messageBuffers[bufferIndex] = NULL;
513     numDataItemsBuffered_ -= numBuffered; 
514     hasSentRecently_ = true; 
515
516   }
517   // send if total buffering capacity has been reached
518   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
519     sendLargestBuffer();
520     hasSentRecently_ = true; 
521   }
522
523 }
524 template <class dtype>
525 inline
526 void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
527   const static bool copyIndirectly = true;
528
529   // no data items should be submitted after all local contributors call done 
530   // and staged completion has begun
531   CkAssert(stagedCompletionStarted() == false);
532
533   // produce and consume once per PE
534   if (!useStagedCompletion_) {
535     detectorLocalObj_->produce(CkNumPes());
536   }
537
538   // deliver locally
539   localBroadcast(dataItem);
540
541   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly); 
542 }
543
544 template <class dtype>
545 inline
546 void MeshStreamer<dtype>::broadcast(const void *dataItemHandle, int dimension, 
547                                     bool copyIndirectly) {
548
549   MeshLocation destinationLocation;
550   destinationLocation.dimension = dimension; 
551
552   while (destinationLocation.dimension != -1) {
553     for (int i = 0; 
554          i < individualDimensionSizes_[destinationLocation.dimension]; 
555          i++) {
556
557       if (i != myLocationIndex_[destinationLocation.dimension]) {
558         destinationLocation.bufferIndex = i; 
559         storeMessage(TRAM_BROADCAST, destinationLocation, 
560                      dataItemHandle, copyIndirectly); 
561       }
562       // release control to scheduler if requested by the user, 
563       //   assume caller is threaded entry
564       if (yieldFlag_ && ++yieldCount_ == 1024) {
565         yieldCount_ = 0; 
566         CthYield();
567       }
568     }
569     destinationLocation.dimension--; 
570   }
571
572 }
573
574
575 template <class dtype>
576 inline
577 void MeshStreamer<dtype>::insertData(const void *dataItemHandle, 
578                                      int destinationPe) {
579   const static bool copyIndirectly = true;
580
581   // treat newly inserted items as if they were received along
582   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
583   MeshLocation destinationLocation = determineLocation(destinationPe, 
584                                                        numDimensions_);
585   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
586                copyIndirectly); 
587   // release control to scheduler if requested by the user, 
588   //   assume caller is threaded entry
589   if (yieldFlag_ && ++yieldCount_ == 1024) {
590     yieldCount_ = 0; 
591     CthYield();
592   }
593
594 }
595
596 template <class dtype>
597 inline
598 void MeshStreamer<dtype>::insertData(const dtype& dataItem, int destinationPe) {
599
600   // no data items should be submitted after all local contributors call done 
601   // and staged completion has begun
602   CkAssert(stagedCompletionStarted() == false);
603
604   if (!useStagedCompletion_) {
605     detectorLocalObj_->produce();
606   }
607   if (destinationPe == CkMyPe()) {
608     localDeliver(dataItem);
609     return;
610   }
611
612   insertData((const void *) &dataItem, destinationPe);
613 }
614
615 template <class dtype>
616 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
617                                CkCallback endCb, int prio, 
618                                bool usePeriodicFlushing) {
619   useStagedCompletion_ = true; 
620   // allocate memory on first use
621   if (cntMsgSent_ == NULL) {
622     cntMsgSent_ = new int*[numDimensions_]; 
623     cntMsgReceived_ = new int[numDimensions_];
624     cntMsgExpected_ = new int[numDimensions_];
625     cntFinished_ = new int[numDimensions_]; 
626
627     for (int i = 0; i < numDimensions_; i++) {
628       cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
629     }
630   }
631
632
633   for (int i = 0; i < numDimensions_; i++) {
634     std::fill(cntMsgSent_[i], 
635               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
636     cntMsgReceived_[i] = 0;
637     cntMsgExpected_[i] = 0; 
638     cntFinished_[i] = 0; 
639   }
640   dimensionToFlush_ = numDimensions_ - 1;
641
642   yieldCount_ = 0; 
643   userCallback_ = endCb; 
644   prio_ = prio;
645
646   numLocalDone_ = 0; 
647   numLocalContributors_ = numLocalContributors; 
648   initLocalClients();
649
650   if (numLocalContributors_ == 0) {
651     startStagedCompletion();
652   }
653
654   hasSentRecently_ = false;
655   if (usePeriodicFlushing) {
656     enablePeriodicFlushing();
657   }
658
659   this->contribute(startCb);
660 }
661
662 template <class dtype>
663 void MeshStreamer<dtype>::init(int numContributors,       
664                                CkCallback startCb, CkCallback endCb,
665                                CProxy_CompletionDetector detector, 
666                                int prio, bool usePeriodicFlushing) {
667
668   useStagedCompletion_ = false; 
669   yieldCount_ = 0; 
670   prio_ = prio;
671   userCallback_ = endCb; 
672   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushIfIdle(), 
673                      this->thisProxy);
674   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
675                     this->thisProxy);
676   detector_ = detector;      
677   detectorLocalObj_ = detector_.ckLocalBranch();
678   initLocalClients();
679
680   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
681                                      finish , 0);
682   
683   if (progressPeriodInMs_ <= 0) {
684     CkPrintf("Using completion detection in NDMeshStreamer requires"
685              " setting a valid periodic flush period. Defaulting"
686              " to 10 ms\n");
687     progressPeriodInMs_ = 10;
688   }
689   
690   hasSentRecently_ = false; 
691   if (usePeriodicFlushing) {
692     enablePeriodicFlushing();
693   }
694
695 }
696
697 template <class dtype>
698 void MeshStreamer<dtype>::init(CkArrayID senderArrayID, CkCallback startCb, 
699             CkCallback endCb, int prio, bool usePeriodicFlushing) {
700     CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
701     int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
702     init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing); 
703   }
704
705
706 template <class dtype>
707 void MeshStreamer<dtype>::finish() {
708   isPeriodicFlushEnabled_ = false; 
709
710   if (!userCallback_.isInvalid()) {
711     this->contribute(userCallback_);
712     userCallback_ = CkCallback();      // nullify the current callback
713   }
714
715 }
716
717 template <class dtype>
718 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
719
720   int destinationPe, lastDestinationPe; 
721   MeshLocation destinationLocation;
722
723   lastDestinationPe = -1;
724   for (int i = 0; i < msg->numDataItems; i++) {
725     destinationPe = msg->destinationPes[i];
726     const dtype& dataItem = msg->getDataItem(i);
727     if (destinationPe == CkMyPe()) {
728       localDeliver(dataItem);
729     }
730     else if (destinationPe != TRAM_BROADCAST) {
731       if (destinationPe != lastDestinationPe) {
732         // do this once per sequence of items with the same destination
733         destinationLocation = determineLocation(destinationPe, msg->dimension);
734       }
735       storeMessage(destinationPe, destinationLocation, &dataItem);   
736     }
737     else /* if (destinationPe == TRAM_BROADCAST) */ {
738       localBroadcast(dataItem);       
739       broadcast(&dataItem, msg->dimension - 1, false); 
740     }
741     lastDestinationPe = destinationPe; 
742   }
743
744   if (useStagedCompletion_) {
745     markMessageReceived(msg->dimension, msg->finalMsgCount); 
746   }
747
748   delete msg;
749
750 }
751
752 template <class dtype>
753 void MeshStreamer<dtype>::sendLargestBuffer() {
754
755   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
756   MeshStreamerMessage<dtype> ** messageBuffers; 
757   MeshStreamerMessage<dtype> *destinationBuffer; 
758
759   for (int i = 0; i < numDimensions_; i++) {
760
761     messageBuffers = dataBuffers_[i]; 
762     numBuffers = individualDimensionSizes_[i]; 
763
764     flushDimension = i; 
765     maxSize = 0;    
766     for (int j = 0; j < numBuffers; j++) {
767       if (messageBuffers[j] != NULL && 
768           messageBuffers[j]->numDataItems > maxSize) {
769         maxSize = messageBuffers[j]->numDataItems;
770         flushIndex = j; 
771       }
772     }
773
774     if (maxSize > 0) {
775
776       messageBuffers = dataBuffers_[flushDimension]; 
777       destinationBuffer = messageBuffers[flushIndex];
778       destinationIndex = myIndex_ + 
779         (flushIndex - myLocationIndex_[flushDimension]) * 
780         combinedDimensionSizes_[flushDimension] ;
781
782       // not sending the full buffer, shrink the message size
783       envelope *env = UsrToEnv(destinationBuffer);
784       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
785                         (bufferSize_ - destinationBuffer->numDataItems));
786       *((int *) env->getPrioPtr()) = prio_;
787
788       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
789
790       if (flushDimension == 0) {
791 #ifdef STREAMER_VERBOSE_OUTPUT
792         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
793 #endif
794         this->thisProxy[destinationIndex].
795           receiveAtDestination(destinationBuffer);
796       }
797       else {
798 #ifdef STREAMER_VERBOSE_OUTPUT
799         CkPrintf("[%d] sending intermediate flush to %d\n", 
800                  CkMyPe(), destinationIndex); 
801 #endif
802         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
803       }
804
805       if (useStagedCompletion_) {
806         cntMsgSent_[i][flushIndex]++; 
807       }
808
809       messageBuffers[flushIndex] = NULL;
810
811     }
812
813   }
814 }
815
816 template <class dtype>
817 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
818   
819   for (int i = 0; i < numDimensions_; i++) {
820     flushDimension(i); 
821   }
822 }
823
824 template <class dtype>
825 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
826 #ifdef STREAMER_VERBOSE_OUTPUT
827   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", 
828            CkMyPe(), dimension, sendMsgCounts); 
829 #endif
830   MeshStreamerMessage<dtype> **messageBuffers; 
831   MeshStreamerMessage<dtype> *destinationBuffer; 
832   int destinationIndex, numBuffers; 
833
834   messageBuffers = dataBuffers_[dimension]; 
835   numBuffers = individualDimensionSizes_[dimension]; 
836
837   for (int j = 0; j < numBuffers; j++) {
838
839     if(messageBuffers[j] == NULL) {      
840       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
841         messageBuffers[j] = 
842           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
843         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
844         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
845       }
846       else {
847         continue; 
848       } 
849     }
850
851     destinationBuffer = messageBuffers[j];
852     destinationIndex = myIndex_ + 
853       (j - myLocationIndex_[dimension]) * 
854       combinedDimensionSizes_[dimension] ;
855
856     if (destinationBuffer->numDataItems != 0) {
857       // not sending the full buffer, shrink the message size
858       envelope *env = UsrToEnv(destinationBuffer);
859       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
860                         (bufferSize_ - destinationBuffer->numDataItems));
861       *((int *) env->getPrioPtr()) = prio_;
862     }
863     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
864
865     if (useStagedCompletion_) {
866       cntMsgSent_[dimension][j]++;
867       if (sendMsgCounts) {
868         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
869       }
870     }
871
872     if (dimension == 0) {
873 #ifdef STREAMER_VERBOSE_OUTPUT
874       CkPrintf("[%d] sending dimension flush to %d\n", 
875                CkMyPe(), destinationIndex); 
876 #endif
877       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
878     }
879     else {
880 #ifdef STREAMER_VERBOSE_OUTPUT
881       CkPrintf("[%d] sending intermediate dimension flush to %d\n", 
882                CkMyPe(), destinationIndex); 
883 #endif
884       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
885     }
886     messageBuffers[j] = NULL;
887   }
888   
889 }
890
891
892 template <class dtype>
893 void MeshStreamer<dtype>::flushIfIdle(){
894   // flush if (1) this is not a periodic call or 
895   //          (2) this is a periodic call and no sending took place
896   //              since the last time the function was invoked
897   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
898
899     if (numDataItemsBuffered_ != 0) {
900       flushToIntermediateDestinations();
901     }    
902     CkAssert(numDataItemsBuffered_ == 0); 
903     
904   }
905
906   hasSentRecently_ = false; 
907
908 }
909
910 template <class dtype>
911 void periodicProgressFunction(void *MeshStreamerObj, double time) {
912
913   MeshStreamer<dtype> *properObj = 
914     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
915
916   if (properObj->isPeriodicFlushEnabled()) {
917     properObj->flushIfIdle();
918     properObj->registerPeriodicProgressFunction();
919   }
920 }
921
922 template <class dtype>
923 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
924   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
925                  progressPeriodInMs_); 
926 }
927
928
929 template <class dtype>
930 class GroupMeshStreamer : public MeshStreamer<dtype> {
931 private:
932
933   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
934   MeshStreamerGroupClient<dtype> *clientObj_;
935
936   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
937     for (int i = 0; i < msg->numDataItems; i++) {
938       const dtype& data = msg->getDataItem(i);
939       clientObj_->process(data);
940     }
941
942     if (MeshStreamer<dtype>::useStagedCompletion_) {
943 #ifdef STREAMER_VERBOSE_OUTPUT
944       envelope *env = UsrToEnv(msg);
945       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
946                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
947                msg->finalMsgCount);  
948 #endif
949       markMessageReceived(msg->dimension, msg->finalMsgCount); 
950     }
951     else {
952       this->detectorLocalObj_->consume(msg->numDataItems);    
953     }
954
955     delete msg;
956   }
957
958   void localDeliver(const dtype& dataItem) {
959     clientObj_->process(dataItem);
960     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
961       MeshStreamer<dtype>::detectorLocalObj_->consume();
962     }
963   }
964
965   void localBroadcast(const dtype& dataItem) {
966     localDeliver(dataItem); 
967   }
968
969   int numElementsInClient() {
970     // client is a group - there is one element per PE
971     return CkNumPes();
972   }
973
974   int numLocalElementsInClient() {
975     return 1; 
976   }
977
978   void initLocalClients() {
979     // no action required
980   }
981
982 public:
983
984   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
985                     int *dimensionSizes, 
986                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
987                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
988    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
989                         0, yieldFlag, progressPeriodInMs) 
990   {
991     clientProxy_ = clientProxy; 
992     clientObj_ = 
993       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
994   }
995
996   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
997                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
998                     int bufferSize, bool yieldFlag = 0, 
999                     double progressPeriodInMs = -1.0)
1000    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
1001                         yieldFlag, progressPeriodInMs) 
1002   {
1003     clientProxy_ = clientProxy; 
1004     clientObj_ = 
1005       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
1006   }
1007
1008 };
1009
1010 template <class dtype>
1011 class ClientInitializer : public CkLocIterator {
1012
1013 public:
1014   
1015   CompletionDetector *detectorLocalObj_;
1016   CkArray *clientArrMgr_;
1017   ClientInitializer(CompletionDetector *detectorObj, 
1018                              CkArray *clientArrMgr) 
1019     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
1020
1021   // CkLocMgr::iterate will call addLocation on all elements local to this PE
1022   void addLocation(CkLocation& loc) {
1023
1024     MeshStreamerArrayClient<dtype> *clientObj = 
1025       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1026
1027     CkAssert(clientObj != NULL); 
1028     clientObj->setDetector(detectorLocalObj_); 
1029   }
1030
1031 };
1032
1033 template <class dtype>
1034 class LocalBroadcaster : public CkLocIterator {
1035
1036 public:
1037   CkArray *clientArrMgr_;
1038   const dtype *dataItem_; 
1039
1040   LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem) 
1041    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
1042
1043   void addLocation(CkLocation& loc) {
1044     MeshStreamerArrayClient<dtype> *clientObj = 
1045       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1046
1047     CkAssert(clientObj != NULL); 
1048
1049     clientObj->process(*dataItem_); 
1050   }
1051
1052 };
1053
1054 template <class dtype, class itype>
1055 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
1056   
1057 private:
1058   
1059   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
1060   CkArray *clientArrayMgr_;
1061   int numArrayElements_;
1062   int numLocalArrayElements_;
1063 #ifdef CACHE_ARRAY_METADATA
1064   MeshStreamerArrayClient<dtype> **clientObjs_;
1065   int *destinationPes_;
1066   bool *isCachedArrayMetadata_;
1067 #endif
1068
1069   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
1070     itype arrayId = packedDataItem.arrayIndex; 
1071     if (arrayId == itype(TRAM_BROADCAST)) {
1072       localBroadcast(packedDataItem);
1073       return;
1074     }
1075     MeshStreamerArrayClient<dtype> *clientObj;
1076 #ifdef CACHE_ARRAY_METADATA
1077     clientObj = clientObjs_[arrayId];
1078 #else
1079     clientObj = clientProxy_[arrayId].ckLocal();
1080 #endif
1081
1082     if (clientObj != NULL) {
1083       clientObj->process(packedDataItem.dataItem);
1084       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1085            ::useStagedCompletion_ == false) {
1086         MeshStreamer<ArrayDataItem<dtype, itype> >
1087          ::detectorLocalObj_->consume();
1088       }
1089     }
1090     else { 
1091       // array element is no longer present locally - redeliver using proxy
1092       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
1093     }
1094   }
1095
1096   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
1097
1098     LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
1099                                            &packedDataItem.dataItem);
1100     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1101     clientLocMgr->iterate(clientIterator);
1102
1103     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1104          ::useStagedCompletion_ == false) {
1105         MeshStreamer<ArrayDataItem<dtype, itype> >
1106          ::detectorLocalObj_->consume();      
1107     }
1108
1109   }
1110
1111   int numElementsInClient() {
1112     return numArrayElements_;
1113   }
1114
1115   int numLocalElementsInClient() {
1116     return numLocalArrayElements_;
1117   }
1118
1119   void initLocalClients() {
1120     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1121          ::useStagedCompletion_ == false) {
1122 #ifdef CACHE_ARRAY_METADATA
1123       std::fill(isCachedArrayMetadata_, 
1124                 isCachedArrayMetadata_ + numArrayElements_, false);
1125
1126       for (int i = 0; i < numArrayElements_; i++) {
1127         clientObjs_[i] = clientProxy_[i].ckLocal();
1128         if (clientObjs_[i] != NULL) {
1129           clientObjs_[i]->setDetector(
1130            MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1131         }
1132       }
1133 #else
1134       // set completion detector in local elements of the client
1135       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1136       ClientInitializer<dtype> clientIterator(
1137           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1138           clientProxy_.ckLocalBranch());
1139       clientLocMgr->iterate(clientIterator);
1140 #endif    
1141     }
1142     else {
1143       numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1144     }
1145   }
1146
1147   void commonInit() {
1148 #ifdef CACHE_ARRAY_METADATA
1149     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1150     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1151     destinationPes_ = new int[numArrayElements_];
1152     isCachedArrayMetadata_ = new bool[numArrayElements_];
1153     std::fill(isCachedArrayMetadata_, 
1154               isCachedArrayMetadata_ + numArrayElements_, false);
1155 #endif    
1156   }
1157
1158 public:
1159
1160   struct DataItemHandle {
1161     itype arrayIndex; 
1162     const dtype *dataItem;
1163   };
1164
1165   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1166                     int *dimensionSizes, 
1167                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1168                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1169     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1170                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
1171                   0, yieldFlag, progressPeriodInMs) 
1172   {
1173     clientProxy_ = clientProxy; 
1174     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1175     commonInit();
1176   }
1177
1178   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
1179                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1180                     int bufferSize, bool yieldFlag = 0, 
1181                     double progressPeriodInMs = -1.0)
1182     :MeshStreamer<ArrayDataItem<dtype,itype> >(
1183                   0, numDimensions, dimensionSizes, 
1184                   bufferSize, yieldFlag, progressPeriodInMs) 
1185   {
1186     clientProxy_ = clientProxy; 
1187     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1188     commonInit();
1189
1190   }
1191
1192   ~ArrayMeshStreamer() {
1193 #ifdef CACHE_ARRAY_METADATA
1194     delete [] clientObjs_;
1195     delete [] destinationPes_;
1196     delete [] isCachedArrayMetadata_; 
1197 #endif
1198   }
1199
1200   void receiveAtDestination(
1201        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1202
1203     for (int i = 0; i < msg->numDataItems; i++) {
1204       const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
1205       localDeliver(packedData);
1206     }
1207     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1208       markMessageReceived(msg->dimension, msg->finalMsgCount);
1209     }
1210
1211     delete msg;
1212   }
1213
1214   void broadcast(const dtype& dataItem) {
1215     const static bool copyIndirectly = true;
1216
1217     // no data items should be submitted after all local contributors call done
1218     // and staged completion has begun
1219     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1220                ::stagedCompletionStarted()) == false);
1221
1222     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1223          ::useStagedCompletion_ == false) {
1224       MeshStreamer<ArrayDataItem<dtype, itype> >
1225         ::detectorLocalObj_->produce(CkNumPes());
1226     }
1227
1228     // deliver locally
1229     ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, dataItem);
1230     localBroadcast(packedDataItem);
1231
1232     DataItemHandle tempHandle; 
1233     tempHandle.dataItem = &dataItem;
1234     tempHandle.arrayIndex = TRAM_BROADCAST;
1235
1236     int numDimensions = 
1237       MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
1238     MeshStreamer<ArrayDataItem<dtype, itype> >::
1239       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
1240   }
1241
1242   void insertData(const dtype& dataItem, itype arrayIndex) {
1243
1244     // no data items should be submitted after all local contributors call done
1245     // and staged completion has begun
1246     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1247                ::stagedCompletionStarted()) == false);
1248
1249     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1250          ::useStagedCompletion_ == false) {
1251       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1252     }
1253     int destinationPe; 
1254 #ifdef CACHE_ARRAY_METADATA
1255   if (isCachedArrayMetadata_[arrayIndex]) {    
1256     destinationPe =  destinationPes_[arrayIndex];
1257   }
1258   else {
1259     destinationPe = 
1260       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1261     isCachedArrayMetadata_[arrayIndex] = true;
1262     destinationPes_[arrayIndex] = destinationPe;
1263   }
1264 #else 
1265   destinationPe = 
1266     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1267 #endif
1268
1269     if (destinationPe == CkMyPe()) {
1270       ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, dataItem);
1271       localDeliver(packedDataItem);
1272       return;
1273     }
1274
1275     // this implementation avoids copying an item before transfer into message
1276     DataItemHandle tempHandle; 
1277     tempHandle.arrayIndex = arrayIndex; 
1278     tempHandle.dataItem = &dataItem;
1279
1280     MeshStreamer<ArrayDataItem<dtype, itype> >::
1281      insertData(&tempHandle, destinationPe);
1282
1283   }
1284
1285   int copyDataItemIntoMessage(
1286       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1287       const void *dataItemHandle, bool copyIndirectly) {
1288
1289     if (copyIndirectly == true) {
1290       // newly inserted items are passed through a handle to avoid copying
1291       int numDataItems = destinationBuffer->numDataItems;
1292       const DataItemHandle *tempHandle = 
1293         (const DataItemHandle *) dataItemHandle;
1294       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1295         *(tempHandle->dataItem);
1296       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1297         tempHandle->arrayIndex;
1298       return ++destinationBuffer->numDataItems;
1299     }
1300     else {
1301       // this is an item received along the route to destination
1302       // we can copy it from the received message
1303       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1304               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1305     }
1306   }
1307
1308 };
1309
1310 template <class dtype>
1311 class GroupChunkMeshStreamer : 
1312   public GroupMeshStreamer<ChunkDataItem> {
1313
1314 private:
1315   ChunkDataItem **receiveBuffers;
1316   int *receivedChunks;
1317
1318 public:
1319
1320   GroupChunkMeshStreamer(
1321        int maxNumDataItemsBuffered, int numDimensions,
1322        int *dimensionSizes, 
1323        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1324        bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1325     :GroupMeshStreamer<ChunkDataItem>(maxNumDataItemsBuffered, 
1326                                       numDimensions, dimensionSizes,
1327                                       0, yieldFlag, progressPeriodInMs) {
1328     
1329     receiveBuffers = new ChunkDataItem*[CkNumPes()];
1330     receivedChunks = new int[CkNumPes()]; 
1331     memset(receivedChunks, 0, CkNumPes() * sizeof(int));
1332     memset(receiveBuffers, 0, CkNumPes() * sizeof(ChunkDataItem*)); 
1333   }
1334
1335   GroupChunkMeshStreamer(
1336        int numDimensions, int *dimensionSizes, 
1337        const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
1338        int bufferSize, bool yieldFlag = 0, 
1339        double progressPeriodInMs = -1.0)
1340     :GroupMeshStreamer<ChunkDataItem>(0, numDimensions, dimensionSizes, 
1341                                       bufferSize, yieldFlag, 
1342                                       progressPeriodInMs) {}
1343
1344   void insertData(dtype *dataArray, int numElements, int destinationPe) {
1345
1346     char *inputData = (char *) dataArray; 
1347     int arraySizeInBytes = numElements * sizeof(dtype); 
1348     ChunkDataItem chunk;
1349     int chunkNumber = 0; 
1350     chunk.sourcePe = CkMyPe();
1351     chunk.chunkNumber = 0; 
1352     chunk.chunkSize = CHUNK_SIZE;
1353     chunk.numChunks = numElements * sizeof(dtype) / CHUNK_SIZE; 
1354     chunk.numItems = numElements; 
1355     for (int offset = 0; offset < arraySizeInBytes; offset += CHUNK_SIZE) {
1356
1357       if (offset + CHUNK_SIZE > arraySizeInBytes) {
1358         chunk.chunkSize = arraySizeInBytes - offset; 
1359         memset(chunk.rawData, 0, CHUNK_SIZE);
1360         memcpy(chunk.rawData + offset, inputData + offset, chunk.chunkSize); 
1361       }
1362       else {
1363         memcpy(chunk.rawData, inputData + offset, CHUNK_SIZE);
1364       }
1365
1366     }
1367
1368     insertData(chunk, destinationPe); 
1369
1370   }
1371
1372   void receiveAtDestination(
1373        MeshStreamerMessage<ChunkDataItem> *msg) {
1374
1375     for (int i = 0; i < msg->numDataItems; i++) {
1376       const ChunkDataItem& chunk = msg->getDataItem(i);
1377       
1378       if (receiveBuffers[chunk.sourcePe] == NULL) {
1379         receiveBuffers[chunk.sourcePe] = new dtype[chunk.numItems]; 
1380       }      
1381
1382       char *receiveBuffer = &receiveBuffers[chunk.sourcePe];
1383
1384       memcpy(receiveBuffer + chunk.chunkNumber * sizeof(dtype), 
1385              chunk.rawData, chunk.chunkSize);
1386       if (++receivedChunks[chunk.sourcePe] == chunk.numChunks) {
1387
1388         clientObj_->receiveArray((dtype *) receiveBuffer, chunk.numItems);
1389         receivedChunks[chunk.sourcePe] = 0;        
1390         delete [] receiveBuffers[chunk.sourcePe]; 
1391         receiveBuffers[chunk.sourcePe] = NULL;
1392       }
1393       
1394     }
1395
1396     if (MeshStreamer<dtype>::useStagedCompletion_) {
1397 #ifdef STREAMER_VERBOSE_OUTPUT
1398       envelope *env = UsrToEnv(msg);
1399       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
1400                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
1401                msg->finalMsgCount);  
1402 #endif
1403       markMessageReceived(msg->dimension, msg->finalMsgCount); 
1404     }
1405     else {
1406       this->detectorLocalObj_->consume(msg->numDataItems);    
1407     }
1408
1409     delete msg;
1410     
1411   }
1412
1413 };
1414
1415
1416
1417 #define CK_TEMPLATES_ONLY
1418 #include "NDMeshStreamer.def.h"
1419 #undef CK_TEMPLATES_ONLY
1420
1421 #endif