NDMeshStreamer: ignore invalid callbacks
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define CACHE_LOCATIONS
17 // #define SUPPORT_INCOMPLETE_MESH
18 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
19 // #define STREAMER_VERBOSE_OUTPUT
20
21 struct MeshLocation {
22   int dimension; 
23   int bufferIndex; 
24 }; 
25
26 template<class dtype>
27 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
28
29 public:
30
31   int finalMsgCount; 
32   int dimension; 
33   int numDataItems;
34   int *destinationPes;
35   dtype *dataItems;
36
37   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
38     finalMsgCount = -1; 
39   }
40
41   int addDataItem(const dtype &dataItem) {
42     dataItems[numDataItems] = dataItem;
43     return ++numDataItems; 
44   }
45
46   void markDestination(const int index, const int destinationPe) {
47     destinationPes[index] = destinationPe;
48   }
49
50   dtype &getDataItem(const int index) {
51     return dataItems[index];
52   }
53
54 };
55
56 template <class dtype>
57 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
58 private:
59   CompletionDetector *detectorLocalObj_;
60 public:
61   MeshStreamerArrayClient(){
62     detectorLocalObj_ = NULL; 
63   }
64   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
65   // would like to make it pure virtual but charm will try to
66   // instantiate the abstract class, leading to errors
67   virtual void process(dtype &data) {
68     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
69             "This virtual function should have been defined by the user.\n");
70   };     
71   void setDetector(CompletionDetector *detectorLocalObj) {
72     detectorLocalObj_ = detectorLocalObj;
73   }
74   void receiveRedeliveredItem(dtype data) {
75 #ifdef STREAMER_VERBOSE_OUTPUT
76     CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
77 #endif
78     if (detectorLocalObj_ != NULL) {
79       detectorLocalObj_->consume();
80     }
81     process(data);
82   }
83
84   void pup(PUP::er &p) {
85     CBase_MeshStreamerArrayClient<dtype>::pup(p);
86    }  
87
88 };
89
90 template <class dtype>
91 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
92
93 public:
94   virtual void process(dtype &data) = 0;
95
96 };
97
98 template <class dtype>
99 class MeshStreamer : public CBase_MeshStreamer<dtype> {
100
101 private:
102   int bufferSize_; 
103   int maxNumDataItemsBuffered_;
104   int numDataItemsBuffered_;
105
106   int numMembers_; 
107   int numDimensions_;
108   int *individualDimensionSizes_;
109   int *combinedDimensionSizes_;
110
111   int *startingIndexAtDimension_; 
112
113   int myIndex_;
114   int *myLocationIndex_;
115
116   CkCallback   userCallback_;
117   bool yieldFlag_;
118
119   double progressPeriodInMs_; 
120   bool isPeriodicFlushEnabled_; 
121   bool hasSentRecently_;
122   MeshStreamerMessage<dtype> ***dataBuffers_;
123
124   CProxy_CompletionDetector detector_;
125   int prio_;
126   int yieldCount_;
127
128 #ifdef CACHE_LOCATIONS
129   MeshLocation *cachedLocations_;
130   bool *isCached_; 
131 #endif
132
133
134   // only used for staged completion
135   int **cntMsgSent_;
136   int *cntMsgReceived_;
137   int *cntMsgExpected_;
138   int *cntFinished_; 
139   int dimensionToFlush_;
140   int numLocalDone_; 
141   int numLocalContributors_;
142
143   void storeMessage(int destinationPe, 
144                     const MeshLocation &destinationCoordinates, 
145                     void *dataItem, bool copyIndirectly = false);
146   virtual void localDeliver(dtype &dataItem) = 0; 
147
148   virtual int numElementsInClient() = 0;
149   virtual int numLocalElementsInClient() = 0; 
150
151   virtual void initLocalClients() = 0;
152
153   void sendLargestBuffer();
154   void flushToIntermediateDestinations();
155   void flushDimension(int dimension, bool sendMsgCounts = false); 
156
157 protected:
158
159   bool useStagedCompletion_;
160   CompletionDetector *detectorLocalObj_;
161   virtual int copyDataItemIntoMessage(
162               MeshStreamerMessage<dtype> *destinationBuffer, 
163               void *dataItemHandle, bool copyIndirectly = false);
164   MeshLocation determineLocation(int destinationPe, 
165                                  int dimensionReceivedAlong);
166 public:
167
168   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
169                int *dimensionSizes, int bufferSize,
170                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
171   ~MeshStreamer();
172
173   // entry
174   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
175   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
176   void flushIfIdle();
177   void finish();
178
179   // non entry
180   bool isPeriodicFlushEnabled() {
181     return isPeriodicFlushEnabled_;
182   }
183   virtual void insertData(dtype &dataItem, int destinationPe); 
184   void insertData(void *dataItemHandle, int destinationPe);
185   void associateCallback(int numContributors, 
186                          CkCallback startCb, CkCallback endCb, 
187                          CProxy_CompletionDetector detector,
188                          int prio, bool usePeriodicFlushing);
189   void registerPeriodicProgressFunction();
190
191   // flushing begins only after enablePeriodicFlushing has been invoked
192
193   void enablePeriodicFlushing(){
194     isPeriodicFlushEnabled_ = true; 
195     registerPeriodicProgressFunction();
196   }
197
198   void done(int numContributorsFinished = 1) {
199
200     if (useStagedCompletion_) {
201       numLocalDone_ += numContributorsFinished; 
202       if (numLocalDone_ == numLocalContributors_) {
203         startStagedCompletion();
204       }
205     }
206     else {
207       detectorLocalObj_->done(numContributorsFinished);
208     }
209   }
210
211   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
212             int prio, bool usePeriodicFlushing);
213   
214   bool stagedCompletionStarted() {    
215     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
216   }
217
218   void startStagedCompletion() {          
219     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
220       flushDimension(dimensionToFlush_, true);
221     }
222     dimensionToFlush_--;
223
224     checkForCompletedStages();
225   }
226
227   void markMessageReceived(int dimension, int finalCount) {
228     cntMsgReceived_[dimension]++;
229     if (finalCount != -1) {
230       cntFinished_[dimension]++; 
231       cntMsgExpected_[dimension] += finalCount; 
232 #ifdef STREAMER_VERBOSE_OUTPUT
233       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
234                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
235                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
236                cntMsgReceived_[dimension]); 
237 #endif
238     }  
239     if (dimensionToFlush_ != numDimensions_ - 1) {
240       checkForCompletedStages();
241     }
242   }
243
244   void checkForCompletedStages() {
245
246     while (cntFinished_[dimensionToFlush_ + 1] == 
247            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
248            cntMsgExpected_[dimensionToFlush_ + 1] == 
249            cntMsgReceived_[dimensionToFlush_ + 1]) {
250       if (dimensionToFlush_ == -1) {
251 #ifdef STREAMER_VERBOSE_OUTPUT
252         CkPrintf("[%d] contribute\n", CkMyPe()); 
253 #endif
254         CkAssert(numDataItemsBuffered_ == 0); 
255         if (!userCallback_.isInvalid()) {
256           this->contribute(userCallback_);
257           userCallback_ = CkCallback();
258         }
259         return; 
260       }
261       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
262         flushDimension(dimensionToFlush_, true); 
263       }
264       dimensionToFlush_--; 
265     }
266   }
267
268 };
269
270 template <class dtype>
271 MeshStreamer<dtype>::MeshStreamer(
272                      int maxNumDataItemsBuffered, int numDimensions, 
273                      int *dimensionSizes, 
274                      int bufferSize,
275                      bool yieldFlag, 
276                      double progressPeriodInMs)
277  :numDimensions_(numDimensions), 
278   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
279   yieldFlag_(yieldFlag), 
280   progressPeriodInMs_(progressPeriodInMs), 
281   bufferSize_(bufferSize)
282 {
283
284   int sumAlongAllDimensions = 0;   
285   individualDimensionSizes_ = new int[numDimensions_];
286   combinedDimensionSizes_ = new int[numDimensions_ + 1];
287   myLocationIndex_ = new int[numDimensions_];
288   startingIndexAtDimension_ = new int[numDimensions_ + 1]; 
289   memcpy(individualDimensionSizes_, dimensionSizes, 
290          numDimensions * sizeof(int)); 
291   combinedDimensionSizes_[0] = 1; 
292   for (int i = 0; i < numDimensions; i++) {
293     sumAlongAllDimensions += individualDimensionSizes_[i];
294     combinedDimensionSizes_[i + 1] = 
295       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
296   }
297
298   // a bufferSize input of 0 indicates it should be calculated by the library
299   if (bufferSize_ == 0) {
300     CkAssert(maxNumDataItemsBuffered_ > 0);
301     // except for personalized messages, the buffers for dimensions with the 
302     //   same index as the sender's are not used
303     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
304       / (sumAlongAllDimensions - numDimensions_ + 1); 
305   }
306   else {
307     maxNumDataItemsBuffered_ = 
308       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1);
309   }
310
311   if (bufferSize_ <= 0) {
312     bufferSize_ = 1; 
313     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
314              "is invalid. Defaulting to a single buffer per destination.\n");
315   }
316   numDataItemsBuffered_ = 0; 
317   numMembers_ = CkNumPes(); 
318
319   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
320   for (int i = 0; i < numDimensions; i++) {
321     int numMembersAlongDimension = individualDimensionSizes_[i]; 
322     dataBuffers_[i] = 
323       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
324     for (int j = 0; j < numMembersAlongDimension; j++) {
325       dataBuffers_[i][j] = NULL;
326     }
327   }
328
329   myIndex_ = CkMyPe();
330   int remainder = myIndex_;
331   startingIndexAtDimension_[numDimensions_] = 0;
332   for (int i = numDimensions_ - 1; i >= 0; i--) {    
333     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
334     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
335     remainder -= dimensionOffset; 
336     startingIndexAtDimension_[i] = startingIndexAtDimension_[i+1] + dimensionOffset; 
337   }
338
339   isPeriodicFlushEnabled_ = false; 
340   detectorLocalObj_ = NULL;
341
342 #ifdef CACHE_LOCATIONS
343   cachedLocations_ = new MeshLocation[numMembers_];
344   isCached_ = new bool[numMembers_];
345   std::fill(isCached_, isCached_ + numMembers_, false);
346 #endif
347
348   cntMsgSent_ = NULL; 
349   cntMsgReceived_ = NULL; 
350   cntMsgExpected_ = NULL; 
351   cntFinished_ = NULL; 
352
353 }
354
355 template <class dtype>
356 MeshStreamer<dtype>::~MeshStreamer() {
357
358   for (int i = 0; i < numDimensions_; i++) {
359     for (int j=0; j < individualDimensionSizes_[i]; j++) {
360       delete[] dataBuffers_[i][j]; 
361     }
362     delete[] dataBuffers_[i]; 
363   }
364
365   delete[] individualDimensionSizes_;
366   delete[] combinedDimensionSizes_; 
367   delete[] myLocationIndex_;
368   delete[] startingIndexAtDimension_;
369
370 #ifdef CACHE_LOCATIONS
371   delete[] cachedLocations_;
372   delete[] isCached_; 
373 #endif
374
375   if (cntMsgSent_ != NULL) {
376     for (int i = 0; i < numDimensions_; i++) {
377       delete[] cntMsgSent_[i]; 
378     }
379     delete[] cntMsgSent_; 
380     delete[] cntMsgReceived_; 
381     delete[] cntMsgExpected_; 
382     delete[] cntFinished_;
383   }
384
385 }
386
387 template <class dtype>
388 inline
389 MeshLocation MeshStreamer<dtype>::determineLocation(
390                                   int destinationPe, 
391                                   int dimensionReceivedAlong) {
392
393 #ifdef CACHE_LOCATIONS
394   if (isCached_[destinationPe]) {    
395     return cachedLocations_[destinationPe]; 
396   }
397 #endif
398
399   MeshLocation destinationLocation;
400   int remainder = destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
401   int dimensionIndex; 
402   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
403     dimensionIndex = remainder / combinedDimensionSizes_[i];
404     
405     if (dimensionIndex != myLocationIndex_[i]) {
406       destinationLocation.dimension = i; 
407       destinationLocation.bufferIndex = dimensionIndex; 
408 #ifdef CACHE_LOCATIONS
409       cachedLocations_[destinationPe] = destinationLocation;
410       isCached_[destinationPe] = true; 
411 #endif
412       return destinationLocation;
413     }
414
415     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
416   }
417
418   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
419           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
420   // to prevent warnings
421   return destinationLocation; 
422 }
423
424 template <class dtype>
425 inline 
426 int MeshStreamer<dtype>::copyDataItemIntoMessage(
427                          MeshStreamerMessage<dtype> *destinationBuffer,
428                          void *dataItemHandle, bool copyIndirectly) {
429   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
430 }
431
432 template <class dtype>
433 inline
434 void MeshStreamer<dtype>::storeMessage(
435                           int destinationPe, 
436                           const MeshLocation& destinationLocation,
437                           void *dataItem, bool copyIndirectly) {
438
439   int dimension = destinationLocation.dimension;
440   int bufferIndex = destinationLocation.bufferIndex; 
441   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
442
443   // allocate new message if necessary
444   if (messageBuffers[bufferIndex] == NULL) {
445     if (dimension == 0) {
446       // personalized messages do not require destination indices
447       messageBuffers[bufferIndex] = 
448         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
449     }
450     else {
451       messageBuffers[bufferIndex] = 
452         new (bufferSize_, bufferSize_, sizeof(int)) 
453         MeshStreamerMessage<dtype>(dimension);
454     }
455     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
456     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
457     CkAssert(messageBuffers[bufferIndex] != NULL);
458   }
459   
460   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
461   int numBuffered = 
462     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
463   if (dimension != 0) {
464     destinationBuffer->markDestination(numBuffered-1, destinationPe);
465   }  
466   numDataItemsBuffered_++;
467
468   // send if buffer is full
469   if (numBuffered == bufferSize_) {
470
471     int destinationIndex;
472
473     destinationIndex = myIndex_ + 
474       (bufferIndex - myLocationIndex_[dimension]) * 
475       combinedDimensionSizes_[dimension];
476
477     if (dimension == 0) {
478 #ifdef STREAMER_VERBOSE_OUTPUT
479       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
480 #endif
481       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
482     }
483     else {
484 #ifdef STREAMER_VERBOSE_OUTPUT
485       CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex); 
486 #endif
487       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
488     }
489
490     if (useStagedCompletion_) {
491       cntMsgSent_[dimension][bufferIndex]++; 
492     }
493
494     messageBuffers[bufferIndex] = NULL;
495     numDataItemsBuffered_ -= numBuffered; 
496     hasSentRecently_ = true; 
497
498   }
499   // send if total buffering capacity has been reached
500   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
501     sendLargestBuffer();
502     hasSentRecently_ = true; 
503   }
504
505 }
506
507 template <class dtype>
508 inline
509 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
510   const static bool copyIndirectly = true;
511
512   // treat newly inserted items as if they were received along
513   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
514   MeshLocation destinationLocation = determineLocation(destinationPe, 
515                                                        numDimensions_);
516   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
517                copyIndirectly); 
518   // release control to scheduler if requested by the user, 
519   //   assume caller is threaded entry
520   if (yieldFlag_ && ++yieldCount_ == 1024) {
521     yieldCount_ = 0; 
522     CthYield();
523   }
524
525 }
526
527 template <class dtype>
528 inline
529 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
530
531   // no data items should be submitted after all local contributors call done 
532   // and staged completion has begun
533   CkAssert(stagedCompletionStarted() == false);
534
535   if (!useStagedCompletion_) {
536     detectorLocalObj_->produce();
537   }
538   if (destinationPe == CkMyPe()) {
539     // copying here is necessary - user code should not be 
540     // passed back a reference to the original item
541     dtype dataItemCopy = dataItem;
542     localDeliver(dataItemCopy);
543     return;
544   }
545
546   insertData((void *) &dataItem, destinationPe);
547 }
548
549 template <class dtype>
550 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
551                                CkCallback endCb, int prio, 
552                                bool usePeriodicFlushing) {
553   useStagedCompletion_ = true; 
554   // allocate memory on first use
555   if (cntMsgSent_ == NULL) {
556     cntMsgSent_ = new int*[numDimensions_]; 
557     cntMsgReceived_ = new int[numDimensions_];
558     cntMsgExpected_ = new int[numDimensions_];
559     cntFinished_ = new int[numDimensions_]; 
560
561     for (int i = 0; i < numDimensions_; i++) {
562       cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
563     }
564   }
565
566
567   for (int i = 0; i < numDimensions_; i++) {
568     std::fill(cntMsgSent_[i], 
569               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
570     cntMsgReceived_[i] = 0;
571     cntMsgExpected_[i] = 0; 
572     cntFinished_[i] = 0; 
573   }
574   dimensionToFlush_ = numDimensions_ - 1;
575
576   yieldCount_ = 0; 
577   userCallback_ = endCb; 
578   prio_ = prio;
579
580   numLocalDone_ = 0; 
581   numLocalContributors_ = numLocalContributors; 
582   initLocalClients();
583
584   if (numLocalContributors_ == 0) {
585     startStagedCompletion();
586   }
587
588   hasSentRecently_ = false;
589   if (usePeriodicFlushing) {
590     enablePeriodicFlushing();
591   }
592
593   this->contribute(startCb);
594 }
595
596 template <class dtype>
597 void MeshStreamer<dtype>::associateCallback(
598                           int numContributors,
599                           CkCallback startCb, CkCallback endCb, 
600                           CProxy_CompletionDetector detector, 
601                           int prio, bool usePeriodicFlushing) {
602
603   useStagedCompletion_ = false; 
604   yieldCount_ = 0; 
605   prio_ = prio;
606   userCallback_ = endCb; 
607   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushIfIdle(), 
608                      this->thisProxy);
609   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
610                     this->thisProxy);
611   detector_ = detector;      
612   detectorLocalObj_ = detector_.ckLocalBranch();
613   initLocalClients();
614
615   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
616                                      finish , 0);
617   
618   if (progressPeriodInMs_ <= 0) {
619     CkPrintf("Using completion detection in NDMeshStreamer requires"
620              " setting a valid periodic flush period. Defaulting"
621              " to 10 ms\n");
622     progressPeriodInMs_ = 10;
623   }
624   
625   hasSentRecently_ = false; 
626   if (usePeriodicFlushing) {
627     enablePeriodicFlushing();
628   }
629
630 }
631
632 template <class dtype>
633 void MeshStreamer<dtype>::finish() {
634   isPeriodicFlushEnabled_ = false; 
635
636   if (!userCallback_.isInvalid()) {
637     this->contribute(userCallback_);
638     userCallback_ = CkCallback();      // nullify the current callback
639   }
640
641 }
642
643 template <class dtype>
644 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
645
646   int destinationPe, lastDestinationPe; 
647   MeshLocation destinationLocation;
648
649   lastDestinationPe = -1;
650   for (int i = 0; i < msg->numDataItems; i++) {
651     destinationPe = msg->destinationPes[i];
652     dtype &dataItem = msg->getDataItem(i);
653     if (destinationPe == CkMyPe()) {
654       localDeliver(dataItem);
655     }
656     else {
657       if (destinationPe != lastDestinationPe) {
658         // do this once per sequence of items with the same destination
659         destinationLocation = determineLocation(destinationPe, msg->dimension);
660       }
661       storeMessage(destinationPe, destinationLocation, &dataItem);   
662     }
663     lastDestinationPe = destinationPe; 
664   }
665
666   if (useStagedCompletion_) {
667     markMessageReceived(msg->dimension, msg->finalMsgCount); 
668   }
669
670   delete msg;
671
672 }
673
674 template <class dtype>
675 void MeshStreamer<dtype>::sendLargestBuffer() {
676
677   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
678   MeshStreamerMessage<dtype> ** messageBuffers; 
679   MeshStreamerMessage<dtype> *destinationBuffer; 
680
681   for (int i = 0; i < numDimensions_; i++) {
682
683     messageBuffers = dataBuffers_[i]; 
684     numBuffers = individualDimensionSizes_[i]; 
685
686     flushDimension = i; 
687     maxSize = 0;    
688     for (int j = 0; j < numBuffers; j++) {
689       if (messageBuffers[j] != NULL && 
690           messageBuffers[j]->numDataItems > maxSize) {
691         maxSize = messageBuffers[j]->numDataItems;
692         flushIndex = j; 
693       }
694     }
695
696     if (maxSize > 0) {
697
698       messageBuffers = dataBuffers_[flushDimension]; 
699       destinationBuffer = messageBuffers[flushIndex];
700       destinationIndex = myIndex_ + 
701         (flushIndex - myLocationIndex_[flushDimension]) * 
702         combinedDimensionSizes_[flushDimension] ;
703
704       // not sending the full buffer, shrink the message size
705       envelope *env = UsrToEnv(destinationBuffer);
706       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
707                         (bufferSize_ - destinationBuffer->numDataItems));
708       *((int *) env->getPrioPtr()) = prio_;
709
710       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
711
712       if (flushDimension == 0) {
713 #ifdef STREAMER_VERBOSE_OUTPUT
714         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
715 #endif
716         this->thisProxy[destinationIndex].
717           receiveAtDestination(destinationBuffer);
718       }
719       else {
720 #ifdef STREAMER_VERBOSE_OUTPUT
721         CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex); 
722 #endif
723         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
724       }
725
726       if (useStagedCompletion_) {
727         cntMsgSent_[i][flushIndex]++; 
728       }
729
730       messageBuffers[flushIndex] = NULL;
731
732     }
733
734   }
735 }
736
737 template <class dtype>
738 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
739   
740   for (int i = 0; i < numDimensions_; i++) {
741     flushDimension(i); 
742   }
743 }
744
745 template <class dtype>
746 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
747 #ifdef STREAMER_VERBOSE_OUTPUT
748   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts); 
749 #endif
750   MeshStreamerMessage<dtype> **messageBuffers; 
751   MeshStreamerMessage<dtype> *destinationBuffer; 
752   int destinationIndex, numBuffers; 
753
754   messageBuffers = dataBuffers_[dimension]; 
755   numBuffers = individualDimensionSizes_[dimension]; 
756
757   for (int j = 0; j < numBuffers; j++) {
758
759     if(messageBuffers[j] == NULL) {      
760       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
761         messageBuffers[j] = 
762           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
763         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
764         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
765       }
766       else {
767         continue; 
768       } 
769     }
770
771     destinationBuffer = messageBuffers[j];
772     destinationIndex = myIndex_ + 
773       (j - myLocationIndex_[dimension]) * 
774       combinedDimensionSizes_[dimension] ;
775
776     if (destinationBuffer->numDataItems != 0) {
777       // not sending the full buffer, shrink the message size
778       envelope *env = UsrToEnv(destinationBuffer);
779       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
780                         (bufferSize_ - destinationBuffer->numDataItems));
781       *((int *) env->getPrioPtr()) = prio_;
782     }
783     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
784
785     if (useStagedCompletion_) {
786       cntMsgSent_[dimension][j]++;
787       if (sendMsgCounts) {
788         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
789       }
790     }
791
792     if (dimension == 0) {
793 #ifdef STREAMER_VERBOSE_OUTPUT
794       CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex); 
795 #endif
796       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
797     }
798     else {
799 #ifdef STREAMER_VERBOSE_OUTPUT
800       CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex); 
801 #endif
802       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
803     }
804     messageBuffers[j] = NULL;
805   }
806   
807 }
808
809
810 template <class dtype>
811 void MeshStreamer<dtype>::flushIfIdle(){
812   // flush if (1) this is not a periodic call or 
813   //          (2) this is a periodic call and no sending took place
814   //              since the last time the function was invoked
815   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
816
817     if (numDataItemsBuffered_ != 0) {
818       flushToIntermediateDestinations();
819     }    
820     CkAssert(numDataItemsBuffered_ == 0); 
821     
822   }
823
824   hasSentRecently_ = false; 
825
826 }
827
828 template <class dtype>
829 void periodicProgressFunction(void *MeshStreamerObj, double time) {
830
831   MeshStreamer<dtype> *properObj = 
832     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
833
834   if (properObj->isPeriodicFlushEnabled()) {
835     properObj->flushIfIdle();
836     properObj->registerPeriodicProgressFunction();
837   }
838 }
839
840 template <class dtype>
841 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
842   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
843                  progressPeriodInMs_); 
844 }
845
846
847 template <class dtype>
848 class GroupMeshStreamer : public MeshStreamer<dtype> {
849 private:
850
851   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
852   MeshStreamerGroupClient<dtype> *clientObj_;
853
854   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
855     for (int i = 0; i < msg->numDataItems; i++) {
856       dtype &data = msg->getDataItem(i);
857       clientObj_->process(data);
858     }
859
860     if (MeshStreamer<dtype>::useStagedCompletion_) {
861 #ifdef STREAMER_VERBOSE_OUTPUT
862       envelope *env = UsrToEnv(msg);
863       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
864 #endif
865       markMessageReceived(msg->dimension, msg->finalMsgCount); 
866     }
867     else {
868       this->detectorLocalObj_->consume(msg->numDataItems);    
869     }
870
871     delete msg;
872   }
873
874   void localDeliver(dtype &dataItem) {
875     clientObj_->process(dataItem);
876     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
877       MeshStreamer<dtype>::detectorLocalObj_->consume();
878     }
879   }
880
881   int numElementsInClient() {
882     // client is a group - there is one element per PE
883     return CkNumPes();
884   }
885
886   int numLocalElementsInClient() {
887     return 1; 
888   }
889
890   void initLocalClients() {
891     // no action required
892   }
893
894 public:
895
896   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
897                     int *dimensionSizes, 
898                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
899                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
900    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
901                         0, yieldFlag, progressPeriodInMs) 
902   {
903     clientProxy_ = clientProxy; 
904     clientObj_ = 
905       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
906   }
907
908   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
909                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
910                     int bufferSize, bool yieldFlag = 0, 
911                     double progressPeriodInMs = -1.0)
912    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
913                         yieldFlag, progressPeriodInMs) 
914   {
915     clientProxy_ = clientProxy; 
916     clientObj_ = 
917       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
918   }
919
920 };
921
922 template <class dtype>
923 class MeshStreamerClientIterator : public CkLocIterator {
924
925 public:
926   
927   CompletionDetector *detectorLocalObj_;
928   CkArray *clientArrMgr_;
929   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
930                              CkArray *clientArrMgr) 
931     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
932
933   // CkLocMgr::iterate will call addLocation on all elements local to this PE
934   void addLocation(CkLocation &loc) {
935
936     MeshStreamerArrayClient<dtype> *clientObj = 
937       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
938
939     CkAssert(clientObj != NULL); 
940     clientObj->setDetector(detectorLocalObj_); 
941   }
942
943 };
944
945 template <class dtype, class itype>
946 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
947   
948 private:
949   
950   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
951   CkArray *clientArrayMgr_;
952   int numArrayElements_;
953   int numLocalArrayElements_;
954 #ifdef CACHE_ARRAY_METADATA
955   MeshStreamerArrayClient<dtype> **clientObjs_;
956   int *destinationPes_;
957   bool *isCachedArrayMetadata_;
958 #endif
959
960   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
961     itype arrayId = packedDataItem.arrayIndex; 
962
963     MeshStreamerArrayClient<dtype> *clientObj;
964 #ifdef CACHE_ARRAY_METADATA
965     clientObj = clientObjs_[arrayId];
966 #else
967     clientObj = clientProxy_[arrayId].ckLocal();
968 #endif
969
970     if (clientObj != NULL) {
971       clientObj->process(packedDataItem.dataItem);
972       if (MeshStreamer<ArrayDataItem<dtype, itype> >
973            ::useStagedCompletion_ == false) {
974         MeshStreamer<ArrayDataItem<dtype, itype> >
975          ::detectorLocalObj_->consume();
976       }
977     }
978     else { 
979       // array element is no longer present locally - redeliver using proxy
980       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
981     }
982   }
983
984   int numElementsInClient() {
985     return numArrayElements_;
986   }
987
988   int numLocalElementsInClient() {
989     return numLocalArrayElements_;
990   }
991
992   void initLocalClients() {
993     if (MeshStreamer<ArrayDataItem<dtype, itype> >
994          ::useStagedCompletion_ == false) {
995 #ifdef CACHE_ARRAY_METADATA
996       std::fill(isCachedArrayMetadata_, 
997                 isCachedArrayMetadata_ + numArrayElements_, false);
998
999       for (int i = 0; i < numArrayElements_; i++) {
1000         clientObjs_[i] = clientProxy_[i].ckLocal();
1001         if (clientObjs_[i] != NULL) {
1002           clientObjs_[i]->setDetector(
1003            MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1004         }
1005       }
1006 #else
1007       // set completion detector in local elements of the client
1008       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1009       MeshStreamerClientIterator<dtype> clientIterator(
1010           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1011           clientProxy_.ckLocalBranch());
1012       clientLocMgr->iterate(clientIterator);
1013 #endif    
1014     }
1015     else {
1016       numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1017     }
1018   }
1019
1020   void commonInit() {
1021 #ifdef CACHE_ARRAY_METADATA
1022     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1023     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1024     destinationPes_ = new int[numArrayElements_];
1025     isCachedArrayMetadata_ = new bool[numArrayElements_];
1026     std::fill(isCachedArrayMetadata_, 
1027               isCachedArrayMetadata_ + numArrayElements_, false);
1028 #endif    
1029   }
1030
1031 public:
1032
1033   struct DataItemHandle {
1034     itype arrayIndex; 
1035     dtype *dataItem;
1036   };
1037
1038   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1039                     int *dimensionSizes, 
1040                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
1041                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1042     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1043                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
1044                   0, yieldFlag, progressPeriodInMs) 
1045   {
1046     clientProxy_ = clientProxy; 
1047     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1048     commonInit();
1049   }
1050
1051   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
1052                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
1053                     int bufferSize, bool yieldFlag = 0, 
1054                     double progressPeriodInMs = -1.0)
1055     :MeshStreamer<ArrayDataItem<dtype,itype> >(
1056                   0, numDimensions, dimensionSizes, 
1057                   bufferSize, yieldFlag, progressPeriodInMs) 
1058   {
1059     clientProxy_ = clientProxy; 
1060     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1061     commonInit();
1062
1063   }
1064
1065   ~ArrayMeshStreamer() {
1066 #ifdef CACHE_ARRAY_METADATA
1067     delete [] clientObjs_;
1068     delete [] destinationPes_;
1069     delete [] isCachedArrayMetadata_; 
1070 #endif
1071   }
1072
1073   void init(CkArrayID senderArrayID, CkCallback startCb, 
1074             CkCallback endCb, int prio, bool usePeriodicFlushing) {
1075     CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
1076     int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
1077     MeshStreamer<ArrayDataItem<dtype, itype> >::init(
1078                  numLocalElements, startCb, endCb, prio,
1079                  usePeriodicFlushing); 
1080   }
1081
1082   void receiveAtDestination(
1083        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1084
1085     for (int i = 0; i < msg->numDataItems; i++) {
1086       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
1087       localDeliver(packedData);
1088     }
1089     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1090       markMessageReceived(msg->dimension, msg->finalMsgCount);
1091     }
1092
1093     delete msg;
1094   }
1095
1096   void insertData(dtype &dataItem, itype arrayIndex) {
1097
1098     // no data items should be submitted after all local contributors call done
1099     // and staged completion has begun
1100     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1101                ::stagedCompletionStarted()) == false);
1102
1103     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1104          ::useStagedCompletion_ == false) {
1105       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1106     }
1107     int destinationPe; 
1108 #ifdef CACHE_ARRAY_METADATA
1109   if (isCachedArrayMetadata_[arrayIndex]) {    
1110     destinationPe =  destinationPes_[arrayIndex];
1111   }
1112   else {
1113     destinationPe = 
1114       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1115     isCachedArrayMetadata_[arrayIndex] = true;
1116     destinationPes_[arrayIndex] = destinationPe;
1117   }
1118 #else 
1119   destinationPe = 
1120     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1121 #endif
1122
1123   ArrayDataItem<dtype, itype> packedDataItem;
1124     if (destinationPe == CkMyPe()) {
1125       // copying here is necessary - user code should not be 
1126       // passed back a reference to the original item
1127       packedDataItem.arrayIndex = arrayIndex; 
1128       packedDataItem.dataItem = dataItem;
1129       localDeliver(packedDataItem);
1130       return;
1131     }
1132
1133     // this implementation avoids copying an item before transfer into message
1134
1135     DataItemHandle tempHandle; 
1136     tempHandle.arrayIndex = arrayIndex; 
1137     tempHandle.dataItem = &dataItem;
1138
1139     MeshStreamer<ArrayDataItem<dtype, itype> >::
1140      insertData(&tempHandle, destinationPe);
1141
1142   }
1143
1144   int copyDataItemIntoMessage(
1145       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1146       void *dataItemHandle, bool copyIndirectly) {
1147
1148     if (copyIndirectly == true) {
1149       // newly inserted items are passed through a handle to avoid copying
1150       int numDataItems = destinationBuffer->numDataItems;
1151       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
1152       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1153         *(tempHandle->dataItem);
1154       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1155         tempHandle->arrayIndex;
1156       return ++destinationBuffer->numDataItems;
1157     }
1158     else {
1159       // this is an item received along the route to destination
1160       // we can copy it from the received message
1161       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1162               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1163     }
1164   }
1165
1166 };
1167
1168 #define CK_TEMPLATES_ONLY
1169 #include "NDMeshStreamer.def.h"
1170 #undef CK_TEMPLATES_ONLY
1171
1172 #endif