0316f2b85f5cf57b55504f523e795866b4e34181
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define DEBUG_STREAMER
17 // #define CACHE_LOCATIONS
18 // #define SUPPORT_INCOMPLETE_MESH
19 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
20 // #define STREAMER_EXPERIMENTAL
21 // #define STREAMER_VERBOSE_OUTPUT
22 // #define STAGED_COMPLETION
23
24 struct MeshLocation {
25   int dimension; 
26   int bufferIndex; 
27 }; 
28
29 template<class dtype>
30 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
31
32 public:
33
34 #ifdef STAGED_COMPLETION
35   int finalMsgCount; 
36 #endif
37   int numDataItems;
38   int *destinationPes;
39   dtype *dataItems;
40
41   MeshStreamerMessage(): numDataItems(0) {
42 #ifdef STAGED_COMPLETION
43     finalMsgCount = -1; 
44 #endif
45   }
46
47   int addDataItem(const dtype &dataItem) {
48     dataItems[numDataItems] = dataItem;
49     return ++numDataItems; 
50   }
51
52   void markDestination(const int index, const int destinationPe) {
53     destinationPes[index] = destinationPe;
54   }
55
56   dtype &getDataItem(const int index) {
57     return dataItems[index];
58   }
59
60 };
61
62 template <class dtype>
63 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
64 private:
65   CompletionDetector *detectorLocalObj_;
66 public:
67   MeshStreamerArrayClient(){}
68   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
69   // would like to make it pure virtual but charm will try to
70   // instantiate the abstract class, leading to errors
71   virtual void process(dtype &data) {
72     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
73             "This virtual function should have been defined by the user.\n");
74   };     
75   void setDetector(CompletionDetector *detectorLocalObj) {
76     detectorLocalObj_ = detectorLocalObj;
77   }
78   void receiveRedeliveredItem(dtype data) {
79 #ifdef STREAMER_VERBOSE_OUTPUT
80     CkPrintf("[%d] redelivered to index %d\n", CkMyPe(), this->thisIndex.data[0]);
81 #endif
82     detectorLocalObj_->consume();
83     process(data);
84   }
85
86   void pup(PUP::er &p) {
87     CBase_MeshStreamerArrayClient<dtype>::pup(p);
88    }  
89
90 };
91
92 template <class dtype>
93 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
94
95 public:
96   virtual void process(dtype &data) = 0;
97
98 };
99
100 template <class dtype>
101 class MeshStreamer : public CBase_MeshStreamer<dtype> {
102
103 private:
104   int bufferSize_; 
105   int maxNumDataItemsBuffered_;
106   int numDataItemsBuffered_;
107
108   int numMembers_; 
109   int numDimensions_;
110   int *individualDimensionSizes_;
111   int *combinedDimensionSizes_;
112
113   int myIndex_;
114   int *myLocationIndex_;
115
116   CkCallback   userCallback_;
117   bool yieldFlag_;
118
119   double progressPeriodInMs_; 
120   bool isPeriodicFlushEnabled_; 
121   bool hasSentRecently_;
122 #ifdef STREAMER_EXPERIMENTAL
123   bool hasSentPreviously_;
124   bool immediateMode_; 
125 #endif
126   MeshStreamerMessage<dtype> ***dataBuffers_;
127
128   CProxy_CompletionDetector detector_;
129   int prio_;
130   int yieldCount_;
131
132 #ifdef CACHE_LOCATIONS
133   MeshLocation *cachedLocations_;
134   bool *isCached_; 
135 #endif
136
137
138   // only used for staged completion
139   int **cntMsgSent_;
140   int *cntMsgReceived_;
141   int *cntMsgExpected_;
142   int *cntFinished_; 
143   int dimensionToFlush_;
144   int numLocalDone_; 
145
146
147   void storeMessage(int destinationPe, 
148                     const MeshLocation &destinationCoordinates, 
149                     void *dataItem, bool copyIndirectly = false);
150   virtual void localDeliver(dtype &dataItem) = 0; 
151
152   virtual int numElementsInClient() = 0;
153   virtual int numLocalElementsInClient() = 0; 
154
155   virtual void initLocalClients() = 0;
156
157   void sendLargestBuffer();
158   void flushToIntermediateDestinations();
159   void flushDimension(int dimension, bool sendMsgCounts = false); 
160
161 protected:
162
163   CompletionDetector *detectorLocalObj_;
164   virtual int copyDataItemIntoMessage(
165               MeshStreamerMessage<dtype> *destinationBuffer, 
166               void *dataItemHandle, bool copyIndirectly = false);
167   MeshLocation determineLocation(int destinationPe);
168 public:
169
170   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
171                int *dimensionSizes,
172                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
173   ~MeshStreamer();
174
175   // entry
176   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
177   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
178   void flushDirect();
179   void finish();
180
181   // non entry
182   bool isPeriodicFlushEnabled() {
183     return isPeriodicFlushEnabled_;
184   }
185   virtual void insertData(dtype &dataItem, int destinationPe); 
186   void insertData(void *dataItemHandle, int destinationPe);
187   void associateCallback(int numContributors, 
188                          CkCallback startCb, CkCallback endCb, 
189                          CProxy_CompletionDetector detector,
190                          int prio);
191   void flushAllBuffers();
192   void registerPeriodicProgressFunction();
193
194   // flushing begins only after enablePeriodicFlushing has been invoked
195
196   void enablePeriodicFlushing(){
197     isPeriodicFlushEnabled_ = true; 
198     registerPeriodicProgressFunction();
199   }
200
201   void done(int numContributorsFinished = 1) {
202 #ifdef STAGED_COMPLETION
203     numLocalDone_ += numContributorsFinished; 
204     if (numLocalDone_ == numLocalElementsInClient()) {
205       startStagedCompletion();
206     }
207 #else
208     detectorLocalObj_->done(numContributorsFinished);
209 #endif
210   }
211
212   void init(CkCallback startCb, CkCallback endCb, int prio);
213   
214   void startStagedCompletion() {          
215     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
216       flushDimension(dimensionToFlush_, true);
217     }
218     dimensionToFlush_--;
219
220     checkForCompletedStages();
221   }
222
223   void markMessageReceived(int dimension, int finalCount) {
224     cntMsgReceived_[dimension]++;
225     if (finalCount != -1) {
226       cntFinished_[dimension]++; 
227       cntMsgExpected_[dimension] += finalCount; 
228 #ifdef STREAMER_VERBOSE_OUTPUT
229       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
230                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
231                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
232                cntMsgReceived_[dimension]); 
233 #endif
234     }  
235     if (dimensionToFlush_ != numDimensions_ - 1) {
236       checkForCompletedStages();
237     }
238   }
239
240   void checkForCompletedStages() {
241
242     while (cntFinished_[dimensionToFlush_ + 1] == 
243            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
244            cntMsgExpected_[dimensionToFlush_ + 1] == 
245            cntMsgReceived_[dimensionToFlush_ + 1]) {
246       if (dimensionToFlush_ == -1) {
247 #ifdef STREAMER_VERBOSE_OUTPUT
248         CkPrintf("[%d] contribute\n", CkMyPe()); 
249 #endif
250 #ifdef DEBUG_STREAMER
251         CkAssert(numDataItemsBuffered_ == 0); 
252 #endif
253         this->contribute(userCallback_);
254         return; 
255       }
256       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
257         flushDimension(dimensionToFlush_, true); 
258       }
259       dimensionToFlush_--; 
260     }
261   }
262
263 };
264
265 template <class dtype>
266 MeshStreamer<dtype>::MeshStreamer(
267                      int maxNumDataItemsBuffered, int numDimensions, 
268                      int *dimensionSizes, 
269                      bool yieldFlag, 
270                      double progressPeriodInMs)
271  :numDimensions_(numDimensions), 
272   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
273   yieldFlag_(yieldFlag), 
274   progressPeriodInMs_(progressPeriodInMs)
275 {
276
277   int sumAlongAllDimensions = 0;   
278   individualDimensionSizes_ = new int[numDimensions_];
279   combinedDimensionSizes_ = new int[numDimensions_ + 1];
280   myLocationIndex_ = new int[numDimensions_];
281   memcpy(individualDimensionSizes_, dimensionSizes, 
282          numDimensions * sizeof(int)); 
283   combinedDimensionSizes_[0] = 1; 
284   for (int i = 0; i < numDimensions; i++) {
285     sumAlongAllDimensions += individualDimensionSizes_[i];
286     combinedDimensionSizes_[i + 1] = 
287       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
288   }
289
290   // except for personalized messages, the buffers for dimensions with the 
291   //   same index as the sender's are not used
292   bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
293     / (sumAlongAllDimensions - numDimensions_ + 1); 
294   if (bufferSize_ <= 0) {
295     bufferSize_ = 1; 
296     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
297              "is invalid. Defaulting to a single buffer per destination.\n");
298   }
299   numDataItemsBuffered_ = 0; 
300   numMembers_ = CkNumPes(); 
301
302   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
303   for (int i = 0; i < numDimensions; i++) {
304     int numMembersAlongDimension = individualDimensionSizes_[i]; 
305     dataBuffers_[i] = 
306       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
307     for (int j = 0; j < numMembersAlongDimension; j++) {
308       dataBuffers_[i][j] = NULL;
309     }
310   }
311
312   myIndex_ = CkMyPe();
313   int remainder = myIndex_;
314   for (int i = numDimensions_ - 1; i >= 0; i--) {    
315     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
316     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
317   }
318
319   isPeriodicFlushEnabled_ = false; 
320   detectorLocalObj_ = NULL;
321 #ifdef STREAMER_EXPERIMENTAL
322   immediateMode_ = false; 
323 #endif
324
325 #ifdef CACHE_LOCATIONS
326   cachedLocations_ = new MeshLocation[numMembers_];
327   isCached_ = new bool[numMembers_];
328   std::fill(isCached_, isCached_ + numMembers_, false);
329 #endif
330
331 #ifdef STAGED_COMPLETION
332
333   cntMsgSent_ = new int*[numDimensions_]; 
334   cntMsgReceived_ = new int[numDimensions_];
335   cntMsgExpected_ = new int[numDimensions_];
336   cntFinished_ = new int[numDimensions_]; 
337
338   for (int i = 0; i < numDimensions_; i++) {
339     cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
340   }
341
342 #endif
343
344 }
345
346 template <class dtype>
347 MeshStreamer<dtype>::~MeshStreamer() {
348
349   for (int i = 0; i < numDimensions_; i++) {
350     for (int j=0; j < individualDimensionSizes_[i]; j++) {
351       delete[] dataBuffers_[i][j]; 
352     }
353     delete[] dataBuffers_[i]; 
354   }
355
356   delete[] individualDimensionSizes_;
357   delete[] combinedDimensionSizes_; 
358   delete[] myLocationIndex_;
359
360 #ifdef CACHE_LOCATIONS
361   delete[] cachedLocations_;
362   delete[] isCached_; 
363 #endif
364
365 #ifdef STAGED_COMPLETION
366   for (int i = 0; i < numDimensions_; i++) {
367     delete[] cntMsgSent_[i]; 
368   }
369   delete[] cntMsgSent_; 
370   delete[] cntMsgReceived_; 
371   delete[] cntMsgExpected_; 
372   delete[] cntFinished_;
373 #endif
374
375 }
376
377
378 template <class dtype>
379 inline
380 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
381
382 #ifdef CACHE_LOCATIONS
383   if (isCached_[destinationPe]) {    
384     return cachedLocations_[destinationPe]; 
385   }
386 #endif
387
388   MeshLocation destinationLocation;
389   int remainder = destinationPe;
390   int dimensionIndex; 
391   for (int i = numDimensions_ - 1; i >= 0; i--) {        
392     dimensionIndex = remainder / combinedDimensionSizes_[i];
393     
394     if (dimensionIndex != myLocationIndex_[i]) {
395       destinationLocation.dimension = i; 
396       destinationLocation.bufferIndex = dimensionIndex; 
397 #ifdef CACHE_LOCATIONS
398       cachedLocations_[destinationPe] = destinationLocation;
399       isCached_[destinationPe] = true; 
400 #endif
401       return destinationLocation;
402     }
403
404     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
405   }
406
407   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
408           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
409 }
410
411 template <class dtype>
412 inline 
413 int MeshStreamer<dtype>::copyDataItemIntoMessage(
414                          MeshStreamerMessage<dtype> *destinationBuffer,
415                          void *dataItemHandle, bool copyIndirectly) {
416   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
417 }
418
419 template <class dtype>
420 inline
421 void MeshStreamer<dtype>::storeMessage(
422                           int destinationPe, 
423                           const MeshLocation& destinationLocation,
424                           void *dataItem, bool copyIndirectly) {
425
426   int dimension = destinationLocation.dimension;
427   int bufferIndex = destinationLocation.bufferIndex; 
428   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
429
430   // allocate new message if necessary
431   if (messageBuffers[bufferIndex] == NULL) {
432     if (dimension == 0) {
433       // personalized messages do not require destination indices
434       messageBuffers[bufferIndex] = 
435         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
436     }
437     else {
438       messageBuffers[bufferIndex] = 
439         new (bufferSize_, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>();
440     }
441     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
442     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
443 #ifdef DEBUG_STREAMER
444     CkAssert(messageBuffers[bufferIndex] != NULL);
445 #endif
446   }
447   
448   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
449   int numBuffered = 
450     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
451   if (dimension != 0) {
452     destinationBuffer->markDestination(numBuffered-1, destinationPe);
453   }  
454   numDataItemsBuffered_++;
455
456   // send if buffer is full
457   if (numBuffered == bufferSize_) {
458
459     int destinationIndex;
460
461     destinationIndex = myIndex_ + 
462       (bufferIndex - myLocationIndex_[dimension]) * 
463       combinedDimensionSizes_[dimension];
464
465     if (dimension == 0) {
466 #ifdef STREAMER_VERBOSE_OUTPUT
467       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
468 #endif
469       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
470     }
471     else {
472 #ifdef STREAMER_VERBOSE_OUTPUT
473       CkPrintf("[%d] sending intermediate to %d\n", CkMyPe(), destinationIndex); 
474 #endif
475       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
476     }
477
478 #ifdef STAGED_COMPLETION
479     cntMsgSent_[dimension][bufferIndex]++; 
480 #endif
481
482     messageBuffers[bufferIndex] = NULL;
483     numDataItemsBuffered_ -= numBuffered; 
484     hasSentRecently_ = true; 
485
486   }
487   // send if total buffering capacity has been reached
488   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
489     sendLargestBuffer();
490     hasSentRecently_ = true; 
491   }
492
493 }
494
495 template <class dtype>
496 inline
497 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
498   const static bool copyIndirectly = true;
499
500   MeshLocation destinationLocation = determineLocation(destinationPe);
501   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
502                copyIndirectly); 
503   // release control to scheduler if requested by the user, 
504   //   assume caller is threaded entry
505   if (yieldFlag_ && ++yieldCount_ == 1024) {
506     yieldCount_ = 0; 
507     CthYield();
508   }
509
510 }
511
512 template <class dtype>
513 inline
514 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
515 #ifndef STAGED_COMPLETION
516   detectorLocalObj_->produce();
517 #endif
518   if (destinationPe == CkMyPe()) {
519     // copying here is necessary - user code should not be 
520     // passed back a reference to the original item
521     dtype dataItemCopy = dataItem;
522     localDeliver(dataItemCopy);
523     return;
524   }
525
526   insertData((void *) &dataItem, destinationPe);
527 }
528
529 template <class dtype>
530 void MeshStreamer<dtype>::init(CkCallback startCb, CkCallback endCb, 
531                                int prio) {
532
533   for (int i = 0; i < numDimensions_; i++) {
534     std::fill(cntMsgSent_[i], 
535               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
536     cntMsgReceived_[i] = 0;
537     cntMsgExpected_[i] = 0; 
538     cntFinished_[i] = 0; 
539   }
540   dimensionToFlush_ = numDimensions_ - 1;
541
542   yieldCount_ = 0; 
543   userCallback_ = endCb; 
544   prio_ = prio;
545
546   numLocalDone_ = 0; 
547   initLocalClients();
548   this->contribute(startCb);
549 }
550
551 template <class dtype>
552 void MeshStreamer<dtype>::associateCallback(
553                           int numContributors,
554                           CkCallback startCb, CkCallback endCb, 
555                           CProxy_CompletionDetector detector, 
556                           int prio) {
557
558 #ifdef STREAMER_EXPERIMENTAL
559   immediateMode_ = false;
560   hasSentPreviously_ = false; 
561 #endif
562   yieldCount_ = 0; 
563   prio_ = prio;
564   userCallback_ = endCb; 
565   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushDirect(), 
566                      this->thisProxy);
567   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
568                     this->thisProxy);
569   detector_ = detector;      
570   detectorLocalObj_ = detector_.ckLocalBranch();
571   initLocalClients();
572
573   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, finish , 0);
574   
575   if (progressPeriodInMs_ <= 0) {
576     CkPrintf("Using completion detection in NDMeshStreamer requires"
577              " setting a valid periodic flush period. Defaulting"
578              " to 10 ms\n");
579     progressPeriodInMs_ = 10;
580   }
581   
582   hasSentRecently_ = false; 
583   enablePeriodicFlushing();
584       
585 }
586
587 template <class dtype>
588 void MeshStreamer<dtype>::finish() {
589   isPeriodicFlushEnabled_ = false; 
590
591   if (!userCallback_.isInvalid()) {
592     this->contribute(userCallback_);
593     userCallback_ = CkCallback();      // nullify the current callback
594   }
595
596 }
597
598 template <class dtype>
599 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
600
601   int destinationPe, lastDestinationPe; 
602   MeshLocation destinationLocation;
603
604   lastDestinationPe = -1;
605   for (int i = 0; i < msg->numDataItems; i++) {
606     destinationPe = msg->destinationPes[i];
607     dtype &dataItem = msg->getDataItem(i);
608     if (destinationPe == CkMyPe()) {
609       localDeliver(dataItem);
610     }
611     else {
612       if (destinationPe != lastDestinationPe) {
613         // do this once per sequence of items with the same destination
614         destinationLocation = determineLocation(destinationPe);
615       }
616       storeMessage(destinationPe, destinationLocation, &dataItem);   
617     }
618     lastDestinationPe = destinationPe; 
619   }
620
621 #ifdef STREAMER_EXPERIMENTAL
622   if (immediateMode_) {
623     flushToIntermediateDestinations();
624   }
625 #endif
626
627 #ifdef STAGED_COMPLETION
628   envelope *env = UsrToEnv(msg);
629   MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
630   markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
631 #endif
632
633   delete msg;
634
635 }
636
637 template <class dtype>
638 void MeshStreamer<dtype>::sendLargestBuffer() {
639
640   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
641   MeshStreamerMessage<dtype> ** messageBuffers; 
642   MeshStreamerMessage<dtype> *destinationBuffer; 
643
644   for (int i = 0; i < numDimensions_; i++) {
645
646     messageBuffers = dataBuffers_[i]; 
647     numBuffers = individualDimensionSizes_[i]; 
648
649     flushDimension = i; 
650     maxSize = 0;    
651     for (int j = 0; j < numBuffers; j++) {
652       if (messageBuffers[j] != NULL && 
653           messageBuffers[j]->numDataItems > maxSize) {
654         maxSize = messageBuffers[j]->numDataItems;
655         flushIndex = j; 
656       }
657     }
658
659     if (maxSize > 0) {
660
661       messageBuffers = dataBuffers_[flushDimension]; 
662       destinationBuffer = messageBuffers[flushIndex];
663       destinationIndex = myIndex_ + 
664         (flushIndex - myLocationIndex_[flushDimension]) * 
665         combinedDimensionSizes_[flushDimension] ;
666
667       if (destinationBuffer->numDataItems < bufferSize_) {
668         // not sending the full buffer, shrink the message size
669         envelope *env = UsrToEnv(destinationBuffer);
670         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
671                           (bufferSize_ - destinationBuffer->numDataItems));
672         *((int *) env->getPrioPtr()) = prio_;
673       }
674       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
675
676       if (flushDimension == 0) {
677 #ifdef STREAMER_VERBOSE_OUTPUT
678         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
679 #endif
680         this->thisProxy[destinationIndex].
681           receiveAtDestination(destinationBuffer);
682       }
683       else {
684 #ifdef STREAMER_VERBOSE_OUTPUT
685         CkPrintf("[%d] sending intermediate flush to %d\n", CkMyPe(), destinationIndex); 
686 #endif
687         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
688       }
689
690 #ifdef STAGED_COMPLETION
691       cntMsgSent_[i][flushIndex]++; 
692 #endif
693
694       messageBuffers[flushIndex] = NULL;
695
696     }
697
698   }
699 }
700
701 template <class dtype>
702 void MeshStreamer<dtype>::flushAllBuffers() {
703
704   MeshStreamerMessage<dtype> **messageBuffers; 
705   int numBuffers; 
706
707   for (int i = 0; i < numDimensions_; i++) {
708
709     messageBuffers = dataBuffers_[i]; 
710     numBuffers = individualDimensionSizes_[i]; 
711
712     for (int j = 0; j < numBuffers; j++) {
713
714       if(messageBuffers[j] == NULL) {
715         continue;
716       }
717
718       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
719
720       if (i == 0) {
721         int destinationPe = myIndex_ + j - myLocationIndex_[i];
722         this->thisProxy[destinationPe].receiveAtDestination(messageBuffers[j]);
723       }  
724       else {
725
726         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
727
728           MeshStreamerMessage<dtype> *directMsg = 
729             new (0, 1, sizeof(int)) MeshStreamerMessage<dtype>();
730           *(int *) CkPriorityPtr(directMsg) = prio_;
731           CkSetQueueing(directMsg, CK_QUEUEING_IFIFO);
732
733 #ifdef DEBUG_STREAMER
734           CkAssert(directMsg != NULL);
735 #endif
736           int destinationPe = messageBuffers[j]->destinationPes[k]; 
737           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
738           directMsg->addDataItem(dataItem);
739           this->thisProxy[destinationPe].receiveAtDestination(directMsg);
740         }
741         delete messageBuffers[j];
742       }
743       messageBuffers[j] = NULL;
744     }
745   }
746 }
747
748 template <class dtype>
749 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
750   
751   for (int i = 0; i < numDimensions_; i++) {
752     flushDimension(i); 
753   }
754 }
755
756 template <class dtype>
757 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
758 #ifdef STREAMER_VERBOSE_OUTPUT
759   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", CkMyPe(), dimension, sendMsgCounts); 
760 #endif
761   MeshStreamerMessage<dtype> **messageBuffers; 
762   MeshStreamerMessage<dtype> *destinationBuffer; 
763   int destinationIndex, numBuffers; 
764
765   messageBuffers = dataBuffers_[dimension]; 
766   numBuffers = individualDimensionSizes_[dimension]; 
767
768   for (int j = 0; j < numBuffers; j++) {
769
770     if(messageBuffers[j] == NULL) {      
771       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
772         messageBuffers[j] = 
773           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>();
774         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
775         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
776       }
777       else {
778         continue; 
779       } 
780     }
781
782     destinationBuffer = messageBuffers[j];
783     destinationIndex = myIndex_ + 
784       (j - myLocationIndex_[dimension]) * 
785       combinedDimensionSizes_[dimension] ;
786
787     if (destinationBuffer->numDataItems < bufferSize_) {
788 #ifdef STAGED_COMPLETION
789       if (destinationBuffer->numDataItems != 0) {
790 #endif
791       // not sending the full buffer, shrink the message size
792       envelope *env = UsrToEnv(destinationBuffer);
793       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
794                         (bufferSize_ - destinationBuffer->numDataItems));
795       *((int *) env->getPrioPtr()) = prio_;
796 #ifdef STAGED_COMPLETION
797       }
798 #endif
799     }
800     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
801
802 #ifdef STAGED_COMPLETION
803     destinationBuffer->finalMsgCount = ++cntMsgSent_[dimension][j];
804 #endif
805
806     if (dimension == 0) {
807 #ifdef STREAMER_VERBOSE_OUTPUT
808       CkPrintf("[%d] sending dimension flush to %d\n", CkMyPe(), destinationIndex); 
809 #endif
810       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
811     }
812     else {
813 #ifdef STREAMER_VERBOSE_OUTPUT
814       CkPrintf("[%d] sending intermediate dimension flush to %d\n", CkMyPe(), destinationIndex); 
815 #endif
816       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
817     }
818     messageBuffers[j] = NULL;
819   }
820   
821 }
822
823
824 template <class dtype>
825 void MeshStreamer<dtype>::flushDirect(){
826   // flush if (1) this is not a periodic call or 
827   //          (2) this is a periodic call and no sending took place
828   //              since the last time the function was invoked
829   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
830
831     if (numDataItemsBuffered_ != 0) {
832       flushAllBuffers();
833     }    
834 #ifdef DEBUG_STREAMER
835     CkAssert(numDataItemsBuffered_ == 0); 
836 #endif
837     
838   }
839
840 #ifdef STREAMER_EXPERIMENTAL
841   // switch into immediate sending mode when 
842   // number of items buffered is small; avoid doing the switch 
843   // at the beginning before any sending has taken place
844   if (hasSentPreviously_ && 
845       (numDataItemsBuffered_ < .1 * maxNumDataItemsBuffered_)) {
846     immediateMode_ = true; 
847   } 
848
849   if (!hasSentPreviously_) {
850     hasSentPreviously_ = hasSentRecently_; 
851   }
852 #endif
853
854   hasSentRecently_ = false; 
855
856 }
857
858 template <class dtype>
859 void periodicProgressFunction(void *MeshStreamerObj, double time) {
860
861   MeshStreamer<dtype> *properObj = 
862     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
863
864   if (properObj->isPeriodicFlushEnabled()) {
865     properObj->flushDirect();
866     properObj->registerPeriodicProgressFunction();
867   }
868 }
869
870 template <class dtype>
871 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
872   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
873                  progressPeriodInMs_); 
874 }
875
876
877 template <class dtype>
878 class GroupMeshStreamer : public MeshStreamer<dtype> {
879 private:
880
881   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
882   MeshStreamerGroupClient<dtype> *clientObj_;
883
884   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
885     for (int i = 0; i < msg->numDataItems; i++) {
886       dtype &data = msg->getDataItem(i);
887       clientObj_->process(data);
888     }
889 #ifdef STAGED_COMPLETION
890     envelope *env = UsrToEnv(msg);
891     MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
892 #ifdef DEBUG_STREAMER
893     CkAssert(env->getSrcPe() >= 0 && env->getSrcPe() < CkNumPes()); 
894 #endif
895 #ifdef STREAMER_VERBOSE_OUTPUT
896     CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", CkMyPe(), env->getSrcPe(), msg->numDataItems, msg->finalMsgCount);  
897 #endif
898     markMessageReceived(sourceLocation.dimension, msg->finalMsgCount); 
899 #else 
900     this->detectorLocalObj_->consume(msg->numDataItems);    
901 #endif
902     delete msg;
903   }
904
905   void localDeliver(dtype &dataItem) {
906     clientObj_->process(dataItem);
907 #ifndef STAGED_COMPLETION
908     MeshStreamer<dtype>::detectorLocalObj_->consume();
909 #endif
910   }
911
912   int numElementsInClient() {
913     // client is a group - there is one element per PE
914     return CkNumPes();
915   }
916
917   int numLocalElementsInClient() {
918     return 1; 
919   }
920
921   void initLocalClients() {
922     // no action required
923   }
924
925 public:
926
927   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
928                     int *dimensionSizes, 
929                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
930                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
931    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
932                          yieldFlag, progressPeriodInMs) 
933   {
934     clientProxy_ = clientProxy; 
935     clientObj_ = 
936       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
937   }
938
939 };
940
941 template <class dtype>
942 class MeshStreamerClientIterator : public CkLocIterator {
943
944 public:
945   
946   CompletionDetector *detectorLocalObj_;
947   CkArray *clientArrMgr_;
948   MeshStreamerClientIterator(CompletionDetector *detectorObj, 
949                              CkArray *clientArrMgr) 
950     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
951
952   // CkLocMgr::iterate will call addLocation on all elements local to this PE
953   void addLocation(CkLocation &loc) {
954
955     MeshStreamerArrayClient<dtype> *clientObj = 
956       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
957
958 #ifdef DEBUG_STREAMER
959     CkAssert(clientObj != NULL); 
960 #endif
961     clientObj->setDetector(detectorLocalObj_); 
962   }
963
964 };
965
966 template <class dtype, class itype>
967 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
968   
969 private:
970   
971   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
972   CkArray *clientArrayMgr_;
973   int numArrayElements_;
974   int numLocalArrayElements_;
975 #ifdef CACHE_ARRAY_METADATA
976   MeshStreamerArrayClient<dtype> **clientObjs_;
977   int *destinationPes_;
978   bool *isCachedArrayMetadata_;
979 #endif
980
981   void localDeliver(ArrayDataItem<dtype, itype> &packedDataItem) {
982     itype arrayId = packedDataItem.arrayIndex; 
983
984     MeshStreamerArrayClient<dtype> *clientObj;
985 #ifdef CACHE_ARRAY_METADATA
986     clientObj = clientObjs_[arrayId];
987 #else
988     clientObj = clientProxy_[arrayId].ckLocal();
989 #endif
990
991     if (clientObj != NULL) {
992       clientObj->process(packedDataItem.dataItem);
993 #ifndef STAGED_COMPLETION
994       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->consume();
995 #endif
996     }
997     else { 
998       // array element is no longer present locally - redeliver using proxy
999       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
1000     }
1001   }
1002
1003   int numElementsInClient() {
1004     return numArrayElements_;
1005   }
1006
1007   int numLocalElementsInClient() {
1008     return numLocalArrayElements_;
1009   }
1010
1011   void initLocalClients() {
1012 #ifndef STAGED_COMPLETION
1013
1014   #ifdef CACHE_ARRAY_METADATA
1015     std::fill(isCachedArrayMetadata_, 
1016               isCachedArrayMetadata_ + numArrayElements_, false);
1017
1018     for (int i = 0; i < numArrayElements_; i++) {
1019       clientObjs_[i] = clientProxy_[i].ckLocal();
1020       if (clientObjs_[i] != NULL) {
1021         clientObjs_[i]->setDetector(
1022          MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1023       }
1024     }
1025   #else
1026     // set completion detector in local elements of the client
1027     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1028     MeshStreamerClientIterator<dtype> clientIterator(
1029      MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1030      clientProxy_.ckLocalBranch());
1031     clientLocMgr->iterate(clientIterator);
1032   #endif    
1033
1034 #else 
1035     numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1036 #endif
1037   }
1038
1039 public:
1040
1041   struct DataItemHandle {
1042     itype arrayIndex; 
1043     dtype *dataItem;
1044   };
1045
1046   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1047                     int *dimensionSizes, 
1048                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
1049                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1050     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1051       maxNumDataItemsBuffered, numDimensions, dimensionSizes, yieldFlag, 
1052       progressPeriodInMs) 
1053   {
1054     clientProxy_ = clientProxy; 
1055     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1056
1057 #ifdef CACHE_ARRAY_METADATA
1058     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1059     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1060     destinationPes_ = new int[numArrayElements_];
1061     isCachedArrayMetadata_ = new bool[numArrayElements_];
1062     std::fill(isCachedArrayMetadata_, 
1063               isCachedArrayMetadata_ + numArrayElements_, false);
1064 #endif
1065   }
1066
1067   ~ArrayMeshStreamer() {
1068 #ifdef CACHE_ARRAY_METADATA
1069     delete [] clientObjs_;
1070     delete [] destinationPes_;
1071     delete [] isCachedArrayMetadata_; 
1072 #endif
1073   }
1074
1075   void receiveAtDestination(
1076        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1077
1078     for (int i = 0; i < msg->numDataItems; i++) {
1079       ArrayDataItem<dtype, itype> &packedData = msg->getDataItem(i);
1080       localDeliver(packedData);
1081     }
1082 #ifdef STAGED_COMPLETION
1083     envelope *env = UsrToEnv(msg);
1084     MeshLocation sourceLocation = this->determineLocation(env->getSrcPe());
1085     markMessageReceived(sourceLocation.dimension, msg->finalMsgCount);
1086 #endif
1087
1088     delete msg;
1089   }
1090
1091   void insertData(dtype &dataItem, itype arrayIndex) {
1092 #ifndef STAGED_COMPLETION
1093     MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1094 #endif
1095     int destinationPe; 
1096 #ifdef CACHE_ARRAY_METADATA
1097   if (isCachedArrayMetadata_[arrayIndex]) {    
1098     destinationPe =  destinationPes_[arrayIndex];
1099   }
1100   else {
1101     destinationPe = 
1102       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1103     isCachedArrayMetadata_[arrayIndex] = true;
1104     destinationPes_[arrayIndex] = destinationPe;
1105   }
1106 #else 
1107   destinationPe = 
1108     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1109 #endif
1110
1111   ArrayDataItem<dtype, itype> packedDataItem;
1112     if (destinationPe == CkMyPe()) {
1113       // copying here is necessary - user code should not be 
1114       // passed back a reference to the original item
1115       packedDataItem.arrayIndex = arrayIndex; 
1116       packedDataItem.dataItem = dataItem;
1117       localDeliver(packedDataItem);
1118       return;
1119     }
1120
1121     // this implementation avoids copying an item before transfer into message
1122
1123     DataItemHandle tempHandle; 
1124     tempHandle.arrayIndex = arrayIndex; 
1125     tempHandle.dataItem = &dataItem;
1126
1127     MeshStreamer<ArrayDataItem<dtype, itype> >::
1128      insertData(&tempHandle, destinationPe);
1129
1130   }
1131
1132   int copyDataItemIntoMessage(
1133       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1134       void *dataItemHandle, bool copyIndirectly) {
1135
1136     if (copyIndirectly == true) {
1137       // newly inserted items are passed through a handle to avoid copying
1138       int numDataItems = destinationBuffer->numDataItems;
1139       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
1140       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1141         *(tempHandle->dataItem);
1142       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1143         tempHandle->arrayIndex;
1144       return ++destinationBuffer->numDataItems;
1145     }
1146     else {
1147       // this is an item received along the route to destination
1148       // we can copy it from the received message
1149       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1150               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1151     }
1152   }
1153
1154 };
1155
1156 #define CK_TEMPLATES_ONLY
1157 #include "NDMeshStreamer.def.h"
1158 #undef CK_TEMPLATES_ONLY
1159
1160 #endif