NDMeshStreamer: bug fixes to staged completion scheme
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define DEBUG_STREAMER
17 // #define CACHE_LOCATIONS
18 // #define SUPPORT_INCOMPLETE_MESH
19 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
20 // #define STREAMER_EXPERIMENTAL
21 // #define STREAMER_VERBOSE_OUTPUT
22 // #define STAGED_COMPLETION
23
24 struct MeshLocation {
25   int dimension; 
26   int bufferIndex; 
27 }; 
28
29 template<class dtype>
30 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
31
32 public:
33
34 #ifdef STAGED_COMPLETION
35   int finalMsgCount; 
36 #endif
37   int numDataItems;
38   int *destinationPes;
39   dtype *dataItems;
40
41   MeshStreamerMessage(): numDataItems(0) {
42 #ifdef STAGED_COMPLETION
43     finalMsgCount = -1; 
44 #endif
45   }
46
47   int addDataItem(const dtype &dataItem) {
48     dataItems[numDataItems] = dataItem;
49     return ++numDataItems; 
50   }
51
52   void markDestination(const int index, const int destinationPe) {
53     destinationPes[index] = destinationPe;
54   }
55
56   dtype &getDataItem(const int index) {
57     return dataItems[index];
58   }
59
60 };
61
62 template <class dtype>
63 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
64 private:
65   CompletionDetector *detectorLocalObj_;
66 public:
67   MeshStreamerArrayClient(){}
68   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
69   // would like to make it pure virtual but charm will try to
70   // instantiate the abstract class, leading to errors
71   virtual void process(dtype &data) {
72     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
73             "This virtual function should have been defined by the user.\n");
74   };     
75   void setDetector(CompletionDetector *detectorLocalObj) {
76     detectorLocalObj_ = detectorLocalObj;
77   }
78   void receiveRedeliveredItem(dtype data) {
79 #ifdef STREAMER_VERBOSE_OUTPUT
80     CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
81 #endif
82     detectorLocalObj_->consume();
83     process(data);
84   }
85
86   void pup(PUP::er &p) {
87     CBase_MeshStreamerArrayClient<dtype>::pup(p);
88    }  
89
90 };
91
92 template <class dtype>
93 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
94
95 public:
96   virtual void process(dtype &data) = 0;
97
98 };
99
100 template <class dtype>
101 class MeshStreamer : public CBase_MeshStreamer<dtype> {
102
103 private:
104   int bufferSize_; 
105   int maxNumDataItemsBuffered_;
106   int numDataItemsBuffered_;
107
108   int numMembers_; 
109   int numDimensions_;
110   int *individualDimensionSizes_;
111   int *combinedDimensionSizes_;
112
113   int myIndex_;
114   int *myLocationIndex_;
115
116   CkCallback   userCallback_;
117   bool yieldFlag_;
118
119   double progressPeriodInMs_; 
120   bool isPeriodicFlushEnabled_; 
121   bool hasSentRecently_;
122 #ifdef STREAMER_EXPERIMENTAL
123   bool hasSentPreviously_;
124   bool immediateMode_; 
125 #endif
126   MeshStreamerMessage<dtype> ***dataBuffers_;
127
128   CProxy_CompletionDetector detector_;
129   int prio_;
130   int yieldCount_;
131
132 #ifdef CACHE_LOCATIONS
133   MeshLocation *cachedLocations_;
134   bool *isCached_; 
135 #endif
136
137
138   // only used for staged completion
139   int **cntMsgSent_;
140   int *cntMsgReceived_;
141   int *cntMsgExpected_;
142   int *cntFinished_; 
143   int dimensionToFlush_;
144   int numLocalDone_; 
145
146
147   void storeMessage(int destinationPe, 
148                     const MeshLocation &destinationCoordinates, 
149                     void *dataItem, bool copyIndirectly = false);
150   virtual void localDeliver(dtype &dataItem) = 0; 
151
152   virtual int numElementsInClient() = 0;
153   virtual int numLocalElementsInClient() = 0; 
154
155   virtual void initLocalClients() = 0;
156
157   void sendLargestBuffer();
158   void flushToIntermediateDestinations();
159   void flushDimension(int dimension, bool sendMsgCounts = false); 
160
161 protected:
162
163   CompletionDetector *detectorLocalObj_;
164   virtual int copyDataItemIntoMessage(
165               MeshStreamerMessage<dtype> *destinationBuffer, 
166               void *dataItemHandle, bool copyIndirectly = false);
167   MeshLocation determineLocation(int destinationPe);
168 public:
169
170   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
171                int *dimensionSizes,
172                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
173   ~MeshStreamer();
174
175   // entry
176   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
177   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
178   void flushDirect();
179   void finish();
180
181   // non entry
182   bool isPeriodicFlushEnabled() {
183     return isPeriodicFlushEnabled_;
184   }
185   virtual void insertData(dtype &dataItem, int destinationPe); 
186   void insertData(void *dataItemHandle, int destinationPe);
187   void associateCallback(int numContributors, 
188                          CkCallback startCb, CkCallback endCb, 
189                          CProxy_CompletionDetector detector,
190                          int prio);
191   void flushAllBuffers();
192   void registerPeriodicProgressFunction();
193
194   // flushing begins only after enablePeriodicFlushing has been invoked
195
196   void enablePeriodicFlushing(){
197     isPeriodicFlushEnabled_ = true; 
198     registerPeriodicProgressFunction();
199   }
200
201   void done(int numContributorsFinished = 1) {
202 #ifdef STAGED_COMPLETION
203     numLocalDone_ += numContributorsFinished; 
204     if (numLocalDone_ == numLocalElementsInClient()) {
205       startStagedCompletion();
206     }
207 #else
208     detectorLocalObj_->done(numContributorsFinished);
209 #endif
210   }
211
212   void init(CkCallback startCb, CkCallback endCb, int prio);
213   
214   void startStagedCompletion() {          
215     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
216       flushDimension(dimensionToFlush_, true);
217     }
218     dimensionToFlush_--;
219
220     checkForCompletedStages();
221   }
222
223   void markMessageReceived(int dimension, int finalCount) {
224     cntMsgReceived_[dimension]++;
225     if (finalCount != -1) {
226       cntFinished_[dimension]++; 
227       cntMsgExpected_[dimension] += finalCount; 
228 #ifdef STREAMER_VERBOSE_OUTPUT
229       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
230                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
231                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
232                cntMsgReceived_[dimension]); 
233 #endif
234     }  
235     if (dimensionToFlush_ != numDimensions_ - 1) {
236       checkForCompletedStages();
237     }
238   }
239
240   void checkForCompletedStages() {
241
242     while (cntFinished_[dimensionToFlush_ + 1] == 
243            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
244            cntMsgExpected_[dimensionToFlush_ + 1] == 
245            cntMsgReceived_[dimensionToFlush_ + 1]) {
246       if (dimensionToFlush_ == -1) {
247 #ifdef STREAMER_VERBOSE_OUTPUT
248         CkPrintf("[%d] contribute\n", CkMyPe()); 
249 #endif
250 #ifdef DEBUG_STREAMER
251         CkAssert(numDataItemsBuffered_ == 0); 
252 #endif
253         this->contribute(userCallback_);
254         return; 
255       }
256       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
257         flushDimension(dimensionToFlush_, true); 
258       }
259       dimensionToFlush_--; 
260     }
261   }
262
263 };
264
265 template <class dtype>
266 MeshStreamer<dtype>::MeshStreamer(
267                      int maxNumDataItemsBuffered, int numDimensions, 
268                      int *dimensionSizes, 
269                      bool yieldFlag, 
270                      double progressPeriodInMs)
271  :numDimensions_(numDimensions), 
272   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
273   yieldFlag_(yieldFlag), 
274   progressPeriodInMs_(progressPeriodInMs)
275 {
276
277   int sumAlongAllDimensions = 0;   
278   individualDimensionSizes_ = new int[numDimensions_];
279   combinedDimensionSizes_ = new int[numDimensions_ + 1];
280   myLocationIndex_ = new int[numDimensions_];
281   memcpy(individualDimensionSizes_, dimensionSizes, 
282          numDimensions * sizeof(int)); 
283   combinedDimensionSizes_[0] = 1; 
284   for (int i = 0; i < numDimensions; i++) {
285     sumAlongAllDimensions += individualDimensionSizes_[i];
286     combinedDimensionSizes_[i + 1] = 
287       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
288   }
289
290   // except for personalized messages, the buffers for dimensions with the 
291   //   same index as the sender's are not used
292   bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
293     / (sumAlongAllDimensions - numDimensions_ + 1); 
294   if (bufferSize_ <= 0) {
295     bufferSize_ = 1; 
296     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
297              "is invalid. Defaulting to a single buffer per destination.\n");
298   }
299   numDataItemsBuffered_ = 0; 
300   numMembers_ = CkNumPes(); 
301
302   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
303   for (int i = 0; i < numDimensions; i++) {
304     int numMembersAlongDimension = individualDimensionSizes_[i]; 
305     dataBuffers_[i] = 
306       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
307     for (int j = 0; j < numMembersAlongDimension; j++) {
308       dataBuffers_[i][j] = NULL;
309     }
310   }
311
312   myIndex_ = CkMyPe();
313   int remainder = myIndex_;
314   for (int i = numDimensions_ - 1; i >= 0; i--) {    
315     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
316     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
317   }
318
319   isPeriodicFlushEnabled_ = false; 
320   detectorLocalObj_ = NULL;
321 #ifdef STREAMER_EXPERIMENTAL
322   immediateMode_ = false; 
323 #endif
324
325 #ifdef CACHE_LOCATIONS
326   cachedLocations_ = new MeshLocation[numMembers_];
327   isCached_ = new bool[numMembers_];
328   std::fill(isCached_, isCached_ + numMembers_, false);
329 #endif
330
331 #ifdef STAGED_COMPLETION
332
333   cntMsgSent_ = new int*[numDimensions_]; 
334   cntMsgReceived_ = new int[numDimensions_];
335   cntMsgExpected_ = new int[numDimensions_];
336   cntFinished_ = new int[numDimensions_]; 
337
338   for (int i = 0; i < numDimensions_; i++) {
339     cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
340   }
341
342 #endif
343
344 }
345
346 template <class dtype>
347 MeshStreamer<dtype>::~MeshStreamer() {
348
349   for (int i = 0; i < numDimensions_; i++) {
350     for (int j=0; j < individualDimensionSizes_[i]; j++) {
351       delete[] dataBuffers_[i][j]; 
352     }
353     delete[] dataBuffers_[i]; 
354   }
355
356   delete[] individualDimensionSizes_;
357   delete[] combinedDimensionSizes_; 
358   delete[] myLocationIndex_;
359
360 #ifdef CACHE_LOCATIONS
361   delete[] cachedLocations_;
362   delete[] isCached_; 
363 #endif
364
365 #ifdef STAGED_COMPLETION
366   for (int i = 0; i < numDimensions_; i++) {
367     delete[] cntMsgSent_[i]; 
368   }
369   delete[] cntMsgSent_; 
370   delete[] cntMsgReceived_; 
371   delete[] cntMsgExpected_; 
372   delete[] cntFinished_;
373 #endif
374
375 }
376
377
378 template <class dtype>
379 inline
380 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
381
382 #ifdef CACHE_LOCATIONS
383   if (isCached_[destinationPe]) {    
384     return cachedLocations_[destinationPe]; 
385   }
386 #endif
387
388   MeshLocation destinationLocation;
389   int remainder = destinationPe;
390   int dimensionIndex; 
391   for (int i = numDimensions_ - 1; i >= 0; i--) {        
392     dimensionIndex = remainder / combinedDimensionSizes_[i];
393     
394     if (dimensionIndex != myLocationIndex_[i]) {
395       destinationLocation.dimension = i; 
396       destinationLocation.bufferIndex = dimensionIndex; 
397 #ifdef CACHE_LOCATIONS
398       cachedLocations_[destinationPe] = destinationLocation;
399       isCached_[destinationPe] = true; 
400 #endif
401       return destinationLocation;
402     }
403
404     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
405   }
406
407   // all indices agree - message to oneself
408   destinationLocation.dimension = 0; 
409   destinationLocation.bufferIndex = myLocationIndex_[0];
410   return destinationLocation; 
411 }
412
413 template <class dtype>
414 inline 
415 int MeshStreamer<dtype>::copyDataItemIntoMessage(
416                          MeshStreamerMessage<dtype> *destinationBuffer,
417                          void *dataItemHandle, bool copyIndirectly) {
418   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
419 }
420
421 template <class dtype>
422 inline
423 void MeshStreamer<dtype>::storeMessage(
424                           int destinationPe, 
425                           const MeshLocation& destinationLocation,
426                           void *dataItem, bool copyIndirectly) {
427
428   int dimension = destinationLocation.dimension;
429   int bufferIndex = destinationLocation.bufferIndex; 
430   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
431
432   // allocate new message if necessary
433   if (messageBuffers[bufferIndex] == NULL) {
434     if (dimension == 0) {
435       // personalized messages do not require destination indices
436       messageBuffers[bufferIndex] = 
437         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
438     }
439     else {
440       messageBuffers[bufferIndex] = 
441         new (bufferSize_, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
442     }
443     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
444     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
445 #ifdef DEBUG_STREAMER
446     CkAssert(messageBuffers[bufferIndex] != NULL);
447 #endif
448   }
449   
450   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
451   int numBuffered = 
452     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
453   if (dimension != 0) {
454     destinationBuffer->markDestination(numBuffered-1, destinationPe);
455   }  
456   numDataItemsBuffered_++;
457
458   // send if buffer is full
459   if (numBuffered == bufferSize_) {
460
461     int destinationIndex;
462
463     destinationIndex = myIndex_ + 
464       (bufferIndex - myLocationIndex_[dimension]) * 
465       combinedDimensionSizes_[dimension];
466
467     if (dimension == 0) {
468 #ifdef STREAMER_VERBOSE_OUTPUT
469       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
470 #endif
471       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
472     }
473     else {
474 #ifdef STREAMER_VERBOSE_OUTPUT
475       CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex); 
476 #endif
477       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
478     }
479
480 #ifdef STAGED_COMPLETION
481     cntMsgSent_[dimension][bufferIndex]++; 
482 #endif
483
484     messageBuffers[bufferIndex] = NULL;
485     numDataItemsBuffered_ -= numBuffered; 
486     hasSentRecently_ = true; 
487
488   }
489   // send if total buffering capacity has been reached
490   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
491     sendLargestBuffer();
492     hasSentRecently_ = true; 
493   }
494
495 }
496
497 template <class dtype>
498 inline
499 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
500   const static bool copyIndirectly = true;
501
502   MeshLocation destinationLocation = determineLocation(destinationPe);
503   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
504                copyIndirectly); 
505   // release control to scheduler if requested by the user, 
506   //   assume caller is threaded entry
507   if (yieldFlag_ && ++yieldCount_ == 1024) {
508     yieldCount_ = 0; 
509     CthYield();
510   }
511
512 }
513
514 template <class dtype>
515 inline
516 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
517 #ifndef STAGED_COMPLETION
518   detectorLocalObj_->produce();
519 #endif
520   if (destinationPe == CkMyPe()) {
521     // copying here is necessary - user code should not be 
522     // passed back a reference to the original item
523     dtype dataItemCopy = dataItem;
524     localDeliver(dataItemCopy);
525     return;
526   }
527
528   insertData((void *) &dataItem, destinationPe);
529 }
530
531 template <class dtype>
532 void MeshStreamer<dtype>::init(CkCallback startCb, CkCallback endCb, 
533                                int prio) {
534
535   for (int i = 0; i < numDimensions_; i++) {
536     std::fill(cntMsgSent_[i], 
537               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
538     cntMsgReceived_[i] = 0;
539     cntMsgExpected_[i] = 0; 
540     cntFinished_[i] = 0; 
541   }
542   dimensionToFlush_ = numDimensions_ - 1;
543
544   yieldCount_ = 0; 
545   userCallback_ = endCb; 
546   prio_ = prio;
547
548   numLocalDone_ = 0; 
549   initLocalClients();
550   this->contribute(startCb);
551 }
552
553 template <class dtype>
554 void MeshStreamer<dtype>::associateCallback(
555                           int numContributors,
556                           CkCallback startCb, CkCallback endCb, 
557                           CProxy_CompletionDetector detector, 
558                           int prio) {
559
560 #ifdef STREAMER_EXPERIMENTAL
561   immediateMode_ = false;
562   hasSentPreviously_ = false; 
563 #endif
564   yieldCount_ = 0; 
565   prio_ = prio;
566   userCallback_ = endCb; 
567   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushDirect(), 
568                      this->thisProxy);
569   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
570                     this->thisProxy);
571   detector_ = detector;      
572   detectorLocalObj_ = detector_.ckLocalBranch();
573   initLocalClients();
574
575   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
576   
577   if (progressPeriodInMs_ <= 0) {
578     CkPrintf("Using completion detection in NDMeshStreamer requires"
579              " setting a valid periodic flush period. Defaulting"
580              " to 10 ms\n");
581     progressPeriodInMs_ = 10;
582   }
583   
584   hasSentRecently_ = false; 
585   enablePeriodicFlushing();
586       
587 }
588
589 template <class dtype>
590 void MeshStreamer<dtype>::finish() {
591   isPeriodicFlushEnabled_ = false; 
592
593   if (!userCallback_.isInvalid()) {
594     this->contribute(userCallback_);
595     userCallback_ = CkCallback();      // nullify the current callback
596   }
597
598 }
599
600 template <class dtype>
601 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
602
603   int destinationPe, lastDestinationPe; 
604   MeshLocation destinationLocation;
605
606   lastDestinationPe = -1;
607   for (int i = 0; i < msg->numDataItems; i++) {
608     destinationPe = msg->destinationPes[i];
609     dtype &dataItem = msg->getDataItem(i);
610     if (destinationPe == CkMyPe()) {
611       localDeliver(dataItem);
612     }
613     else {
614       if (destinationPe != lastDestinationPe) {
615         // do this once per sequence of items with the same destination
616         destinationLocation = determineLocation(destinationPe);
617       }
618       storeMessage(destinationPe, destinationLocation, &dataItem);   
619     }
620     lastDestinationPe = destinationPe; 
621   }
622
623 #ifdef STREAMER_EXPERIMENTAL
624   if (immediateMode_) {
625     flushToIntermediateDestinations();
626   }
627 #endif
628
629 #ifdef STAGED_COMPLETION
630   envelope *env = UsrToEnv(msg);
631   MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
632   markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
633 #endif
634
635   delete msg;
636
637 }
638
639 template <class dtype>
640 void MeshStreamer<dtype>::sendLargestBuffer() {
641
642   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
643   MeshStreamerMessage<dtype> ** messageBuffers; 
644   MeshStreamerMessage<dtype> *destinationBuffer; 
645
646   for (int i = 0; i < numDimensions_; i++) {
647
648     messageBuffers = dataBuffers_[i]; 
649     numBuffers = individualDimensionSizes_[i]; 
650
651     flushDimension = i; 
652     maxSize = 0;    
653     for (int j = 0; j < numBuffers; j++) {
654       if (messageBuffers[j] != NULL && 
655           messageBuffers[j]->numDataItems > maxSize) {
656         maxSize = messageBuffers[j]->numDataItems;
657         flushIndex = j; 
658       }
659     }
660
661     if (maxSize > 0) {
662
663       messageBuffers = dataBuffers_[flushDimension]; 
664       destinationBuffer = messageBuffers[flushIndex];
665       destinationIndex = myIndex_ + 
666         (flushIndex - myLocationIndex_[flushDimension]) * 
667         combinedDimensionSizes_[flushDimension] ;
668
669       if (destinationBuffer->numDataItems < bufferSize_) {
670         // not sending the full buffer, shrink the message size
671         envelope *env = UsrToEnv(destinationBuffer);
672         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
673                           (bufferSize_ - destinationBuffer->numDataItems));
674         *((int *) env->getPrioPtr()) = prio_;
675       }
676       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
677
678       if (flushDimension == 0) {
679 #ifdef STREAMER_VERBOSE_OUTPUT
680         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
681 #endif
682         this->thisProxy[destinationIndex].
683           receiveAtDestination(destinationBuffer);
684       }
685       else {
686 #ifdef STREAMER_VERBOSE_OUTPUT
687         CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex); 
688 #endif
689         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
690       }
691
692 #ifdef STAGED_COMPLETION
693       cntMsgSent_[i][flushIndex]++; 
694 #endif
695
696       messageBuffers[flushIndex] = NULL;
697
698     }
699
700   }
701 }
702
703 template <class dtype>
704 void MeshStreamer<dtype>::flushAllBuffers() {
705
706   MeshStreamerMessage<dtype> **messageBuffers; 
707   int numBuffers; 
708
709   for (int i = 0; i < numDimensions_; i++) {
710
711     messageBuffers = dataBuffers_[i]; 
712     numBuffers = individualDimensionSizes_[i]; 
713
714     for (int j = 0; j < numBuffers; j++) {
715
716       if(messageBuffers[j] == NULL) {
717         continue;
718       }
719
720       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
721
722       if (i == 0) {
723         int destinationPe = myIndex_ + j - myLocationIndex_[i];
724         this->thisProxy[destinationPe].receiveAtDestination(messageBuffers[j]);
725       }  
726       else {
727
728         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
729
730           MeshStreamerMessage<dtype> *directMsg = 
731             new (0, 1, sizeof(int)) MeshStreamerMessage<dtype>();
732           *(int *) CkPriorityPtr(directMsg) = prio_;
733           CkSetQueueing(directMsg, CK_QUEUEING_IFIFO);
734
735 #ifdef DEBUG_STREAMER
736           CkAssert(directMsg != NULL);
737 #endif
738           int destinationPe = messageBuffers[j]->destinationPes[k]; 
739           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
740           directMsg->addDataItem(dataItem);
741           this->thisProxy[destinationPe].receiveAtDestination(directMsg);
742         }
743         delete messageBuffers[j];
744       }
745       messageBuffers[j] = NULL;
746     }
747   }
748 }
749
750 template <class dtype>
751 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
752   
753   for (int i = 0; i < numDimensions_; i++) {
754     flushDimension(i); 
755   }
756 }
757
758 template <class dtype>
759 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
760 #ifdef STREAMER_VERBOSE_OUTPUT
761   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts); 
762 #endif
763   MeshStreamerMessage<dtype> **messageBuffers; 
764   MeshStreamerMessage<dtype> *destinationBuffer; 
765   int destinationIndex, numBuffers; 
766
767   messageBuffers = dataBuffers_[dimension]; 
768   numBuffers = individualDimensionSizes_[dimension]; 
769
770   for (int j = 0; j < numBuffers; j++) {
771
772     if(messageBuffers[j] == NULL) {      
773       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
774         messageBuffers[j] = 
775           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>();
776         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
777         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
778       }
779       else {
780         continue; 
781       } 
782     }
783
784     destinationBuffer = messageBuffers[j];
785     destinationIndex = myIndex_ + 
786       (j - myLocationIndex_[dimension]) * 
787       combinedDimensionSizes_[dimension] ;
788
789     if (destinationBuffer->numDataItems < bufferSize_) {
790 #ifdef STAGED_COMPLETION
791       if (destinationBuffer->numDataItems != 0) {
792 #endif
793       // not sending the full buffer, shrink the message size
794       envelope *env = UsrToEnv(destinationBuffer);
795       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
796                         (bufferSize_ - destinationBuffer->numDataItems));
797       *((int *) env->getPrioPtr()) = prio_;
798 #ifdef STAGED_COMPLETION
799       }
800 #endif
801     }
802     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
803
804 #ifdef STAGED_COMPLETION
805     destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
806 #endif
807
808     if (dimension == 0) {
809 #ifdef STREAMER_VERBOSE_OUTPUT
810       CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex); 
811 #endif
812       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
813     }
814     else {
815 #ifdef STREAMER_VERBOSE_OUTPUT
816       CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex); 
817 #endif
818       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
819     }
820     messageBuffers[j] = NULL;
821   }
822   
823 }
824
825
826 template <class dtype>
827 void MeshStreamer<dtype>::flushDirect(){
828   // flush if (1) this is not a periodic call or 
829   //          (2) this is a periodic call and no sending took place
830   //              since the last time the function was invoked
831   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
832
833     if (numDataItemsBuffered_ != 0) {
834       flushAllBuffers();
835     }    
836 #ifdef DEBUG_STREAMER
837     CkAssert(numDataItemsBuffered_ == 0); 
838 #endif
839     
840   }
841
842 #ifdef STREAMER_EXPERIMENTAL
843   // switch into immediate sending mode when 
844   // number of items buffered is small; avoid doing the switch 
845   // at the beginning before any sending has taken place
846   if (hasSentPreviously_ && 
847       (numDataItemsBuffered_ < .1 * maxNumDataItemsBuffered_)) {
848     immediateMode_ = true; 
849   } 
850
851   if (!hasSentPreviously_) {
852     hasSentPreviously_ = hasSentRecently_; 
853   }
854 #endif
855
856   hasSentRecently_ = false; 
857
858 }
859
860 template <class dtype>
861 void periodicProgressFunction(void *MeshStreamerObj, double time) {
862
863   MeshStreamer<dtype> *properObj = 
864     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
865
866   if (properObj->isPeriodicFlushEnabled()) {
867     properObj->flushDirect();
868     properObj->registerPeriodicProgressFunction();
869   }
870 }
871
872 template <class dtype>
873 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
874   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
875                  progressPeriodInMs_); 
876 }
877
878
879 template <class dtype>
880 class GroupMeshStreamer : public MeshStreamer<dtype> {
881 private:
882
883   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
884   MeshStreamerGroupClient<dtype> *clientObj_;
885
886   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
887     for (int i = 0; i < msg->numDataItems; i++) {
888       dtype &data = msg->getDataItem(i);
889       clientObj_->process(data);
890     }
891 #ifdef STAGED_COMPLETION
892     envelope *env = UsrToEnv(msg);
893     MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
894 #ifdef DEBUG_STREAMER
895     CkAssert(env->getSrcPe() >= 0 && env->getSrcPe() < CkNumPes()); 
896 #endif
897 #ifdef STREAMER_VERBOSE_OUTPUT
898     CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
899 #endif
900     markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
901 #else 
902     this->detectorLocalObj_->consume(msg->numDataItems);    
903 #endif
904     delete msg;
905   }
906
907   void localDeliver(dtype &dataItem) {
908     clientObj_->process(dataItem);
909 #ifndef STAGED_COMPLETION
910     MeshStreamer<dtype>::detectorLocalObj_->consume();
911 #endif
912   }
913
914   int numElementsInClient() {
915     // client is a group - there is one element per PE
916     return CkNumPes();
917   }
918
919   int numLocalElementsInClient() {
920     return 1; 
921   }
922
923   void initLocalClients() {
924     // no action required
925   }
926
927 public:
928
929   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
930                     int *dimensionSizes, 
931                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
932                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
933    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
934                          yieldFlag, progressPeriodInMs) 
935   {
936     clientProxy_ = clientProxy; 
937     clientObj_ = 
938       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
939   }
940
941 };
942
943 template <class dtype>
944 class MeshStreamerClientIterator : public CkLocIterator {
945
946 public:
947   
948   CompletionDetector *detectorLocalObj_;
949   CkArray *clientArrMgr_;
950   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
951                              CkArray *clientArrMgr) 
952     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
953
954   // CkLocMgr::iterate will call addLocation on all elements local to this PE
955   void addLocation(CkLocation &loc) {
956
957     MeshStreamerArrayClient<dtype> *clientObj = 
958       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
959
960 #ifdef DEBUG_STREAMER
961     CkAssert(clientObj != NULL); 
962 #endif
963     clientObj->setDetector(detectorLocalObj_); 
964   }
965
966 };
967
968 template <class dtype, class itype>
969 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
970   
971 private:
972   
973   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
974   CkArray *clientArrayMgr_;
975   int numArrayElements_;
976   int numLocalArrayElements_;
977 #ifdef CACHE_ARRAY_METADATA
978   MeshStreamerArrayClient<dtype> **clientObjs_;
979   int *destinationPes_;
980   bool *isCachedArrayMetadata_;
981 #endif
982
983   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
984     itype arrayId = packedDataItem.arrayIndex; 
985
986     MeshStreamerArrayClient<dtype> *clientObj;
987 #ifdef CACHE_ARRAY_METADATA
988     clientObj = clientObjs_[arrayId];
989 #else
990     clientObj = clientProxy_[arrayId].ckLocal();
991 #endif
992
993     if (clientObj != NULL) {
994       clientObj->process(packedDataItem.dataItem);
995 #ifndef STAGED_COMPLETION
996       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
997 #endif
998     }
999     else { 
1000       // array element is no longer present locally - redeliver using proxy
1001       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
1002     }
1003   }
1004
1005   int numElementsInClient() {
1006     return numArrayElements_;
1007   }
1008
1009   int numLocalElementsInClient() {
1010     return numLocalArrayElements_;
1011   }
1012
1013   void initLocalClients() {
1014 #ifndef STAGED_COMPLETION
1015
1016   #ifdef CACHE_ARRAY_METADATA
1017     std::fill(isCachedArrayMetadata_, 
1018               isCachedArrayMetadata_ + numArrayElements_, false);
1019
1020     for (int i = 0; i < numArrayElements_; i++) {
1021       clientObjs_[i] = clientProxy_[i].ckLocal();
1022       if (clientObjs_[i] != NULL) {
1023         clientObjs_[i]->setDetector(
1024          MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1025       }
1026     }
1027   #else
1028     // set completion detector in local elements of the client
1029     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1030     MeshStreamerClientIterator<dtype> clientIterator(
1031      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1032      clientProxy_.ckLocalBranch());
1033     clientLocMgr->iterate(clientIterator);
1034   #endif    
1035
1036 #else 
1037     numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1038 #endif
1039   }
1040
1041 public:
1042
1043   struct DataItemHandle {
1044     itype arrayIndex; 
1045     dtype *dataItem;
1046   };
1047
1048   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1049                     int *dimensionSizes, 
1050                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
1051                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1052     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1053       maxNumDataItemsBuffered, numDimensions, dimensionSizes, yieldFlag, 
1054       progressPeriodInMs) 
1055   {
1056     clientProxy_ = clientProxy; 
1057     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1058
1059 #ifdef CACHE_ARRAY_METADATA
1060     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1061     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1062     destinationPes_ = new int[numArrayElements_];
1063     isCachedArrayMetadata_ = new bool[numArrayElements_];
1064     std::fill(isCachedArrayMetadata_, 
1065               isCachedArrayMetadata_ + numArrayElements_, false);
1066 #endif
1067   }
1068
1069   ~ArrayMeshStreamer() {
1070 #ifdef CACHE_ARRAY_METADATA
1071     delete [] clientObjs_;
1072     delete [] destinationPes_;
1073     delete [] isCachedArrayMetadata_; 
1074 #endif
1075   }
1076
1077   void receiveAtDestination(
1078        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1079
1080     for (int i = 0; i < msg->numDataItems; i++) {
1081       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
1082       localDeliver(packedData);
1083     }
1084 #ifdef STAGED_COMPLETION
1085     envelope *env = UsrToEnv(msg);
1086     MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
1087     markMessageReceived(sourceLocation.dimension, msg->finalMsgCount);
1088 #endif
1089
1090     delete msg;
1091   }
1092
1093   void insertData(dtype &dataItem, itype arrayIndex) {
1094 #ifndef STAGED_COMPLETION
1095     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1096 #endif
1097     int destinationPe; 
1098 #ifdef CACHE_ARRAY_METADATA
1099   if (isCachedArrayMetadata_[arrayIndex]) {    
1100     destinationPe =  destinationPes_[arrayIndex];
1101   }
1102   else {
1103     destinationPe = 
1104       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1105     isCachedArrayMetadata_[arrayIndex] = true;
1106     destinationPes_[arrayIndex] = destinationPe;
1107   }
1108 #else 
1109   destinationPe = 
1110     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1111 #endif
1112
1113   ArrayDataItem<dtype, itype> packedDataItem;
1114     if (destinationPe == CkMyPe()) {
1115       // copying here is necessary - user code should not be 
1116       // passed back a reference to the original item
1117       packedDataItem.arrayIndex = arrayIndex; 
1118       packedDataItem.dataItem = dataItem;
1119       localDeliver(packedDataItem);
1120       return;
1121     }
1122
1123     // this implementation avoids copying an item before transfer into message
1124
1125     DataItemHandle tempHandle; 
1126     tempHandle.arrayIndex = arrayIndex; 
1127     tempHandle.dataItem = &dataItem;
1128
1129     MeshStreamer<ArrayDataItem<dtype, itype> >::
1130      insertData(&tempHandle, destinationPe);
1131
1132   }
1133
1134   int copyDataItemIntoMessage(
1135       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1136       void *dataItemHandle, bool copyIndirectly) {
1137
1138     if (copyIndirectly == true) {
1139       // newly inserted items are passed through a handle to avoid copying
1140       int numDataItems = destinationBuffer->numDataItems;
1141       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
1142       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1143         *(tempHandle->dataItem);
1144       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1145         tempHandle->arrayIndex;
1146       return ++destinationBuffer->numDataItems;
1147     }
1148     else {
1149       // this is an item received along the route to destination
1150       // we can copy it from the received message
1151       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1152               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1153     }
1154   }
1155
1156 };
1157
1158 #define CK_TEMPLATES_ONLY
1159 #include "NDMeshStreamer.def.h"
1160 #undef CK_TEMPLATES_ONLY
1161
1162 #endif