NDMeshStreamer: when delivering an item locally, don't return a reference into
[charm.git] / src / libs / ck-libs / NDMeshStreamer / NDMeshStreamer.h
1 #ifndef NDMESH_STREAMER_H
2 #define NDMESH_STREAMER_H
3
4 #include <algorithm>
5 #include "NDMeshStreamer.decl.h"
6 #include "DataItemTypes.h"
7
8 // allocate more total buffer space than the maximum buffering limit but flush 
9 //   upon reaching totalBufferCapacity_
10 #define BUFFER_SIZE_FACTOR 4
11
12 // #define DEBUG_STREAMER
13 // #define CACHE_LOCATIONS
14 // #define SUPPORT_INCOMPLETE_MESH
15
16 struct MeshLocation {
17   int dimension; 
18   int bufferIndex; 
19 }; 
20
21 template<class dtype>
22 class MeshStreamerMessage : public CMessage_MeshStreamerMessage<dtype> {
23 public:
24     int numDataItems;
25     int *destinationPes;
26     dtype *data;
27
28     MeshStreamerMessage(): numDataItems(0) {}   
29
30     int addDataItem(const dtype &dataItem) {
31         data[numDataItems] = dataItem;
32         return ++numDataItems; 
33     }
34
35     void markDestination(const int index, const int destinationPe) {
36         destinationPes[index] = destinationPe;
37     }
38
39     dtype &getDataItem(const int index) {
40         return data[index];
41     }
42 };
43
44 template <class dtype>
45 class MeshStreamerGroupClient : public CBase_MeshStreamerGroupClient<dtype> {
46  public:
47      virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
48      virtual void process(dtype &data)=0; 
49 };
50
51 template <class dtype>
52 class MeshStreamerArrayClient : public CBase_MeshStreamerArrayClient<dtype> {
53  public:
54      // virtual void receiveCombinedData(MeshStreamerMessage<dtype> *msg);
55   // would like to make it pure virtual but charm will try to
56   // instantiate the abstract class, leading to errors
57   virtual void process(dtype &data) {} //=0; 
58   MeshStreamerArrayClient() {}
59   MeshStreamerArrayClient(CkMigrateMessage *msg) {}
60 };
61
62 template <class dtype>
63 class MeshStreamer : public CBase_MeshStreamer<dtype> {
64
65 private:
66     int bufferSize_; 
67     int totalBufferCapacity_;
68     int numDataItemsBuffered_;
69
70     int numMembers_; 
71     int numDimensions_;
72     int *individualDimensionSizes_;
73     int *combinedDimensionSizes_;
74
75     int myIndex_;
76     int *myLocationIndex_;
77
78     CkCallback   userCallback_;
79     int yieldFlag_;
80
81     double progressPeriodInMs_; 
82     bool isPeriodicFlushEnabled_; 
83     double timeOfLastSend_; 
84
85
86     MeshStreamerMessage<dtype> ***dataBuffers_;
87
88 #ifdef CACHE_LOCATIONS
89     MeshLocation *cachedLocations;
90     bool *isCached; 
91 #endif
92
93     MeshLocation determineLocation(int destinationPe);
94
95     void storeMessage(int destinationPe, 
96                       const MeshLocation &destinationCoordinates, 
97                       void *dataItem, bool copyIndirectly = false);
98
99     virtual void deliverToDestination(
100                  int destinationPe, 
101                  MeshStreamerMessage<dtype> *destinationBuffer) = 0;
102
103     virtual void localDeliver(dtype &dataItem) = 0; 
104
105     void flushLargestBuffer();
106
107 protected:
108
109     virtual int copyDataItemIntoMessage(
110                 MeshStreamerMessage<dtype> *destinationBuffer, 
111                 void *dataItemHandle, bool copyIndirectly = false);
112
113 public:
114
115     MeshStreamer(int totalBufferCapacity, int numDimensions, 
116                  int *dimensionSizes,
117                  int yieldFlag = 0, double progressPeriodInMs = -1.0);
118     ~MeshStreamer();
119
120       // entry
121     void receiveAlongRoute(MeshStreamerMessage<dtype> *msg);
122     void flushDirect();
123     void finish(CkReductionMsg *msg);
124
125       // non entry
126     bool isPeriodicFlushEnabled() {
127       return isPeriodicFlushEnabled_;
128     }
129     virtual void insertData(dtype &dataItem, int destinationPe); 
130     void insertData(void *dataItemHandle, int destinationPe);
131     void doneInserting();
132     void associateCallback(CkCallback &cb, bool automaticFinish = true) { 
133       userCallback_ = cb;
134       if (automaticFinish) {
135         CkStartQD(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), 
136                              this->thisProxy));
137       }
138     }
139     void flushAllBuffers();
140     void registerPeriodicProgressFunction();
141
142     // flushing begins only after enablePeriodicFlushing has been invoked
143
144     void enablePeriodicFlushing(){
145       isPeriodicFlushEnabled_ = true; 
146       registerPeriodicProgressFunction();
147     }
148 };
149
150 template <class dtype>
151 void MeshStreamerGroupClient<dtype>::receiveCombinedData(
152                                 MeshStreamerMessage<dtype> *msg) {
153   for (int i = 0; i < msg->numDataItems; i++) {
154     dtype &data = msg->getDataItem(i);
155     process(data);
156   }
157   delete msg;
158 }
159
160 template <class dtype>
161 MeshStreamer<dtype>::MeshStreamer(
162                      int totalBufferCapacity, int numDimensions, 
163                      int *dimensionSizes, 
164                      int yieldFlag, 
165                      double progressPeriodInMs)
166  :numDimensions_(numDimensions), 
167   totalBufferCapacity_(totalBufferCapacity), 
168   yieldFlag_(yieldFlag), 
169   progressPeriodInMs_(progressPeriodInMs)
170 {
171   // limit total number of messages in system to totalBufferCapacity
172   //   but allocate a factor BUFFER_SIZE_FACTOR more space to take
173   //   advantage of nonuniform filling of buffers
174
175   int sumAlongAllDimensions = 0;   
176   individualDimensionSizes_ = new int[numDimensions_];
177   combinedDimensionSizes_ = new int[numDimensions_ + 1];
178   myLocationIndex_ = new int[numDimensions_];
179   memcpy(individualDimensionSizes_, dimensionSizes, 
180          numDimensions * sizeof(int)); 
181   combinedDimensionSizes_[0] = 1; 
182   for (int i = 0; i < numDimensions; i++) {
183     sumAlongAllDimensions += individualDimensionSizes_[i];
184     combinedDimensionSizes_[i + 1] = 
185       combinedDimensionSizes_[i] * individualDimensionSizes_[i];
186   }
187
188   // except for personalized messages, the buffers for dimensions with the 
189   //   same index as the sender's are not used
190   bufferSize_ = BUFFER_SIZE_FACTOR * totalBufferCapacity 
191     / (sumAlongAllDimensions - numDimensions_ + 1); 
192   if (bufferSize_ <= 0) {
193     bufferSize_ = 1; 
194     CkPrintf("Argument totalBufferCapacity to MeshStreamer constructor "
195              "is invalid. Defaulting to a single buffer per destination.\n");
196   }
197   totalBufferCapacity_ = totalBufferCapacity;
198   numDataItemsBuffered_ = 0; 
199   numMembers_ = CkNumPes(); 
200
201   dataBuffers_ = new MeshStreamerMessage<dtype> **[numDimensions_]; 
202   for (int i = 0; i < numDimensions; i++) {
203     int numMembersAlongDimension = individualDimensionSizes_[i]; 
204     dataBuffers_[i] = 
205       new MeshStreamerMessage<dtype> *[numMembersAlongDimension];
206     for (int j = 0; j < numMembersAlongDimension; j++) {
207       dataBuffers_[i][j] = NULL;
208     }
209   }
210
211   myIndex_ = CkMyPe();
212   int remainder = myIndex_;
213   for (int i = numDimensions_ - 1; i >= 0; i--) {    
214     myLocationIndex_[i] = remainder / combinedDimensionSizes_[i];
215     remainder -= combinedDimensionSizes_[i] * myLocationIndex_[i];
216   }
217
218   isPeriodicFlushEnabled_ = false; 
219
220 #ifdef CACHE_LOCATIONS
221   cachedLocations = new MeshLocation[numMembers_];
222   isCached = new bool[numMembers_];
223   std::fill(isCached, isCached + numMembers_, false);
224 #endif
225
226 }
227
228 template <class dtype>
229 MeshStreamer<dtype>::~MeshStreamer() {
230
231   for (int i = 0; i < numDimensions_; i++) {
232     for (int j=0; j < individualDimensionSizes_[i]; j++) {
233       delete[] dataBuffers_[i][j]; 
234     }
235     delete[] dataBuffers_[i]; 
236   }
237
238   delete[] individualDimensionSizes_;
239   delete[] combinedDimensionSizes_; 
240   delete[] myLocationIndex_;
241
242 #ifdef CACHE_LOCATIONS
243   delete[] cachedLocations;
244   delete[] isCached; 
245 #endif
246
247 }
248
249
250 template <class dtype>
251 inline
252 MeshLocation MeshStreamer<dtype>::determineLocation(int destinationPe) { 
253
254 #ifdef CACHE_LOCATIONS
255   if (isCached[destinationPe]) {    
256     return cachedLocations[destinationPe]; 
257   }
258 #endif
259
260   MeshLocation destinationLocation;
261   int remainder = destinationPe;
262   int dimensionIndex; 
263   for (int i = numDimensions_ - 1; i >= 0; i--) {        
264     dimensionIndex = remainder / combinedDimensionSizes_[i];
265     
266     if (dimensionIndex != myLocationIndex_[i]) {
267       destinationLocation.dimension = i; 
268       destinationLocation.bufferIndex = dimensionIndex; 
269 #ifdef CACHE_LOCATIONS
270       cachedLocations[destinationPe] = destinationLocation;
271       isCached[destinationPe] = true; 
272 #endif
273       return destinationLocation;
274     }
275
276     remainder -= combinedDimensionSizes_[i] * dimensionIndex;
277   }
278
279   // all indices agree - message to oneself
280   destinationLocation.dimension = 0; 
281   destinationLocation.bufferIndex = myLocationIndex_[0];
282   return destinationLocation; 
283 }
284
285 template <class dtype>
286 inline 
287 int MeshStreamer<dtype>::copyDataItemIntoMessage(
288                          MeshStreamerMessage<dtype> *destinationBuffer,
289                          void *dataItemHandle, bool copyIndirectly) {
290   return destinationBuffer->addDataItem(*((dtype *)dataItemHandle)); 
291 }
292
293 template <class dtype>
294 inline
295 void MeshStreamer<dtype>::storeMessage(
296                           int destinationPe, 
297                           const MeshLocation& destinationLocation,
298                           void *dataItem, bool copyIndirectly) {
299
300   int dimension = destinationLocation.dimension;
301   int bufferIndex = destinationLocation.bufferIndex; 
302   MeshStreamerMessage<dtype> ** messageBuffers = dataBuffers_[dimension];   
303
304
305
306   // allocate new message if necessary
307   if (messageBuffers[bufferIndex] == NULL) {
308     if (dimension == 0) {
309       // personalized messages do not require destination indices
310       messageBuffers[bufferIndex] = 
311         new (0, bufferSize_) MeshStreamerMessage<dtype>();
312     }
313     else {
314       messageBuffers[bufferIndex] = 
315         new (bufferSize_, bufferSize_) MeshStreamerMessage<dtype>();
316     }
317 #ifdef DEBUG_STREAMER
318     CkAssert(messageBuffers[bufferIndex] != NULL);
319 #endif
320   }
321   
322   MeshStreamerMessage<dtype> *destinationBuffer = messageBuffers[bufferIndex];
323   int numBuffered = 
324     copyDataItemIntoMessage(destinationBuffer, dataItem, copyIndirectly);
325   if (dimension != 0) {
326     destinationBuffer->markDestination(numBuffered-1, destinationPe);
327   }  
328   numDataItemsBuffered_++;
329
330   // send if buffer is full
331   if (numBuffered == bufferSize_) {
332
333     int destinationIndex;
334
335     destinationIndex = myIndex_ + 
336       (bufferIndex - myLocationIndex_[dimension]) * 
337       combinedDimensionSizes_[dimension];
338
339     if (dimension == 0) {
340       deliverToDestination(destinationIndex, destinationBuffer);
341     }
342     else {
343       this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
344     }
345
346     messageBuffers[bufferIndex] = NULL;
347     numDataItemsBuffered_ -= numBuffered; 
348
349     if (isPeriodicFlushEnabled_) {
350       timeOfLastSend_ = CkWallTimer();
351     }
352
353   }
354
355   // send if total buffering capacity has been reached
356   if (numDataItemsBuffered_ == totalBufferCapacity_) {
357     flushLargestBuffer();
358     if (isPeriodicFlushEnabled_) {
359       timeOfLastSend_ = CkWallTimer();
360     }
361   }
362
363 }
364
365 template <class dtype>
366 void MeshStreamer<dtype>::insertData(void *dataItemHandle, int destinationPe) {
367   static int count = 0;
368   const static bool copyIndirectly = true;
369
370   MeshLocation destinationLocation = determineLocation(destinationPe);
371   storeMessage(destinationPe, destinationLocation, dataItemHandle, 
372                copyIndirectly); 
373
374   // release control to scheduler if requested by the user, 
375   //   assume caller is threaded entry
376   if (yieldFlag_ && ++count == 1024) {
377     count = 0; 
378     CthYield();
379   }
380
381 }
382
383 template <class dtype>
384 inline
385 void MeshStreamer<dtype>::insertData(dtype &dataItem, int destinationPe) {
386
387   if (destinationPe == CkMyPe()) {
388     // copying here is necessary - user code should not be 
389     // passed back a reference to the original item
390     dtype dataItemCopy = dataItem;
391     localDeliver(dataItemCopy);
392     return;
393   }
394
395   insertData((void *) &dataItem, destinationPe);
396 }
397
398 template <class dtype>
399 void MeshStreamer<dtype>::doneInserting() {
400   this->contribute(CkCallback(CkIndex_MeshStreamer<dtype>::finish(NULL), 
401                               this->thisProxy));
402 }
403
404 template <class dtype>
405 void MeshStreamer<dtype>::finish(CkReductionMsg *msg) {
406
407   isPeriodicFlushEnabled_ = false; 
408   flushDirect();
409
410   if (!userCallback_.isInvalid()) {
411     CkStartQD(userCallback_);
412     userCallback_ = CkCallback();      // nullify the current callback
413   }
414
415   // TODO: TEST IF THIS DELETE STILL CAUSES UNEXPLAINED CRASHES
416   //  delete msg; 
417 }
418
419 template <class dtype>
420 void MeshStreamer<dtype>::receiveAlongRoute(MeshStreamerMessage<dtype> *msg) {
421
422   int destinationPe; 
423   MeshLocation destinationLocation;
424
425   for (int i = 0; i < msg->numDataItems; i++) {
426     destinationPe = msg->destinationPes[i];
427     dtype &dataItem = msg->getDataItem(i);
428     destinationLocation = determineLocation(destinationPe);
429     if (destinationPe == CkMyPe()) {
430       localDeliver(dataItem);
431     }
432     else {
433       storeMessage(destinationPe, destinationLocation, &dataItem);   
434     }
435   }
436
437   delete msg;
438
439 }
440
441 template <class dtype>
442 void MeshStreamer<dtype>::flushLargestBuffer() {
443
444   int flushDimension, flushIndex, maxSize, destinationIndex, numBuffers;
445   MeshStreamerMessage<dtype> ** messageBuffers; 
446   MeshStreamerMessage<dtype> *destinationBuffer; 
447
448   for (int i = 0; i < numDimensions_; i++) {
449
450     messageBuffers = dataBuffers_[i]; 
451     numBuffers = individualDimensionSizes_[i]; 
452
453     flushDimension = i; 
454     maxSize = 0;    
455     for (int j = 0; j < numBuffers; j++) {
456       if (messageBuffers[j] != NULL && 
457           messageBuffers[j]->numDataItems > maxSize) {
458         maxSize = messageBuffers[j]->numDataItems;
459         flushIndex = j; 
460       }
461     }
462
463     if (maxSize > 0) {
464
465       messageBuffers = dataBuffers_[flushDimension]; 
466       destinationBuffer = messageBuffers[flushIndex];
467       destinationIndex = myIndex_ + 
468         (flushIndex - myLocationIndex_[flushDimension]) * 
469         combinedDimensionSizes_[flushDimension] ;
470
471       if (destinationBuffer->numDataItems < bufferSize_) {
472         // not sending the full buffer, shrink the message size
473         envelope *env = UsrToEnv(destinationBuffer);
474         env->setTotalsize(env->getTotalsize() - sizeof(dtype) *
475                           (bufferSize_ - destinationBuffer->numDataItems));
476       }
477       numDataItemsBuffered_ -= destinationBuffer->numDataItems;
478
479       if (flushDimension == 0) {
480         deliverToDestination(destinationIndex, destinationBuffer);
481       }
482       else {
483         this->thisProxy[destinationIndex].receiveAlongRoute(destinationBuffer);
484       }
485       messageBuffers[flushIndex] = NULL;
486
487     }
488
489   }
490 }
491
492 template <class dtype>
493 void MeshStreamer<dtype>::flushAllBuffers() {
494
495   MeshStreamerMessage<dtype> **messageBuffers; 
496   int numBuffers; 
497
498   for (int i = 0; i < numDimensions_; i++) {
499
500     messageBuffers = dataBuffers_[i]; 
501     numBuffers = individualDimensionSizes_[i]; 
502
503     for (int j = 0; j < numBuffers; j++) {
504
505       if(messageBuffers[j] == NULL) {
506         continue;
507       }
508
509       numDataItemsBuffered_ -= messageBuffers[j]->numDataItems;
510
511       if (i == 0) {
512         int destinationPe = myIndex_ + j - myLocationIndex_[i];
513         deliverToDestination(destinationPe, messageBuffers[j]);
514       }  
515       else {
516
517         for (int k = 0; k < messageBuffers[j]->numDataItems; k++) {
518
519           MeshStreamerMessage<dtype> *directMsg = 
520             new (0, 1) MeshStreamerMessage<dtype>();
521 #ifdef DEBUG_STREAMER
522           CkAssert(directMsg != NULL);
523 #endif
524           int destinationPe = messageBuffers[j]->destinationPes[k]; 
525           dtype &dataItem = messageBuffers[j]->getDataItem(k);   
526           directMsg->addDataItem(dataItem);
527           deliverToDestination(destinationPe,directMsg);
528         }
529         delete messageBuffers[j];
530       }
531       messageBuffers[j] = NULL;
532     }
533   }
534 }
535
536 template <class dtype>
537 void MeshStreamer<dtype>::flushDirect(){
538
539     if (!isPeriodicFlushEnabled_ || 
540         1000 * (CkWallTimer() - timeOfLastSend_) >= progressPeriodInMs_) {
541       flushAllBuffers();
542     }
543
544     if (isPeriodicFlushEnabled_) {
545       timeOfLastSend_ = CkWallTimer();
546     }
547
548 #ifdef DEBUG_STREAMER
549     //CkPrintf("[%d] numDataItemsBuffered_: %d\n", CkMyPe(), numDataItemsBuffered_);
550     CkAssert(numDataItemsBuffered_ == 0); 
551 #endif
552
553 }
554
555 template <class dtype>
556 void periodicProgressFunction(void *MeshStreamerObj, double time) {
557
558   MeshStreamer<dtype> *properObj = 
559     static_cast<MeshStreamer<dtype>*>(MeshStreamerObj); 
560
561   if (properObj->isPeriodicFlushEnabled()) {
562     properObj->flushDirect();
563     properObj->registerPeriodicProgressFunction();
564   }
565 }
566
567 template <class dtype>
568 void MeshStreamer<dtype>::registerPeriodicProgressFunction() {
569   CcdCallFnAfter(periodicProgressFunction<dtype>, (void *) this, 
570                  progressPeriodInMs_); 
571 }
572
573
574 template <class dtype>
575 class GroupMeshStreamer : public MeshStreamer<dtype> {
576 private:
577
578   CProxy_MeshStreamerGroupClient<dtype> clientProxy_;
579   MeshStreamerGroupClient<dtype> *clientObj_;
580
581   void deliverToDestination(int destinationPe, 
582                             MeshStreamerMessage<dtype> *destinationBuffer) {
583     clientProxy_[destinationPe].receiveCombinedData(destinationBuffer);
584   }
585
586   void localDeliver(dtype &dataItem) {
587     clientObj_->process(dataItem);
588   }
589
590 public:
591
592   GroupMeshStreamer(int totalBufferCapacity, int numDimensions,
593                     int *dimensionSizes, 
594                     const CProxy_MeshStreamerGroupClient<dtype> &clientProxy,
595                     int yieldFlag = 0, double progressPeriodInMs = -1.0)
596    :MeshStreamer<dtype>(totalBufferCapacity, numDimensions, dimensionSizes, 
597                          yieldFlag, progressPeriodInMs) 
598   {
599     clientProxy_ = clientProxy; 
600     clientObj_ = 
601       ((MeshStreamerGroupClient<dtype> *)CkLocalBranch(clientProxy_));
602   }
603
604
605 };
606
607 template <class dtype>
608 class ArrayMeshStreamer : public MeshStreamer<ArrayDataItem<dtype> > {
609 private:
610
611   CProxy_MeshStreamerArrayClient<dtype> clientProxy_;
612   CkArray *clientArrayMgr_;
613   MeshStreamerArrayClient<dtype> *clientObj_;
614
615
616   void deliverToDestination(
617        int destinationPe, 
618        MeshStreamerMessage<ArrayDataItem<dtype> > *destinationBuffer) { 
619     ( (CProxy_ArrayMeshStreamer<dtype>) 
620       this->thisProxy )[destinationPe].receiveArrayData(destinationBuffer);
621   }
622
623   void localDeliver(ArrayDataItem<dtype> &packedDataItem) {
624     int arrayId = packedDataItem.arrayIndex; 
625     MeshStreamerArrayClient<dtype> *clientObj = 
626       clientProxy_[arrayId].ckLocal();
627
628     if (clientObj != NULL) {
629       clientObj->process(packedDataItem.dataItem);
630     }
631     else {
632       // array element is no longer present locally - redeliver using proxy
633       clientProxy_[arrayId].process(packedDataItem.dataItem);
634     }
635   }
636
637 public:
638
639   struct DataItemHandle {
640     int arrayIndex; 
641     dtype *dataItem;
642   };
643
644   ArrayMeshStreamer(int totalBufferCapacity, int numDimensions,
645                     int *dimensionSizes, 
646                     const CProxy_MeshStreamerArrayClient<dtype> &clientProxy,
647                     int yieldFlag = 0, double progressPeriodInMs = -1.0)
648     :MeshStreamer<ArrayDataItem<dtype> >(totalBufferCapacity, numDimensions, 
649                                         dimensionSizes, yieldFlag, 
650                                         progressPeriodInMs) 
651   {
652     clientProxy_ = clientProxy; 
653     clientArrayMgr_ = clientProxy_.ckLocalBranch();
654   }
655
656   void receiveArrayData(MeshStreamerMessage<ArrayDataItem<dtype> > *msg) {
657     for (int i = 0; i < msg->numDataItems; i++) {
658       ArrayDataItem<dtype> &packedData = msg->getDataItem(i);
659       localDeliver(packedData);
660     }
661     delete msg;
662   }
663
664   void insertData(dtype &dataItem, int arrayIndex) {
665
666     int destinationPe = 
667       clientArrayMgr_->lastKnown(clientProxy_[arrayIndex].ckGetIndex());
668     static ArrayDataItem<dtype> packedDataItem;
669     if (destinationPe == CkMyPe()) {
670       // copying here is necessary - user code should not be 
671       // passed back a reference to the original item
672       packedDataItem.arrayIndex = arrayIndex; 
673       packedDataItem.dataItem = dataItem;
674       localDeliver(packedDataItem);
675       return;
676     }
677
678     // this implementation avoids copying an item before transfer into message
679
680     static DataItemHandle tempHandle; 
681     tempHandle.arrayIndex = arrayIndex; 
682     tempHandle.dataItem = &dataItem;
683
684     MeshStreamer<ArrayDataItem<dtype> >::insertData(&tempHandle, destinationPe);
685
686   }
687
688   int copyDataItemIntoMessage(
689       MeshStreamerMessage<ArrayDataItem <dtype> > *destinationBuffer, 
690       void *dataItemHandle, bool copyIndirectly) {
691
692     if (copyIndirectly == true) {
693       // newly inserted items are passed through a handle to avoid copying
694       int numDataItems = destinationBuffer->numDataItems;
695       DataItemHandle *tempHandle = (DataItemHandle *) dataItemHandle;
696       (destinationBuffer->data)[numDataItems].dataItem = 
697         *(tempHandle->dataItem);
698       (destinationBuffer->data)[numDataItems].arrayIndex = 
699         tempHandle->arrayIndex;
700       return ++destinationBuffer->numDataItems;
701     }
702     else {
703       // this is an item received along the route to destination
704       // we can copy it from the received message
705       return MeshStreamer<ArrayDataItem<dtype> >::copyDataItemIntoMessage(destinationBuffer, dataItemHandle);
706     }
707   }
708
709 };
710
711 #define CK_TEMPLATES_ONLY
712 #include "NDMeshStreamer.def.h"
713 #undef CK_TEMPLATES_ONLY
714
715 #endif