37ca015bcc21814ce8fe094d8e01b60f19d332e5
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define CACHE_LOCATIONS
17 // #define SUPPORT_INCOMPLETE_MESH
18 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
19 // #define STREAMER_VERBOSE_OUTPUT
20
21 struct MeshLocation {
22   int dimension; 
23   int bufferIndex; 
24 }; 
25
26 template<class dtype>
27 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
28
29 public:
30
31   int finalMsgCount; 
32   int dimension; 
33   int numDataItems;
34   int *destinationPes;
35   dtype *dataItems;
36
37   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
38     finalMsgCount = -1; 
39   }
40
41   int addDataItem(const dtype &dataItem) {
42     dataItems[numDataItems] = dataItem;
43     return ++numDataItems; 
44   }
45
46   void markDestination(const int index, const int destinationPe) {
47     destinationPes[index] = destinationPe;
48   }
49
50   dtype &getDataItem(const int index) {
51     return dataItems[index];
52   }
53
54 };
55
56 template <class dtype>
57 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
58 private:
59   CompletionDetector *detectorLocalObj_;
60 public:
61   MeshStreamerArrayClient(){}
62   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
63   // would like to make it pure virtual but charm will try to
64   // instantiate the abstract class, leading to errors
65   virtual void process(dtype &data) {
66     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
67             "This virtual function should have been defined by the user.\n");
68   };     
69   void setDetector(CompletionDetector *detectorLocalObj) {
70     detectorLocalObj_ = detectorLocalObj;
71   }
72   void receiveRedeliveredItem(dtype data) {
73 #ifdef STREAMER_VERBOSE_OUTPUT
74     CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
75 #endif
76     detectorLocalObj_->consume();
77     process(data);
78   }
79
80   void pup(PUP::er &p) {
81     CBase_MeshStreamerArrayClient<dtype>::pup(p);
82    }  
83
84 };
85
86 template <class dtype>
87 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
88
89 public:
90   virtual void process(dtype &data) = 0;
91
92 };
93
94 template <class dtype>
95 class MeshStreamer : public CBase_MeshStreamer<dtype> {
96
97 private:
98   int bufferSize_; 
99   int maxNumDataItemsBuffered_;
100   int numDataItemsBuffered_;
101
102   int numMembers_; 
103   int numDimensions_;
104   int *individualDimensionSizes_;
105   int *combinedDimensionSizes_;
106
107   int *startingIndexAtDimension_; 
108
109   int myIndex_;
110   int *myLocationIndex_;
111
112   CkCallback   userCallback_;
113   bool yieldFlag_;
114
115   double progressPeriodInMs_; 
116   bool isPeriodicFlushEnabled_; 
117   bool hasSentRecently_;
118   MeshStreamerMessage<dtype> ***dataBuffers_;
119
120   CProxy_CompletionDetector detector_;
121   int prio_;
122   int yieldCount_;
123
124 #ifdef CACHE_LOCATIONS
125   MeshLocation *cachedLocations_;
126   bool *isCached_; 
127 #endif
128
129
130   // only used for staged completion
131   int **cntMsgSent_;
132   int *cntMsgReceived_;
133   int *cntMsgExpected_;
134   int *cntFinished_; 
135   int dimensionToFlush_;
136   int numLocalDone_; 
137   int numLocalContributors_;
138
139   void storeMessage(int destinationPe, 
140                     const MeshLocation &destinationCoordinates, 
141                     void *dataItem, bool copyIndirectly = false);
142   virtual void localDeliver(dtype &dataItem) = 0; 
143
144   virtual int numElementsInClient() = 0;
145   virtual int numLocalElementsInClient() = 0; 
146
147   virtual void initLocalClients() = 0;
148
149   void sendLargestBuffer();
150   void flushToIntermediateDestinations();
151   void flushDimension(int dimension, bool sendMsgCounts = false); 
152
153 protected:
154
155   bool useStagedCompletion_;
156   CompletionDetector *detectorLocalObj_;
157   virtual int copyDataItemIntoMessage(
158               MeshStreamerMessage<dtype> *destinationBuffer, 
159               void *dataItemHandle, bool copyIndirectly = false);
160   MeshLocation determineLocation(int destinationPe, 
161                                  int dimensionReceivedAlong);
162 public:
163
164   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
165                int *dimensionSizes, int bufferSize,
166                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
167   ~MeshStreamer();
168
169   // entry
170   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
171   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
172   void flushIfIdle();
173   void finish();
174
175   // non entry
176   bool isPeriodicFlushEnabled() {
177     return isPeriodicFlushEnabled_;
178   }
179   virtual void insertData(dtype &dataItem, int destinationPe); 
180   void insertData(void *dataItemHandle, int destinationPe);
181   void associateCallback(int numContributors, 
182                          CkCallback startCb, CkCallback endCb, 
183                          CProxy_CompletionDetector detector,
184                          int prio, bool usePeriodicFlushing);
185   void registerPeriodicProgressFunction();
186
187   // flushing begins only after enablePeriodicFlushing has been invoked
188
189   void enablePeriodicFlushing(){
190     isPeriodicFlushEnabled_ = true; 
191     registerPeriodicProgressFunction();
192   }
193
194   void done(int numContributorsFinished = 1) {
195
196     if (useStagedCompletion_) {
197       numLocalDone_ += numContributorsFinished; 
198       if (numLocalDone_ == numLocalContributors_) {
199         startStagedCompletion();
200       }
201     }
202     else {
203       detectorLocalObj_->done(numContributorsFinished);
204     }
205   }
206
207   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
208             int prio, bool usePeriodicFlushing);
209   
210   bool stagedCompletionStarted() {    
211     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
212   }
213
214   void startStagedCompletion() {          
215     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
216       flushDimension(dimensionToFlush_, true);
217     }
218     dimensionToFlush_--;
219
220     checkForCompletedStages();
221   }
222
223   void markMessageReceived(int dimension, int finalCount) {
224     cntMsgReceived_[dimension]++;
225     if (finalCount != -1) {
226       cntFinished_[dimension]++; 
227       cntMsgExpected_[dimension] += finalCount; 
228 #ifdef STREAMER_VERBOSE_OUTPUT
229       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
230                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
231                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
232                cntMsgReceived_[dimension]); 
233 #endif
234     }  
235     if (dimensionToFlush_ != numDimensions_ - 1) {
236       checkForCompletedStages();
237     }
238   }
239
240   void checkForCompletedStages() {
241
242     while (cntFinished_[dimensionToFlush_ + 1] == 
243            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
244            cntMsgExpected_[dimensionToFlush_ + 1] == 
245            cntMsgReceived_[dimensionToFlush_ + 1]) {
246       if (dimensionToFlush_ == -1) {
247 #ifdef STREAMER_VERBOSE_OUTPUT
248         CkPrintf("[%d] contribute\n", CkMyPe()); 
249 #endif
250         CkAssert(numDataItemsBuffered_ == 0); 
251         this->contribute(userCallback_);
252         return; 
253       }
254       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
255         flushDimension(dimensionToFlush_, true); 
256       }
257       dimensionToFlush_--; 
258     }
259   }
260
261 };
262
263 template <class dtype>
264 MeshStreamer<dtype>::MeshStreamer(
265                      int maxNumDataItemsBuffered, int numDimensions, 
266                      int *dimensionSizes, 
267                      int bufferSize,
268                      bool yieldFlag, 
269                      double progressPeriodInMs)
270  :numDimensions_(numDimensions), 
271   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
272   yieldFlag_(yieldFlag), 
273   progressPeriodInMs_(progressPeriodInMs), 
274   bufferSize_(bufferSize)
275 {
276
277   int sumAlongAllDimensions = 0;   
278   individualDimensionSizes_ = new int[numDimensions_];
279   combinedDimensionSizes_ = new int[numDimensions_ + 1];
280   myLocationIndex_ = new int[numDimensions_];
281   startingIndexAtDimension_ = new int[numDimensions_ + 1]; 
282   memcpy(individualDimensionSizes_, dimensionSizes, 
283          numDimensions * sizeof(int)); 
284   combinedDimensionSizes_[0] = 1; 
285   for (int i = 0; i < numDimensions; i++) {
286     sumAlongAllDimensions += individualDimensionSizes_[i];
287     combinedDimensionSizes_[i + 1] = 
288       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
289   }
290
291   // a bufferSize input of 0 indicates it should be calculated by the library
292   if (bufferSize_ == 0) {
293     CkAssert(maxNumDataItemsBuffered_ > 0);
294     // except for personalized messages, the buffers for dimensions with the 
295     //   same index as the sender's are not used
296     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
297       / (sumAlongAllDimensions - numDimensions_ + 1); 
298   }
299   else {
300     maxNumDataItemsBuffered_ = 
301       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1);
302   }
303
304   if (bufferSize_ <= 0) {
305     bufferSize_ = 1; 
306     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
307              "is invalid. Defaulting to a single buffer per destination.\n");
308   }
309   numDataItemsBuffered_ = 0; 
310   numMembers_ = CkNumPes(); 
311
312   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
313   for (int i = 0; i < numDimensions; i++) {
314     int numMembersAlongDimension = individualDimensionSizes_[i]; 
315     dataBuffers_[i] = 
316       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
317     for (int j = 0; j < numMembersAlongDimension; j++) {
318       dataBuffers_[i][j] = NULL;
319     }
320   }
321
322   myIndex_ = CkMyPe();
323   int remainder = myIndex_;
324   startingIndexAtDimension_[numDimensions_] = 0;
325   for (int i = numDimensions_ - 1; i >= 0; i--) {    
326     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
327     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
328     remainder -= dimensionOffset; 
329     startingIndexAtDimension_[i] = startingIndexAtDimension_[i+1] + dimensionOffset; 
330   }
331
332   isPeriodicFlushEnabled_ = false; 
333   detectorLocalObj_ = NULL;
334
335 #ifdef CACHE_LOCATIONS
336   cachedLocations_ = new MeshLocation[numMembers_];
337   isCached_ = new bool[numMembers_];
338   std::fill(isCached_, isCached_ + numMembers_, false);
339 #endif
340
341   cntMsgSent_ = NULL; 
342   cntMsgReceived_ = NULL; 
343   cntMsgExpected_ = NULL; 
344   cntFinished_ = NULL; 
345
346 }
347
348 template <class dtype>
349 MeshStreamer<dtype>::~MeshStreamer() {
350
351   for (int i = 0; i < numDimensions_; i++) {
352     for (int j=0; j < individualDimensionSizes_[i]; j++) {
353       delete[] dataBuffers_[i][j]; 
354     }
355     delete[] dataBuffers_[i]; 
356   }
357
358   delete[] individualDimensionSizes_;
359   delete[] combinedDimensionSizes_; 
360   delete[] myLocationIndex_;
361   delete[] startingIndexAtDimension_;
362
363 #ifdef CACHE_LOCATIONS
364   delete[] cachedLocations_;
365   delete[] isCached_; 
366 #endif
367
368   if (cntMsgSent_ != NULL) {
369     for (int i = 0; i < numDimensions_; i++) {
370       delete[] cntMsgSent_[i]; 
371     }
372     delete[] cntMsgSent_; 
373     delete[] cntMsgReceived_; 
374     delete[] cntMsgExpected_; 
375     delete[] cntFinished_;
376   }
377
378 }
379
380 template <class dtype>
381 inline
382 MeshLocation MeshStreamer<dtype>::determineLocation(
383                                   int destinationPe, 
384                                   int dimensionReceivedAlong) {
385
386 #ifdef CACHE_LOCATIONS
387   if (isCached_[destinationPe]) {    
388     return cachedLocations_[destinationPe]; 
389   }
390 #endif
391
392   MeshLocation destinationLocation;
393   int remainder = destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
394   int dimensionIndex; 
395   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
396     dimensionIndex = remainder / combinedDimensionSizes_[i];
397     
398     if (dimensionIndex != myLocationIndex_[i]) {
399       destinationLocation.dimension = i; 
400       destinationLocation.bufferIndex = dimensionIndex; 
401 #ifdef CACHE_LOCATIONS
402       cachedLocations_[destinationPe] = destinationLocation;
403       isCached_[destinationPe] = true; 
404 #endif
405       return destinationLocation;
406     }
407
408     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
409   }
410
411   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
412           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
413   // to prevent warnings
414   return destinationLocation; 
415 }
416
417 template <class dtype>
418 inline 
419 int MeshStreamer<dtype>::copyDataItemIntoMessage(
420                          MeshStreamerMessage<dtype> *destinationBuffer,
421                          void *dataItemHandle, bool copyIndirectly) {
422   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
423 }
424
425 template <class dtype>
426 inline
427 void MeshStreamer<dtype>::storeMessage(
428                           int destinationPe, 
429                           const MeshLocation& destinationLocation,
430                           void *dataItem, bool copyIndirectly) {
431
432   int dimension = destinationLocation.dimension;
433   int bufferIndex = destinationLocation.bufferIndex; 
434   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
435
436   // allocate new message if necessary
437   if (messageBuffers[bufferIndex] == NULL) {
438     if (dimension == 0) {
439       // personalized messages do not require destination indices
440       messageBuffers[bufferIndex] = 
441         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
442     }
443     else {
444       messageBuffers[bufferIndex] = 
445         new (bufferSize_, bufferSize_, sizeof(int)) 
446         MeshStreamerMessage<dtype>(dimension);
447     }
448     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
449     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
450     CkAssert(messageBuffers[bufferIndex] != NULL);
451   }
452   
453   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
454   int numBuffered = 
455     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
456   if (dimension != 0) {
457     destinationBuffer->markDestination(numBuffered-1, destinationPe);
458   }  
459   numDataItemsBuffered_++;
460
461   // send if buffer is full
462   if (numBuffered == bufferSize_) {
463
464     int destinationIndex;
465
466     destinationIndex = myIndex_ + 
467       (bufferIndex - myLocationIndex_[dimension]) * 
468       combinedDimensionSizes_[dimension];
469
470     if (dimension == 0) {
471 #ifdef STREAMER_VERBOSE_OUTPUT
472       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
473 #endif
474       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
475     }
476     else {
477 #ifdef STREAMER_VERBOSE_OUTPUT
478       CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex); 
479 #endif
480       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
481     }
482
483     if (useStagedCompletion_) {
484       cntMsgSent_[dimension][bufferIndex]++; 
485     }
486
487     messageBuffers[bufferIndex] = NULL;
488     numDataItemsBuffered_ -= numBuffered; 
489     hasSentRecently_ = true; 
490
491   }
492   // send if total buffering capacity has been reached
493   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
494     sendLargestBuffer();
495     hasSentRecently_ = true; 
496   }
497
498 }
499
500 template <class dtype>
501 inline
502 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
503   const static bool copyIndirectly = true;
504
505   // treat newly inserted items as if they were received along
506   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
507   MeshLocation destinationLocation = determineLocation(destinationPe, 
508                                                        numDimensions_);
509   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
510                copyIndirectly); 
511   // release control to scheduler if requested by the user, 
512   //   assume caller is threaded entry
513   if (yieldFlag_ && ++yieldCount_ == 1024) {
514     yieldCount_ = 0; 
515     CthYield();
516   }
517
518 }
519
520 template <class dtype>
521 inline
522 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
523
524   // no data items should be submitted after all local contributors call done 
525   // and staged completion has begun
526   CkAssert(stagedCompletionStarted() == false);
527
528   if (!useStagedCompletion_) {
529     detectorLocalObj_->produce();
530   }
531   if (destinationPe == CkMyPe()) {
532     // copying here is necessary - user code should not be 
533     // passed back a reference to the original item
534     dtype dataItemCopy = dataItem;
535     localDeliver(dataItemCopy);
536     return;
537   }
538
539   insertData((void *) &dataItem, destinationPe);
540 }
541
542 template <class dtype>
543 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
544                                CkCallback endCb, int prio, 
545                                bool usePeriodicFlushing) {
546   useStagedCompletion_ = true; 
547   // allocate memory on first use
548   if (cntMsgSent_ == NULL) {
549     cntMsgSent_ = new int*[numDimensions_]; 
550     cntMsgReceived_ = new int[numDimensions_];
551     cntMsgExpected_ = new int[numDimensions_];
552     cntFinished_ = new int[numDimensions_]; 
553
554     for (int i = 0; i < numDimensions_; i++) {
555       cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
556     }
557   }
558
559
560   for (int i = 0; i < numDimensions_; i++) {
561     std::fill(cntMsgSent_[i], 
562               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
563     cntMsgReceived_[i] = 0;
564     cntMsgExpected_[i] = 0; 
565     cntFinished_[i] = 0; 
566   }
567   dimensionToFlush_ = numDimensions_ - 1;
568
569   yieldCount_ = 0; 
570   userCallback_ = endCb; 
571   prio_ = prio;
572
573   numLocalDone_ = 0; 
574   numLocalContributors_ = numLocalContributors; 
575   initLocalClients();
576
577   if (numLocalContributors_ == 0) {
578     startStagedCompletion();
579   }
580
581   hasSentRecently_ = false;
582   if (usePeriodicFlushing) {
583     enablePeriodicFlushing();
584   }
585
586   this->contribute(startCb);
587 }
588
589 template <class dtype>
590 void MeshStreamer<dtype>::associateCallback(
591                           int numContributors,
592                           CkCallback startCb, CkCallback endCb, 
593                           CProxy_CompletionDetector detector, 
594                           int prio, bool usePeriodicFlushing) {
595
596   useStagedCompletion_ = false; 
597   yieldCount_ = 0; 
598   prio_ = prio;
599   userCallback_ = endCb; 
600   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushIfIdle(), 
601                      this->thisProxy);
602   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
603                     this->thisProxy);
604   detector_ = detector;      
605   detectorLocalObj_ = detector_.ckLocalBranch();
606   initLocalClients();
607
608   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
609                                      finish , 0);
610   
611   if (progressPeriodInMs_ <= 0) {
612     CkPrintf("Using completion detection in NDMeshStreamer requires"
613              " setting a valid periodic flush period. Defaulting"
614              " to 10 ms\n");
615     progressPeriodInMs_ = 10;
616   }
617   
618   hasSentRecently_ = false; 
619   if (usePeriodicFlushing) {
620     enablePeriodicFlushing();
621   }
622
623 }
624
625 template <class dtype>
626 void MeshStreamer<dtype>::finish() {
627   isPeriodicFlushEnabled_ = false; 
628
629   if (!userCallback_.isInvalid()) {
630     this->contribute(userCallback_);
631     userCallback_ = CkCallback();      // nullify the current callback
632   }
633
634 }
635
636 template <class dtype>
637 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
638
639   int destinationPe, lastDestinationPe; 
640   MeshLocation destinationLocation;
641
642   lastDestinationPe = -1;
643   for (int i = 0; i < msg->numDataItems; i++) {
644     destinationPe = msg->destinationPes[i];
645     dtype &dataItem = msg->getDataItem(i);
646     if (destinationPe == CkMyPe()) {
647       localDeliver(dataItem);
648     }
649     else {
650       if (destinationPe != lastDestinationPe) {
651         // do this once per sequence of items with the same destination
652         destinationLocation = determineLocation(destinationPe, msg->dimension);
653       }
654       storeMessage(destinationPe, destinationLocation, &dataItem);   
655     }
656     lastDestinationPe = destinationPe; 
657   }
658
659   if (useStagedCompletion_) {
660     markMessageReceived(msg->dimension, msg->finalMsgCount); 
661   }
662
663   delete msg;
664
665 }
666
667 template <class dtype>
668 void MeshStreamer<dtype>::sendLargestBuffer() {
669
670   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
671   MeshStreamerMessage<dtype> ** messageBuffers; 
672   MeshStreamerMessage<dtype> *destinationBuffer; 
673
674   for (int i = 0; i < numDimensions_; i++) {
675
676     messageBuffers = dataBuffers_[i]; 
677     numBuffers = individualDimensionSizes_[i]; 
678
679     flushDimension = i; 
680     maxSize = 0;    
681     for (int j = 0; j < numBuffers; j++) {
682       if (messageBuffers[j] != NULL && 
683           messageBuffers[j]->numDataItems > maxSize) {
684         maxSize = messageBuffers[j]->numDataItems;
685         flushIndex = j; 
686       }
687     }
688
689     if (maxSize > 0) {
690
691       messageBuffers = dataBuffers_[flushDimension]; 
692       destinationBuffer = messageBuffers[flushIndex];
693       destinationIndex = myIndex_ + 
694         (flushIndex - myLocationIndex_[flushDimension]) * 
695         combinedDimensionSizes_[flushDimension] ;
696
697       // not sending the full buffer, shrink the message size
698       envelope *env = UsrToEnv(destinationBuffer);
699       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
700                         (bufferSize_ - destinationBuffer->numDataItems));
701       *((int *) env->getPrioPtr()) = prio_;
702
703       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
704
705       if (flushDimension == 0) {
706 #ifdef STREAMER_VERBOSE_OUTPUT
707         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
708 #endif
709         this->thisProxy[destinationIndex].
710           receiveAtDestination(destinationBuffer);
711       }
712       else {
713 #ifdef STREAMER_VERBOSE_OUTPUT
714         CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex); 
715 #endif
716         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
717       }
718
719       if (useStagedCompletion_) {
720         cntMsgSent_[i][flushIndex]++; 
721       }
722
723       messageBuffers[flushIndex] = NULL;
724
725     }
726
727   }
728 }
729
730 template <class dtype>
731 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
732   
733   for (int i = 0; i < numDimensions_; i++) {
734     flushDimension(i); 
735   }
736 }
737
738 template <class dtype>
739 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
740 #ifdef STREAMER_VERBOSE_OUTPUT
741   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts); 
742 #endif
743   MeshStreamerMessage<dtype> **messageBuffers; 
744   MeshStreamerMessage<dtype> *destinationBuffer; 
745   int destinationIndex, numBuffers; 
746
747   messageBuffers = dataBuffers_[dimension]; 
748   numBuffers = individualDimensionSizes_[dimension]; 
749
750   for (int j = 0; j < numBuffers; j++) {
751
752     if(messageBuffers[j] == NULL) {      
753       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
754         messageBuffers[j] = 
755           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
756         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
757         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
758       }
759       else {
760         continue; 
761       } 
762     }
763
764     destinationBuffer = messageBuffers[j];
765     destinationIndex = myIndex_ + 
766       (j - myLocationIndex_[dimension]) * 
767       combinedDimensionSizes_[dimension] ;
768
769     if (destinationBuffer->numDataItems != 0) {
770       // not sending the full buffer, shrink the message size
771       envelope *env = UsrToEnv(destinationBuffer);
772       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
773                         (bufferSize_ - destinationBuffer->numDataItems));
774       *((int *) env->getPrioPtr()) = prio_;
775     }
776     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
777
778     if (useStagedCompletion_) {
779       cntMsgSent_[dimension][j]++;
780       if (sendMsgCounts) {
781         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
782       }
783     }
784
785     if (dimension == 0) {
786 #ifdef STREAMER_VERBOSE_OUTPUT
787       CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex); 
788 #endif
789       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
790     }
791     else {
792 #ifdef STREAMER_VERBOSE_OUTPUT
793       CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex); 
794 #endif
795       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
796     }
797     messageBuffers[j] = NULL;
798   }
799   
800 }
801
802
803 template <class dtype>
804 void MeshStreamer<dtype>::flushIfIdle(){
805   // flush if (1) this is not a periodic call or 
806   //          (2) this is a periodic call and no sending took place
807   //              since the last time the function was invoked
808   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
809
810     if (numDataItemsBuffered_ != 0) {
811       flushToIntermediateDestinations();
812     }    
813     CkAssert(numDataItemsBuffered_ == 0); 
814     
815   }
816
817   hasSentRecently_ = false; 
818
819 }
820
821 template <class dtype>
822 void periodicProgressFunction(void *MeshStreamerObj, double time) {
823
824   MeshStreamer<dtype> *properObj = 
825     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
826
827   if (properObj->isPeriodicFlushEnabled()) {
828     properObj->flushIfIdle();
829     properObj->registerPeriodicProgressFunction();
830   }
831 }
832
833 template <class dtype>
834 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
835   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
836                  progressPeriodInMs_); 
837 }
838
839
840 template <class dtype>
841 class GroupMeshStreamer : public MeshStreamer<dtype> {
842 private:
843
844   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
845   MeshStreamerGroupClient<dtype> *clientObj_;
846
847   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
848     for (int i = 0; i < msg->numDataItems; i++) {
849       dtype &data = msg->getDataItem(i);
850       clientObj_->process(data);
851     }
852
853     if (MeshStreamer<dtype>::useStagedCompletion_) {
854 #ifdef STREAMER_VERBOSE_OUTPUT
855       envelope *env = UsrToEnv(msg);
856       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
857 #endif
858       markMessageReceived(msg->dimension, msg->finalMsgCount); 
859     }
860     else {
861       this->detectorLocalObj_->consume(msg->numDataItems);    
862     }
863
864     delete msg;
865   }
866
867   void localDeliver(dtype &dataItem) {
868     clientObj_->process(dataItem);
869     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
870       MeshStreamer<dtype>::detectorLocalObj_->consume();
871     }
872   }
873
874   int numElementsInClient() {
875     // client is a group - there is one element per PE
876     return CkNumPes();
877   }
878
879   int numLocalElementsInClient() {
880     return 1; 
881   }
882
883   void initLocalClients() {
884     // no action required
885   }
886
887 public:
888
889   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
890                     int *dimensionSizes, 
891                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
892                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
893    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
894                         0, yieldFlag, progressPeriodInMs) 
895   {
896     clientProxy_ = clientProxy; 
897     clientObj_ = 
898       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
899   }
900
901   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
902                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
903                     int bufferSize, bool yieldFlag = 0, 
904                     double progressPeriodInMs = -1.0)
905    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
906                         yieldFlag, progressPeriodInMs) 
907   {
908     clientProxy_ = clientProxy; 
909     clientObj_ = 
910       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
911   }
912
913 };
914
915 template <class dtype>
916 class MeshStreamerClientIterator : public CkLocIterator {
917
918 public:
919   
920   CompletionDetector *detectorLocalObj_;
921   CkArray *clientArrMgr_;
922   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
923                              CkArray *clientArrMgr) 
924     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
925
926   // CkLocMgr::iterate will call addLocation on all elements local to this PE
927   void addLocation(CkLocation &loc) {
928
929     MeshStreamerArrayClient<dtype> *clientObj = 
930       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
931
932     CkAssert(clientObj != NULL); 
933     clientObj->setDetector(detectorLocalObj_); 
934   }
935
936 };
937
938 template <class dtype, class itype>
939 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
940   
941 private:
942   
943   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
944   CkArray *clientArrayMgr_;
945   int numArrayElements_;
946   int numLocalArrayElements_;
947 #ifdef CACHE_ARRAY_METADATA
948   MeshStreamerArrayClient<dtype> **clientObjs_;
949   int *destinationPes_;
950   bool *isCachedArrayMetadata_;
951 #endif
952
953   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
954     itype arrayId = packedDataItem.arrayIndex; 
955
956     MeshStreamerArrayClient<dtype> *clientObj;
957 #ifdef CACHE_ARRAY_METADATA
958     clientObj = clientObjs_[arrayId];
959 #else
960     clientObj = clientProxy_[arrayId].ckLocal();
961 #endif
962
963     if (clientObj != NULL) {
964       clientObj->process(packedDataItem.dataItem);
965       if (MeshStreamer<ArrayDataItem<dtype, itype> >
966            ::useStagedCompletion_ == false) {
967         MeshStreamer<ArrayDataItem<dtype, itype> >
968          ::detectorLocalObj_->consume();
969       }
970     }
971     else { 
972       // array element is no longer present locally - redeliver using proxy
973       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
974     }
975   }
976
977   int numElementsInClient() {
978     return numArrayElements_;
979   }
980
981   int numLocalElementsInClient() {
982     return numLocalArrayElements_;
983   }
984
985   void initLocalClients() {
986     if (MeshStreamer<ArrayDataItem<dtype, itype> >
987          ::useStagedCompletion_ == false) {
988 #ifdef CACHE_ARRAY_METADATA
989       std::fill(isCachedArrayMetadata_, 
990                 isCachedArrayMetadata_ + numArrayElements_, false);
991
992       for (int i = 0; i < numArrayElements_; i++) {
993         clientObjs_[i] = clientProxy_[i].ckLocal();
994         if (clientObjs_[i] != NULL) {
995           clientObjs_[i]->setDetector(
996            MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
997         }
998       }
999 #else
1000       // set completion detector in local elements of the client
1001       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1002       MeshStreamerClientIterator<dtype> clientIterator(
1003           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1004           clientProxy_.ckLocalBranch());
1005       clientLocMgr->iterate(clientIterator);
1006 #endif    
1007     }
1008     else {
1009       numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1010     }
1011   }
1012
1013   void commonInit() {
1014 #ifdef CACHE_ARRAY_METADATA
1015     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1016     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1017     destinationPes_ = new int[numArrayElements_];
1018     isCachedArrayMetadata_ = new bool[numArrayElements_];
1019     std::fill(isCachedArrayMetadata_, 
1020               isCachedArrayMetadata_ + numArrayElements_, false);
1021 #endif    
1022   }
1023
1024 public:
1025
1026   struct DataItemHandle {
1027     itype arrayIndex; 
1028     dtype *dataItem;
1029   };
1030
1031   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1032                     int *dimensionSizes, 
1033                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
1034                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1035     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1036                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
1037                   0, yieldFlag, progressPeriodInMs) 
1038   {
1039     clientProxy_ = clientProxy; 
1040     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1041     commonInit();
1042   }
1043
1044   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
1045                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
1046                     int bufferSize, bool yieldFlag = 0, 
1047                     double progressPeriodInMs = -1.0)
1048     :MeshStreamer<ArrayDataItem<dtype,itype> >(
1049                   0, numDimensions, dimensionSizes, 
1050                   bufferSize, yieldFlag, progressPeriodInMs) 
1051   {
1052     clientProxy_ = clientProxy; 
1053     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1054     commonInit();
1055
1056   }
1057
1058   ~ArrayMeshStreamer() {
1059 #ifdef CACHE_ARRAY_METADATA
1060     delete [] clientObjs_;
1061     delete [] destinationPes_;
1062     delete [] isCachedArrayMetadata_; 
1063 #endif
1064   }
1065
1066   void init(CkArrayID senderArrayID, CkCallback startCb, 
1067             CkCallback endCb, int prio, bool usePeriodicFlushing) {
1068     CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
1069     int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
1070     MeshStreamer<ArrayDataItem<dtype, itype> >::init(
1071                  numLocalElements, startCb, endCb, prio,
1072                  usePeriodicFlushing); 
1073   }
1074
1075   void receiveAtDestination(
1076        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1077
1078     for (int i = 0; i < msg->numDataItems; i++) {
1079       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
1080       localDeliver(packedData);
1081     }
1082     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1083       markMessageReceived(msg->dimension, msg->finalMsgCount);
1084     }
1085
1086     delete msg;
1087   }
1088
1089   void insertData(dtype &dataItem, itype arrayIndex) {
1090
1091     // no data items should be submitted after all local contributors call done
1092     // and staged completion has begun
1093     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1094                ::stagedCompletionStarted()) == false);
1095
1096     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1097          ::useStagedCompletion_ == false) {
1098       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1099     }
1100     int destinationPe; 
1101 #ifdef CACHE_ARRAY_METADATA
1102   if (isCachedArrayMetadata_[arrayIndex]) {    
1103     destinationPe =  destinationPes_[arrayIndex];
1104   }
1105   else {
1106     destinationPe = 
1107       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1108     isCachedArrayMetadata_[arrayIndex] = true;
1109     destinationPes_[arrayIndex] = destinationPe;
1110   }
1111 #else 
1112   destinationPe = 
1113     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1114 #endif
1115
1116   ArrayDataItem<dtype, itype> packedDataItem;
1117     if (destinationPe == CkMyPe()) {
1118       // copying here is necessary - user code should not be 
1119       // passed back a reference to the original item
1120       packedDataItem.arrayIndex = arrayIndex; 
1121       packedDataItem.dataItem = dataItem;
1122       localDeliver(packedDataItem);
1123       return;
1124     }
1125
1126     // this implementation avoids copying an item before transfer into message
1127
1128     DataItemHandle tempHandle; 
1129     tempHandle.arrayIndex = arrayIndex; 
1130     tempHandle.dataItem = &dataItem;
1131
1132     MeshStreamer<ArrayDataItem<dtype, itype> >::
1133      insertData(&tempHandle, destinationPe);
1134
1135   }
1136
1137   int copyDataItemIntoMessage(
1138       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1139       void *dataItemHandle, bool copyIndirectly) {
1140
1141     if (copyIndirectly == true) {
1142       // newly inserted items are passed through a handle to avoid copying
1143       int numDataItems = destinationBuffer->numDataItems;
1144       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
1145       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1146         *(tempHandle->dataItem);
1147       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1148         tempHandle->arrayIndex;
1149       return ++destinationBuffer->numDataItems;
1150     }
1151     else {
1152       // this is an item received along the route to destination
1153       // we can copy it from the received message
1154       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1155               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1156     }
1157   }
1158
1159 };
1160
1161 #define CK_TEMPLATES_ONLY
1162 #include "NDMeshStreamer.def.h"
1163 #undef CK_TEMPLATES_ONLY
1164
1165 #endif