7dd98e6587f7bdf28b461caadd45cc12fcd39a5c
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7 #include "completion.h"
8 #include "ckarray.h"
9
10 // limit total number of buffered data items to
11 // maxNumDataItemsBuffered_ (flush when limit is reached) but allow
12 // allocation of up to a factor of OVERALLOCATION_FACTOR more space to
13 // take advantage of nonuniform filling of buffers
14 #define OVERALLOCATION_FACTOR 4
15
16 // #define CACHE_LOCATIONS
17 // #define SUPPORT_INCOMPLETE_MESH
18 // #define CACHE_ARRAY_METADATA // only works for 1D array clients
19 // #define STREAMER_VERBOSE_OUTPUT
20
21 #define TRAM_BROADCAST (-100)
22
23 struct MeshLocation {
24   int dimension; 
25   int bufferIndex; 
26 }; 
27
28 template<class dtype>
29 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
30
31 public:
32
33   int finalMsgCount; 
34   int dimension; 
35   int numDataItems;
36   int *destinationPes;
37   dtype *dataItems;
38
39   MeshStreamerMessage(int dim): numDataItems(0), dimension(dim) {
40     finalMsgCount = -1; 
41   }
42
43   int addDataItem(const dtype& dataItem) {
44     dataItems[numDataItems] = dataItem;
45     return ++numDataItems; 
46   }
47
48   void markDestination(const int index, const int destinationPe) {
49     destinationPes[index] = destinationPe;
50   }
51
52   const dtype& getDataItem(const int index) {
53     return dataItems[index];
54   }
55
56 };
57
58 template <class dtype>
59 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype>{
60 private:
61   CompletionDetector *detectorLocalObj_;
62 public:
63   MeshStreamerArrayClient(){
64     detectorLocalObj_ = NULL; 
65   }
66   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
67   // would like to make it pure virtual but charm will try to
68   // instantiate the abstract class, leading to errors
69   virtual void process(const dtype& data) {
70     CkAbort("Error. MeshStreamerArrayClient::process() is being called. "
71             "This virtual function should have been defined by the user.\n");
72   };     
73   void setDetector(CompletionDetector *detectorLocalObj) {
74     detectorLocalObj_ = detectorLocalObj;
75   }
76   void receiveRedeliveredItem(dtype data) {
77 #ifdef STREAMER_VERBOSE_OUTPUT
78     CkPrintf("[%d] redelivered to index %d\n", 
79              CkMyPe(), this->thisIndex.data[0]);
80 #endif
81     if (detectorLocalObj_ != NULL) {
82       detectorLocalObj_->consume();
83     }
84     process(data);
85   }
86
87   void pup(PUP::er& p) {
88     CBase_MeshStreamerArrayClient<dtype>::pup(p);
89    }  
90
91 };
92
93 template <class dtype>
94 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype>{
95
96 public:
97   virtual void process(const dtype& data) = 0;
98
99 };
100
101 template <class dtype>
102 class MeshStreamer : public CBase_MeshStreamer<dtype> {
103
104 private:
105   int bufferSize_; 
106   int maxNumDataItemsBuffered_;
107   int numDataItemsBuffered_;
108
109   int numMembers_; 
110   int *individualDimensionSizes_;
111   int *combinedDimensionSizes_;
112
113   int *startingIndexAtDimension_; 
114
115   int myIndex_;
116   int *myLocationIndex_;
117
118   CkCallback   userCallback_;
119   bool yieldFlag_;
120
121   double progressPeriodInMs_; 
122   bool isPeriodicFlushEnabled_; 
123   bool hasSentRecently_;
124   MeshStreamerMessage<dtype> ***dataBuffers_;
125
126   CProxy_CompletionDetector detector_;
127   int prio_;
128   int yieldCount_;
129
130 #ifdef CACHE_LOCATIONS
131   MeshLocation *cachedLocations_;
132   bool *isCached_; 
133 #endif
134
135
136   // only used for staged completion
137   int **cntMsgSent_;
138   int *cntMsgReceived_;
139   int *cntMsgExpected_;
140   int *cntFinished_; 
141   int dimensionToFlush_;
142   int numLocalDone_; 
143   int numLocalContributors_;
144
145   void storeMessage(int destinationPe, 
146                     const MeshLocation& destinationCoordinates, 
147                     const void *dataItem, bool copyIndirectly = false);
148   virtual void localDeliver(const dtype& dataItem) = 0; 
149   virtual void localBroadcast(const dtype& dataItem) = 0; 
150   virtual int numElementsInClient() = 0;
151   virtual int numLocalElementsInClient() = 0; 
152
153   virtual void initLocalClients() = 0;
154
155   void sendLargestBuffer();
156   void flushToIntermediateDestinations();
157   void flushDimension(int dimension, bool sendMsgCounts = false); 
158   MeshLocation determineLocation(int destinationPe, 
159                                  int dimensionReceivedAlong);
160
161 protected:
162
163   int numDimensions_;
164   bool useStagedCompletion_;
165   CompletionDetector *detectorLocalObj_;
166   virtual int copyDataItemIntoMessage(
167               MeshStreamerMessage<dtype> *destinationBuffer, 
168               const void *dataItemHandle, bool copyIndirectly = false);
169   void insertData(const void *dataItemHandle, int destinationPe);
170   void broadcast(const void *dataItemHandle, int dimension, 
171                  bool copyIndirectly);
172
173 public:
174
175   MeshStreamer(int maxNumDataItemsBuffered, int numDimensions, 
176                int *dimensionSizes, int bufferSize,
177                bool yieldFlag = 0, double progressPeriodInMs = -1.0);
178   ~MeshStreamer();
179
180   // entry
181   void init(int numLocalContributors, CkCallback startCb, CkCallback endCb, 
182             int prio, bool usePeriodicFlushing);
183   void init(int numContributors, CkCallback startCb, CkCallback endCb, 
184             CProxy_CompletionDetector detector,
185             int prio, bool usePeriodicFlushing);
186   void init(CkArrayID senderArrayID, CkCallback startCb, CkCallback endCb, 
187             int prio, bool usePeriodicFlushing);
188
189   void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
190   virtual void receiveAtDestination(MeshStreamerMessage<dtype> *msg) = 0;
191   void flushIfIdle();
192   void finish();
193
194   // non entry
195   bool isPeriodicFlushEnabled() {
196     return isPeriodicFlushEnabled_;
197   }
198   virtual void insertData(const dtype& dataItem, int destinationPe); 
199   virtual void broadcast(const dtype& dataItem); 
200   void registerPeriodicProgressFunction();
201
202   // flushing begins only after enablePeriodicFlushing has been invoked
203
204   void enablePeriodicFlushing(){
205     isPeriodicFlushEnabled_ = true; 
206     registerPeriodicProgressFunction();
207   }
208
209   void done(int numContributorsFinished = 1) {
210
211     if (useStagedCompletion_) {
212       numLocalDone_ += numContributorsFinished; 
213       if (numLocalDone_ == numLocalContributors_) {
214         startStagedCompletion();
215       }
216     }
217     else {
218       detectorLocalObj_->done(numContributorsFinished);
219     }
220   }
221   
222   bool stagedCompletionStarted() {    
223     return (useStagedCompletion_ && dimensionToFlush_ != numDimensions_ - 1); 
224   }
225
226   void startStagedCompletion() {          
227     if (individualDimensionSizes_[dimensionToFlush_] != 1) {
228       flushDimension(dimensionToFlush_, true);
229     }
230     dimensionToFlush_--;
231
232     checkForCompletedStages();
233   }
234
235   void markMessageReceived(int dimension, int finalCount) {
236     cntMsgReceived_[dimension]++;
237     if (finalCount != -1) {
238       cntFinished_[dimension]++; 
239       cntMsgExpected_[dimension] += finalCount; 
240 #ifdef STREAMER_VERBOSE_OUTPUT
241       CkPrintf("[%d] received dimension: %d finalCount: %d cntFinished: %d "
242                "cntMsgExpected: %d cntMsgReceived: %d\n", CkMyPe(), dimension, 
243                finalCount, cntFinished_[dimension], cntMsgExpected_[dimension],
244                cntMsgReceived_[dimension]); 
245 #endif
246     }  
247     if (dimensionToFlush_ != numDimensions_ - 1) {
248       checkForCompletedStages();
249     }
250   }
251
252   void checkForCompletedStages() {
253
254     while (cntFinished_[dimensionToFlush_ + 1] == 
255            individualDimensionSizes_[dimensionToFlush_ + 1] - 1 &&
256            cntMsgExpected_[dimensionToFlush_ + 1] == 
257            cntMsgReceived_[dimensionToFlush_ + 1]) {
258       if (dimensionToFlush_ == -1) {
259 #ifdef STREAMER_VERBOSE_OUTPUT
260         CkPrintf("[%d] contribute\n", CkMyPe()); 
261 #endif
262         CkAssert(numDataItemsBuffered_ == 0); 
263         isPeriodicFlushEnabled_ = false; 
264         if (!userCallback_.isInvalid()) {
265           this->contribute(userCallback_);
266           userCallback_ = CkCallback();
267         }
268         return; 
269       }
270       else if (individualDimensionSizes_[dimensionToFlush_] != 1) {
271         flushDimension(dimensionToFlush_, true); 
272       }
273       dimensionToFlush_--; 
274     }
275   }
276
277 };
278
279 template <class dtype>
280 MeshStreamer<dtype>::MeshStreamer(
281                      int maxNumDataItemsBuffered, int numDimensions, 
282                      int *dimensionSizes, 
283                      int bufferSize,
284                      bool yieldFlag, 
285                      double progressPeriodInMs)
286  :numDimensions_(numDimensions), 
287   maxNumDataItemsBuffered_(maxNumDataItemsBuffered), 
288   yieldFlag_(yieldFlag), 
289   progressPeriodInMs_(progressPeriodInMs), 
290   bufferSize_(bufferSize)
291 {
292
293   int sumAlongAllDimensions = 0;   
294   individualDimensionSizes_ = new int[numDimensions_];
295   combinedDimensionSizes_ = new int[numDimensions_ + 1];
296   myLocationIndex_ = new int[numDimensions_];
297   startingIndexAtDimension_ = new int[numDimensions_ + 1]; 
298   memcpy(individualDimensionSizes_, dimensionSizes, 
299          numDimensions * sizeof(int)); 
300   combinedDimensionSizes_[0] = 1; 
301   for (int i = 0; i < numDimensions; i++) {
302     sumAlongAllDimensions += individualDimensionSizes_[i];
303     combinedDimensionSizes_[i + 1] = 
304       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
305   }
306
307   CkAssert(combinedDimensionSizes_[numDimensions] == CkNumPes()); 
308
309   // a bufferSize input of 0 indicates it should be calculated by the library
310   if (bufferSize_ == 0) {
311     CkAssert(maxNumDataItemsBuffered_ > 0);
312     // except for personalized messages, the buffers for dimensions with the 
313     //   same index as the sender's are not used
314     bufferSize_ = OVERALLOCATION_FACTOR * maxNumDataItemsBuffered_ 
315       / (sumAlongAllDimensions - numDimensions_ + 1); 
316   }
317   else {
318     maxNumDataItemsBuffered_ = 
319       bufferSize_ * (sumAlongAllDimensions - numDimensions_ + 1);
320   }
321
322   if (bufferSize_ <= 0) {
323     bufferSize_ = 1; 
324     CkPrintf("Argument maxNumDataItemsBuffered to MeshStreamer constructor "
325              "is invalid. Defaulting to a single buffer per destination.\n");
326   }
327   numDataItemsBuffered_ = 0; 
328   numMembers_ = CkNumPes(); 
329
330   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
331   for (int i = 0; i < numDimensions; i++) {
332     int numMembersAlongDimension = individualDimensionSizes_[i]; 
333     dataBuffers_[i] = 
334       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
335     for (int j = 0; j < numMembersAlongDimension; j++) {
336       dataBuffers_[i][j] = NULL;
337     }
338   }
339
340   myIndex_ = CkMyPe();
341   int remainder = myIndex_;
342   startingIndexAtDimension_[numDimensions_] = 0;
343   for (int i = numDimensions_ - 1; i >= 0; i--) {    
344     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
345     int dimensionOffset = combinedDimensionSizes_[i] * myLocationIndex_[i];
346     remainder -= dimensionOffset; 
347     startingIndexAtDimension_[i] = 
348       startingIndexAtDimension_[i+1] + dimensionOffset; 
349   }
350
351   isPeriodicFlushEnabled_ = false; 
352   detectorLocalObj_ = NULL;
353
354 #ifdef CACHE_LOCATIONS
355   cachedLocations_ = new MeshLocation[numMembers_];
356   isCached_ = new bool[numMembers_];
357   std::fill(isCached_, isCached_ + numMembers_, false);
358 #endif
359
360   cntMsgSent_ = NULL; 
361   cntMsgReceived_ = NULL; 
362   cntMsgExpected_ = NULL; 
363   cntFinished_ = NULL; 
364
365 }
366
367 template <class dtype>
368 MeshStreamer<dtype>::~MeshStreamer() {
369
370   for (int i = 0; i < numDimensions_; i++) {
371     for (int j=0; j < individualDimensionSizes_[i]; j++) {
372       delete[] dataBuffers_[i][j]; 
373     }
374     delete[] dataBuffers_[i]; 
375   }
376
377   delete[] individualDimensionSizes_;
378   delete[] combinedDimensionSizes_; 
379   delete[] myLocationIndex_;
380   delete[] startingIndexAtDimension_;
381
382 #ifdef CACHE_LOCATIONS
383   delete[] cachedLocations_;
384   delete[] isCached_; 
385 #endif
386
387   if (cntMsgSent_ != NULL) {
388     for (int i = 0; i < numDimensions_; i++) {
389       delete[] cntMsgSent_[i]; 
390     }
391     delete[] cntMsgSent_; 
392     delete[] cntMsgReceived_; 
393     delete[] cntMsgExpected_; 
394     delete[] cntFinished_;
395   }
396
397 }
398
399 template <class dtype>
400 inline
401 MeshLocation MeshStreamer<dtype>::determineLocation(
402                                   int destinationPe, 
403                                   int dimensionReceivedAlong) {
404
405 #ifdef CACHE_LOCATIONS
406   if (isCached_[destinationPe]) {    
407     return cachedLocations_[destinationPe]; 
408   }
409 #endif
410
411   MeshLocation destinationLocation;
412   int remainder = 
413     destinationPe - startingIndexAtDimension_[dimensionReceivedAlong];
414   int dimensionIndex; 
415   for (int i = dimensionReceivedAlong - 1; i >= 0; i--) {        
416     dimensionIndex = remainder / combinedDimensionSizes_[i];
417     
418     if (dimensionIndex != myLocationIndex_[i]) {
419       destinationLocation.dimension = i; 
420       destinationLocation.bufferIndex = dimensionIndex; 
421 #ifdef CACHE_LOCATIONS
422       cachedLocations_[destinationPe] = destinationLocation;
423       isCached_[destinationPe] = true; 
424 #endif
425       return destinationLocation;
426     }
427
428     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
429   }
430
431   CkAbort("Error. MeshStreamer::determineLocation called with destinationPe "
432           "equal to sender's PE. This is unexpected and may cause errors.\n"); 
433   // to prevent warnings
434   return destinationLocation; 
435 }
436
437 template <class dtype>
438 inline 
439 int MeshStreamer<dtype>::copyDataItemIntoMessage(
440                          MeshStreamerMessage<dtype> *destinationBuffer,
441                          const void *dataItemHandle, bool copyIndirectly) {
442   return destinationBuffer->addDataItem(*((const dtype *)dataItemHandle)); 
443 }
444
445 template <class dtype>
446 inline
447 void MeshStreamer<dtype>::storeMessage(
448                           int destinationPe, 
449                           const MeshLocation& destinationLocation,
450                           const void *dataItem, bool copyIndirectly) {
451
452   int dimension = destinationLocation.dimension;
453   int bufferIndex = destinationLocation.bufferIndex; 
454   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
455
456   // allocate new message if necessary
457   if (messageBuffers[bufferIndex] == NULL) {
458     if (dimension == 0) {
459       // personalized messages do not require destination indices
460       messageBuffers[bufferIndex] = 
461         new (0, bufferSize_, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
462     }
463     else {
464       messageBuffers[bufferIndex] = 
465         new (bufferSize_, bufferSize_, sizeof(int)) 
466         MeshStreamerMessage<dtype>(dimension);
467     }
468     *(int *) CkPriorityPtr(messageBuffers[bufferIndex]) = prio_;
469     CkSetQueueing(messageBuffers[bufferIndex], CK_QUEUEING_IFIFO);
470     CkAssert(messageBuffers[bufferIndex] != NULL);
471   }
472   
473   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
474   int numBuffered = 
475     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
476   if (dimension != 0) {
477     destinationBuffer->markDestination(numBuffered-1, destinationPe);
478   }
479   numDataItemsBuffered_++;
480
481   // send if buffer is full
482   if (numBuffered == bufferSize_) {
483
484     int destinationIndex;
485
486     destinationIndex = myIndex_ + 
487       (bufferIndex - myLocationIndex_[dimension]) * 
488       combinedDimensionSizes_[dimension];
489
490     if (dimension == 0) {
491 #ifdef STREAMER_VERBOSE_OUTPUT
492       CkPrintf("[%d] sending to %d\n", CkMyPe(), destinationIndex); 
493 #endif
494       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
495     }
496     else {
497 #ifdef STREAMER_VERBOSE_OUTPUT
498       CkPrintf("[%d] sending intermediate to %d\n", 
499                CkMyPe(), destinationIndex); 
500 #endif
501       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
502     }
503
504     if (useStagedCompletion_) {
505       cntMsgSent_[dimension][bufferIndex]++; 
506     }
507
508     messageBuffers[bufferIndex] = NULL;
509     numDataItemsBuffered_ -= numBuffered; 
510     hasSentRecently_ = true; 
511
512   }
513   // send if total buffering capacity has been reached
514   else if (numDataItemsBuffered_ == maxNumDataItemsBuffered_) {
515     sendLargestBuffer();
516     hasSentRecently_ = true; 
517   }
518
519 }
520 template <class dtype>
521 inline
522 void MeshStreamer<dtype>::broadcast(const dtype& dataItem) {
523   const static bool copyIndirectly = true;
524
525   // no data items should be submitted after all local contributors call done 
526   // and staged completion has begun
527   CkAssert(stagedCompletionStarted() == false);
528
529   // produce and consume once per PE
530   if (!useStagedCompletion_) {
531     detectorLocalObj_->produce(CkNumPes());
532   }
533
534   // deliver locally
535   localBroadcast(dataItem);
536
537   broadcast(&dataItem, numDimensions_ - 1, copyIndirectly); 
538 }
539
540 template <class dtype>
541 inline
542 void MeshStreamer<dtype>::broadcast(const void *dataItemHandle, int dimension, 
543                                     bool copyIndirectly) {
544
545   MeshLocation destinationLocation;
546   destinationLocation.dimension = dimension; 
547
548   while (destinationLocation.dimension != -1) {
549     for (int i = 0; 
550          i < individualDimensionSizes_[destinationLocation.dimension]; 
551          i++) {
552
553       if (i != myLocationIndex_[destinationLocation.dimension]) {
554         destinationLocation.bufferIndex = i; 
555         storeMessage(TRAM_BROADCAST, destinationLocation, 
556                      dataItemHandle, copyIndirectly); 
557       }
558       // release control to scheduler if requested by the user, 
559       //   assume caller is threaded entry
560       if (yieldFlag_ && ++yieldCount_ == 1024) {
561         yieldCount_ = 0; 
562         CthYield();
563       }
564     }
565     destinationLocation.dimension--; 
566   }
567
568 }
569
570
571 template <class dtype>
572 inline
573 void MeshStreamer<dtype>::insertData(const void *dataItemHandle, 
574                                      int destinationPe) {
575   const static bool copyIndirectly = true;
576
577   // treat newly inserted items as if they were received along
578   // a higher dimension (e.g. for a 3D mesh, received along 4th dimension)
579   MeshLocation destinationLocation = determineLocation(destinationPe, 
580                                                        numDimensions_);
581   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
582                copyIndirectly); 
583   // release control to scheduler if requested by the user, 
584   //   assume caller is threaded entry
585   if (yieldFlag_ && ++yieldCount_ == 1024) {
586     yieldCount_ = 0; 
587     CthYield();
588   }
589
590 }
591
592 template <class dtype>
593 inline
594 void MeshStreamer<dtype>::insertData(const dtype& dataItem, int destinationPe) {
595
596   // no data items should be submitted after all local contributors call done 
597   // and staged completion has begun
598   CkAssert(stagedCompletionStarted() == false);
599
600   if (!useStagedCompletion_) {
601     detectorLocalObj_->produce();
602   }
603   if (destinationPe == CkMyPe()) {
604     localDeliver(dataItem);
605     return;
606   }
607
608   insertData((const void *) &dataItem, destinationPe);
609 }
610
611 template <class dtype>
612 void MeshStreamer<dtype>::init(int numLocalContributors, CkCallback startCb, 
613                                CkCallback endCb, int prio, 
614                                bool usePeriodicFlushing) {
615   useStagedCompletion_ = true; 
616   // allocate memory on first use
617   if (cntMsgSent_ == NULL) {
618     cntMsgSent_ = new int*[numDimensions_]; 
619     cntMsgReceived_ = new int[numDimensions_];
620     cntMsgExpected_ = new int[numDimensions_];
621     cntFinished_ = new int[numDimensions_]; 
622
623     for (int i = 0; i < numDimensions_; i++) {
624       cntMsgSent_[i] = new int[individualDimensionSizes_[i]]; 
625     }
626   }
627
628
629   for (int i = 0; i < numDimensions_; i++) {
630     std::fill(cntMsgSent_[i], 
631               cntMsgSent_[i] + individualDimensionSizes_[i], 0);
632     cntMsgReceived_[i] = 0;
633     cntMsgExpected_[i] = 0; 
634     cntFinished_[i] = 0; 
635   }
636   dimensionToFlush_ = numDimensions_ - 1;
637
638   yieldCount_ = 0; 
639   userCallback_ = endCb; 
640   prio_ = prio;
641
642   numLocalDone_ = 0; 
643   numLocalContributors_ = numLocalContributors; 
644   initLocalClients();
645
646   if (numLocalContributors_ == 0) {
647     startStagedCompletion();
648   }
649
650   hasSentRecently_ = false;
651   if (usePeriodicFlushing) {
652     enablePeriodicFlushing();
653   }
654
655   this->contribute(startCb);
656 }
657
658 template <class dtype>
659 void MeshStreamer<dtype>::init(int numContributors,       
660                                CkCallback startCb, CkCallback endCb,
661                                CProxy_CompletionDetector detector, 
662                                int prio, bool usePeriodicFlushing) {
663
664   useStagedCompletion_ = false; 
665   yieldCount_ = 0; 
666   prio_ = prio;
667   userCallback_ = endCb; 
668   CkCallback flushCb(CkIndex_MeshStreamer<dtype>::flushIfIdle(), 
669                      this->thisProxy);
670   CkCallback finish(CkIndex_MeshStreamer<dtype>::finish(), 
671                     this->thisProxy);
672   detector_ = detector;      
673   detectorLocalObj_ = detector_.ckLocalBranch();
674   initLocalClients();
675
676   detectorLocalObj_->start_detection(numContributors, startCb, flushCb, 
677                                      finish , 0);
678   
679   if (progressPeriodInMs_ <= 0) {
680     CkPrintf("Using completion detection in NDMeshStreamer requires"
681              " setting a valid periodic flush period. Defaulting"
682              " to 10 ms\n");
683     progressPeriodInMs_ = 10;
684   }
685   
686   hasSentRecently_ = false; 
687   if (usePeriodicFlushing) {
688     enablePeriodicFlushing();
689   }
690
691 }
692
693 template <class dtype>
694 void MeshStreamer<dtype>::init(CkArrayID senderArrayID, CkCallback startCb, 
695             CkCallback endCb, int prio, bool usePeriodicFlushing) {
696     CkArray *senderArrayMgr = senderArrayID.ckLocalBranch();
697     int numLocalElements = senderArrayMgr->getLocMgr()->numLocalElements();
698     init(numLocalElements, startCb, endCb, prio, usePeriodicFlushing); 
699   }
700
701
702 template <class dtype>
703 void MeshStreamer<dtype>::finish() {
704   isPeriodicFlushEnabled_ = false; 
705
706   if (!userCallback_.isInvalid()) {
707     this->contribute(userCallback_);
708     userCallback_ = CkCallback();      // nullify the current callback
709   }
710
711 }
712
713 template <class dtype>
714 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
715
716   int destinationPe, lastDestinationPe; 
717   MeshLocation destinationLocation;
718
719   lastDestinationPe = -1;
720   for (int i = 0; i < msg->numDataItems; i++) {
721     destinationPe = msg->destinationPes[i];
722     const dtype& dataItem = msg->getDataItem(i);
723     if (destinationPe == CkMyPe()) {
724       localDeliver(dataItem);
725     }
726     else if (destinationPe != TRAM_BROADCAST) {
727       if (destinationPe != lastDestinationPe) {
728         // do this once per sequence of items with the same destination
729         destinationLocation = determineLocation(destinationPe, msg->dimension);
730       }
731       storeMessage(destinationPe, destinationLocation, &dataItem);   
732     }
733     else /* if (destinationPe == TRAM_BROADCAST) */ {
734       localBroadcast(dataItem);       
735       broadcast(&dataItem, msg->dimension - 1, false); 
736     }
737     lastDestinationPe = destinationPe; 
738   }
739
740   if (useStagedCompletion_) {
741     markMessageReceived(msg->dimension, msg->finalMsgCount); 
742   }
743
744   delete msg;
745
746 }
747
748 template <class dtype>
749 void MeshStreamer<dtype>::sendLargestBuffer() {
750
751   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
752   MeshStreamerMessage<dtype> ** messageBuffers; 
753   MeshStreamerMessage<dtype> *destinationBuffer; 
754
755   for (int i = 0; i < numDimensions_; i++) {
756
757     messageBuffers = dataBuffers_[i]; 
758     numBuffers = individualDimensionSizes_[i]; 
759
760     flushDimension = i; 
761     maxSize = 0;    
762     for (int j = 0; j < numBuffers; j++) {
763       if (messageBuffers[j] != NULL && 
764           messageBuffers[j]->numDataItems > maxSize) {
765         maxSize = messageBuffers[j]->numDataItems;
766         flushIndex = j; 
767       }
768     }
769
770     if (maxSize > 0) {
771
772       messageBuffers = dataBuffers_[flushDimension]; 
773       destinationBuffer = messageBuffers[flushIndex];
774       destinationIndex = myIndex_ + 
775         (flushIndex - myLocationIndex_[flushDimension]) * 
776         combinedDimensionSizes_[flushDimension] ;
777
778       // not sending the full buffer, shrink the message size
779       envelope *env = UsrToEnv(destinationBuffer);
780       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
781                         (bufferSize_ - destinationBuffer->numDataItems));
782       *((int *) env->getPrioPtr()) = prio_;
783
784       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
785
786       if (flushDimension == 0) {
787 #ifdef STREAMER_VERBOSE_OUTPUT
788         CkPrintf("[%d] sending flush to %d\n", CkMyPe(), destinationIndex); 
789 #endif
790         this->thisProxy[destinationIndex].
791           receiveAtDestination(destinationBuffer);
792       }
793       else {
794 #ifdef STREAMER_VERBOSE_OUTPUT
795         CkPrintf("[%d] sending intermediate flush to %d\n", 
796                  CkMyPe(), destinationIndex); 
797 #endif
798         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
799       }
800
801       if (useStagedCompletion_) {
802         cntMsgSent_[i][flushIndex]++; 
803       }
804
805       messageBuffers[flushIndex] = NULL;
806
807     }
808
809   }
810 }
811
812 template <class dtype>
813 void MeshStreamer<dtype>::flushToIntermediateDestinations() {
814   
815   for (int i = 0; i < numDimensions_; i++) {
816     flushDimension(i); 
817   }
818 }
819
820 template <class dtype>
821 void MeshStreamer<dtype>::flushDimension(int dimension, bool sendMsgCounts) {
822 #ifdef STREAMER_VERBOSE_OUTPUT
823   CkPrintf("[%d] flushDimension: %d, sendMsgCounts: %d\n", 
824            CkMyPe(), dimension, sendMsgCounts); 
825 #endif
826   MeshStreamerMessage<dtype> **messageBuffers; 
827   MeshStreamerMessage<dtype> *destinationBuffer; 
828   int destinationIndex, numBuffers; 
829
830   messageBuffers = dataBuffers_[dimension]; 
831   numBuffers = individualDimensionSizes_[dimension]; 
832
833   for (int j = 0; j < numBuffers; j++) {
834
835     if(messageBuffers[j] == NULL) {      
836       if (sendMsgCounts && j != myLocationIndex_[dimension]) {
837         messageBuffers[j] = 
838           new (0, 0, sizeof(int)) MeshStreamerMessage<dtype>(dimension);
839         *(int *) CkPriorityPtr(messageBuffers[j]) = prio_;
840         CkSetQueueing(messageBuffers[j], CK_QUEUEING_IFIFO);
841       }
842       else {
843         continue; 
844       } 
845     }
846
847     destinationBuffer = messageBuffers[j];
848     destinationIndex = myIndex_ + 
849       (j - myLocationIndex_[dimension]) * 
850       combinedDimensionSizes_[dimension] ;
851
852     if (destinationBuffer->numDataItems != 0) {
853       // not sending the full buffer, shrink the message size
854       envelope *env = UsrToEnv(destinationBuffer);
855       env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
856                         (bufferSize_ - destinationBuffer->numDataItems));
857       *((int *) env->getPrioPtr()) = prio_;
858     }
859     numDataItemsBuffered_ -= destinationBuffer->numDataItems;
860
861     if (useStagedCompletion_) {
862       cntMsgSent_[dimension][j]++;
863       if (sendMsgCounts) {
864         destinationBuffer->finalMsgCount = cntMsgSent_[dimension][j];
865       }
866     }
867
868     if (dimension == 0) {
869 #ifdef STREAMER_VERBOSE_OUTPUT
870       CkPrintf("[%d] sending dimension flush to %d\n", 
871                CkMyPe(), destinationIndex); 
872 #endif
873       this->thisProxy[destinationIndex].receiveAtDestination(destinationBuffer);
874     }
875     else {
876 #ifdef STREAMER_VERBOSE_OUTPUT
877       CkPrintf("[%d] sending intermediate dimension flush to %d\n", 
878                CkMyPe(), destinationIndex); 
879 #endif
880       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
881     }
882     messageBuffers[j] = NULL;
883   }
884   
885 }
886
887
888 template <class dtype>
889 void MeshStreamer<dtype>::flushIfIdle(){
890   // flush if (1) this is not a periodic call or 
891   //          (2) this is a periodic call and no sending took place
892   //              since the last time the function was invoked
893   if (!isPeriodicFlushEnabled_ || !hasSentRecently_) {
894
895     if (numDataItemsBuffered_ != 0) {
896       flushToIntermediateDestinations();
897     }    
898     CkAssert(numDataItemsBuffered_ == 0); 
899     
900   }
901
902   hasSentRecently_ = false; 
903
904 }
905
906 template <class dtype>
907 void periodicProgressFunction(void *MeshStreamerObj, double time) {
908
909   MeshStreamer<dtype> *properObj = 
910     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
911
912   if (properObj->isPeriodicFlushEnabled()) {
913     properObj->flushIfIdle();
914     properObj->registerPeriodicProgressFunction();
915   }
916 }
917
918 template <class dtype>
919 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
920   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
921                  progressPeriodInMs_); 
922 }
923
924
925 template <class dtype>
926 class GroupMeshStreamer : public MeshStreamer<dtype> {
927 private:
928
929   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
930   MeshStreamerGroupClient<dtype> *clientObj_;
931
932   void receiveAtDestination(MeshStreamerMessage<dtype> *msg) {
933     for (int i = 0; i < msg->numDataItems; i++) {
934       const dtype& data = msg->getDataItem(i);
935       clientObj_->process(data);
936     }
937
938     if (MeshStreamer<dtype>::useStagedCompletion_) {
939 #ifdef STREAMER_VERBOSE_OUTPUT
940       envelope *env = UsrToEnv(msg);
941       CkPrintf("[%d] received at dest from %d %d items finalMsgCount: %d\n", 
942                CkMyPe(), env->getSrcPe(), msg->numDataItems, 
943                msg->finalMsgCount);  
944 #endif
945       markMessageReceived(msg->dimension, msg->finalMsgCount); 
946     }
947     else {
948       this->detectorLocalObj_->consume(msg->numDataItems);    
949     }
950
951     delete msg;
952   }
953
954   void localDeliver(const dtype& dataItem) {
955     clientObj_->process(dataItem);
956     if (MeshStreamer<dtype>::useStagedCompletion_ == false) {
957       MeshStreamer<dtype>::detectorLocalObj_->consume();
958     }
959   }
960
961   void localBroadcast(const dtype& dataItem) {
962     localDeliver(dataItem); 
963   }
964
965   int numElementsInClient() {
966     // client is a group - there is one element per PE
967     return CkNumPes();
968   }
969
970   int numLocalElementsInClient() {
971     return 1; 
972   }
973
974   void initLocalClients() {
975     // no action required
976   }
977
978 public:
979
980   GroupMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
981                     int *dimensionSizes, 
982                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
983                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
984    :MeshStreamer<dtype>(maxNumDataItemsBuffered, numDimensions, dimensionSizes,
985                         0, yieldFlag, progressPeriodInMs) 
986   {
987     clientProxy_ = clientProxy; 
988     clientObj_ = 
989       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
990   }
991
992   GroupMeshStreamer(int numDimensions, int *dimensionSizes, 
993                     const CProxy_MeshStreamerGroupClient<dtype>& clientProxy,
994                     int bufferSize, bool yieldFlag = 0, 
995                     double progressPeriodInMs = -1.0)
996    :MeshStreamer<dtype>(0, numDimensions, dimensionSizes, bufferSize, 
997                         yieldFlag, progressPeriodInMs) 
998   {
999     clientProxy_ = clientProxy; 
1000     clientObj_ = 
1001       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
1002   }
1003
1004 };
1005
1006 template <class dtype>
1007 class ClientInitializer : public CkLocIterator {
1008
1009 public:
1010   
1011   CompletionDetector *detectorLocalObj_;
1012   CkArray *clientArrMgr_;
1013   ClientInitializer(CompletionDetector *detectorObj, 
1014                              CkArray *clientArrMgr) 
1015     : detectorLocalObj_(detectorObj), clientArrMgr_(clientArrMgr) {}
1016
1017   // CkLocMgr::iterate will call addLocation on all elements local to this PE
1018   void addLocation(CkLocation& loc) {
1019
1020     MeshStreamerArrayClient<dtype> *clientObj = 
1021       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1022
1023     CkAssert(clientObj != NULL); 
1024     clientObj->setDetector(detectorLocalObj_); 
1025   }
1026
1027 };
1028
1029 template <class dtype>
1030 class LocalBroadcaster : public CkLocIterator {
1031
1032 public:
1033   CkArray *clientArrMgr_;
1034   const dtype *dataItem_; 
1035
1036   LocalBroadcaster(CkArray *clientArrMgr, const dtype *dataItem) 
1037    : clientArrMgr_(clientArrMgr), dataItem_(dataItem) {}
1038
1039   void addLocation(CkLocation& loc) {
1040     MeshStreamerArrayClient<dtype> *clientObj = 
1041       (MeshStreamerArrayClient<dtype> *) clientArrMgr_->lookup(loc.getIndex());
1042
1043     CkAssert(clientObj != NULL); 
1044
1045     clientObj->process(*dataItem_); 
1046   }
1047
1048 };
1049
1050 template <class dtype, class itype>
1051 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype, itype> > {
1052   
1053 private:
1054   
1055   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
1056   CkArray *clientArrayMgr_;
1057   int numArrayElements_;
1058   int numLocalArrayElements_;
1059 #ifdef CACHE_ARRAY_METADATA
1060   MeshStreamerArrayClient<dtype> **clientObjs_;
1061   int *destinationPes_;
1062   bool *isCachedArrayMetadata_;
1063 #endif
1064
1065   void localDeliver(const ArrayDataItem<dtype, itype>& packedDataItem) {
1066     itype arrayId = packedDataItem.arrayIndex; 
1067     if (arrayId == itype(TRAM_BROADCAST)) {
1068       localBroadcast(packedDataItem);
1069       return;
1070     }
1071     MeshStreamerArrayClient<dtype> *clientObj;
1072 #ifdef CACHE_ARRAY_METADATA
1073     clientObj = clientObjs_[arrayId];
1074 #else
1075     clientObj = clientProxy_[arrayId].ckLocal();
1076 #endif
1077
1078     if (clientObj != NULL) {
1079       clientObj->process(packedDataItem.dataItem);
1080       if (MeshStreamer<ArrayDataItem<dtype, itype> >
1081            ::useStagedCompletion_ == false) {
1082         MeshStreamer<ArrayDataItem<dtype, itype> >
1083          ::detectorLocalObj_->consume();
1084       }
1085     }
1086     else { 
1087       // array element is no longer present locally - redeliver using proxy
1088       clientProxy_[arrayId].receiveRedeliveredItem(packedDataItem.dataItem);
1089     }
1090   }
1091
1092   void localBroadcast(const ArrayDataItem<dtype, itype>& packedDataItem) {
1093
1094     LocalBroadcaster<dtype> clientIterator(clientProxy_.ckLocalBranch(), 
1095                                            &packedDataItem.dataItem);
1096     CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1097     clientLocMgr->iterate(clientIterator);
1098
1099     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1100          ::useStagedCompletion_ == false) {
1101         MeshStreamer<ArrayDataItem<dtype, itype> >
1102          ::detectorLocalObj_->consume();      
1103     }
1104
1105   }
1106
1107   int numElementsInClient() {
1108     return numArrayElements_;
1109   }
1110
1111   int numLocalElementsInClient() {
1112     return numLocalArrayElements_;
1113   }
1114
1115   void initLocalClients() {
1116     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1117          ::useStagedCompletion_ == false) {
1118 #ifdef CACHE_ARRAY_METADATA
1119       std::fill(isCachedArrayMetadata_, 
1120                 isCachedArrayMetadata_ + numArrayElements_, false);
1121
1122       for (int i = 0; i < numArrayElements_; i++) {
1123         clientObjs_[i] = clientProxy_[i].ckLocal();
1124         if (clientObjs_[i] != NULL) {
1125           clientObjs_[i]->setDetector(
1126            MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_);
1127         }
1128       }
1129 #else
1130       // set completion detector in local elements of the client
1131       CkLocMgr *clientLocMgr = clientProxy_.ckLocMgr(); 
1132       ClientInitializer<dtype> clientIterator(
1133           MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_, 
1134           clientProxy_.ckLocalBranch());
1135       clientLocMgr->iterate(clientIterator);
1136 #endif    
1137     }
1138     else {
1139       numLocalArrayElements_ = clientProxy_.ckLocMgr()->numLocalElements();
1140     }
1141   }
1142
1143   void commonInit() {
1144 #ifdef CACHE_ARRAY_METADATA
1145     numArrayElements_ = (clientArrayMgr_->getNumInitial()).data()[0];
1146     clientObjs_ = new MeshStreamerArrayClient<dtype>*[numArrayElements_];
1147     destinationPes_ = new int[numArrayElements_];
1148     isCachedArrayMetadata_ = new bool[numArrayElements_];
1149     std::fill(isCachedArrayMetadata_, 
1150               isCachedArrayMetadata_ + numArrayElements_, false);
1151 #endif    
1152   }
1153
1154 public:
1155
1156   struct DataItemHandle {
1157     itype arrayIndex; 
1158     const dtype *dataItem;
1159   };
1160
1161   ArrayMeshStreamer(int maxNumDataItemsBuffered, int numDimensions,
1162                     int *dimensionSizes, 
1163                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1164                     bool yieldFlag = 0, double progressPeriodInMs = -1.0)
1165     :MeshStreamer<ArrayDataItem<dtype, itype> >(
1166                   maxNumDataItemsBuffered, numDimensions, dimensionSizes, 
1167                   0, yieldFlag, progressPeriodInMs) 
1168   {
1169     clientProxy_ = clientProxy; 
1170     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1171     commonInit();
1172   }
1173
1174   ArrayMeshStreamer(int numDimensions, int *dimensionSizes, 
1175                     const CProxy_MeshStreamerArrayClient<dtype>& clientProxy,
1176                     int bufferSize, bool yieldFlag = 0, 
1177                     double progressPeriodInMs = -1.0)
1178     :MeshStreamer<ArrayDataItem<dtype,itype> >(
1179                   0, numDimensions, dimensionSizes, 
1180                   bufferSize, yieldFlag, progressPeriodInMs) 
1181   {
1182     clientProxy_ = clientProxy; 
1183     clientArrayMgr_ = clientProxy_.ckLocalBranch();
1184     commonInit();
1185
1186   }
1187
1188   ~ArrayMeshStreamer() {
1189 #ifdef CACHE_ARRAY_METADATA
1190     delete [] clientObjs_;
1191     delete [] destinationPes_;
1192     delete [] isCachedArrayMetadata_; 
1193 #endif
1194   }
1195
1196   void receiveAtDestination(
1197        MeshStreamerMessage<ArrayDataItem<dtype, itype> > *msg) {
1198
1199     for (int i = 0; i < msg->numDataItems; i++) {
1200       const ArrayDataItem<dtype, itype>& packedData = msg->getDataItem(i);
1201       localDeliver(packedData);
1202     }
1203     if (MeshStreamer<ArrayDataItem<dtype, itype> >::useStagedCompletion_) {
1204       markMessageReceived(msg->dimension, msg->finalMsgCount);
1205     }
1206
1207     delete msg;
1208   }
1209
1210   void broadcast(const dtype& dataItem) {
1211     const static bool copyIndirectly = true;
1212
1213     // no data items should be submitted after all local contributors call done
1214     // and staged completion has begun
1215     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1216                ::stagedCompletionStarted()) == false);
1217
1218     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1219          ::useStagedCompletion_ == false) {
1220       MeshStreamer<ArrayDataItem<dtype, itype> >
1221         ::detectorLocalObj_->produce(CkNumPes());
1222     }
1223
1224     // deliver locally
1225     ArrayDataItem<dtype, itype>& packedDataItem(TRAM_BROADCAST, dataItem);
1226     localBroadcast(packedDataItem);
1227
1228     DataItemHandle tempHandle; 
1229     tempHandle.dataItem = &dataItem;
1230     tempHandle.arrayIndex = TRAM_BROADCAST;
1231
1232     int numDimensions = 
1233       MeshStreamer<ArrayDataItem<dtype, itype> >::numDimensions_;
1234     MeshStreamer<ArrayDataItem<dtype, itype> >::
1235       broadcast(&tempHandle, numDimensions - 1, copyIndirectly);
1236   }
1237
1238   void insertData(const dtype& dataItem, itype arrayIndex) {
1239
1240     // no data items should be submitted after all local contributors call done
1241     // and staged completion has begun
1242     CkAssert((MeshStreamer<ArrayDataItem<dtype, itype> >
1243                ::stagedCompletionStarted()) == false);
1244
1245     if (MeshStreamer<ArrayDataItem<dtype, itype> >
1246          ::useStagedCompletion_ == false) {
1247       MeshStreamer<ArrayDataItem<dtype, itype> >::detectorLocalObj_->produce();
1248     }
1249     int destinationPe; 
1250 #ifdef CACHE_ARRAY_METADATA
1251   if (isCachedArrayMetadata_[arrayIndex]) {    
1252     destinationPe =  destinationPes_[arrayIndex];
1253   }
1254   else {
1255     destinationPe = 
1256       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1257     isCachedArrayMetadata_[arrayIndex] = true;
1258     destinationPes_[arrayIndex] = destinationPe;
1259   }
1260 #else 
1261   destinationPe = 
1262     clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
1263 #endif
1264
1265     if (destinationPe == CkMyPe()) {
1266       ArrayDataItem<dtype, itype> packedDataItem(arrayIndex, dataItem);
1267       localDeliver(packedDataItem);
1268       return;
1269     }
1270
1271     // this implementation avoids copying an item before transfer into message
1272     DataItemHandle tempHandle; 
1273     tempHandle.arrayIndex = arrayIndex; 
1274     tempHandle.dataItem = &dataItem;
1275
1276     MeshStreamer<ArrayDataItem<dtype, itype> >::
1277      insertData(&tempHandle, destinationPe);
1278
1279   }
1280
1281   int copyDataItemIntoMessage(
1282       MeshStreamerMessage<ArrayDataItem <dtype, itype> > *destinationBuffer, 
1283       const void *dataItemHandle, bool copyIndirectly) {
1284
1285     if (copyIndirectly == true) {
1286       // newly inserted items are passed through a handle to avoid copying
1287       int numDataItems = destinationBuffer->numDataItems;
1288       const DataItemHandle *tempHandle = 
1289         (const DataItemHandle *) dataItemHandle;
1290       (destinationBuffer->dataItems)[numDataItems].dataItem = 
1291         *(tempHandle->dataItem);
1292       (destinationBuffer->dataItems)[numDataItems].arrayIndex = 
1293         tempHandle->arrayIndex;
1294       return ++destinationBuffer->numDataItems;
1295     }
1296     else {
1297       // this is an item received along the route to destination
1298       // we can copy it from the received message
1299       return MeshStreamer<ArrayDataItem<dtype, itype> >::
1300               copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
1301     }
1302   }
1303
1304 };
1305
1306 #define CK_TEMPLATES_ONLY
1307 #include "NDMeshStreamer.def.h"
1308 #undef CK_TEMPLATES_ONLY
1309
1310 #endif