fixing for ft
[charm.git] / src / conv-com / MeshStreamingStrategy.C
1 /**
2    @addtogroup ComlibConverseStrategy   
3    @{
4    @file 
5 */
6
7 #include "MeshStreamingStrategy.h"
8 #include "pup_cmialloc.h"
9 //#include "MsgPacker.h"
10
11 /**** not needed any-more after pup_CmiAlloc 
12 // These macros are taken directly from convcore.c.
13 #define SIZEFIELD(m) (((CmiChunkHeader *)(m))[-1].size)
14 #define REFFIELD(m) (((CmiChunkHeader *)(m))[-1].ref)
15 #define BLKSTART(m) (((CmiChunkHeader *)(m))-1)
16 ***/
17
18 // These externs are defined inside ComlibManager.C.
19 //CkpvExtern(CkGroupID, cmgrID);
20 //CkpvExtern(int, RecvmsgHandle);
21
22 CkpvDeclare(int, streaming_column_handler_id);
23
24 /**************************************************************************
25 ** This handler is invoked automatically when the processor goes idle.
26 **
27 ** The idle handler automatically re-registers itself, so there is no need
28 ** to re-register it from here.
29 **
30 ** If nothing else is going on anyway, we might as well flush the buffers
31 ** now instead of waiting for the flush period.
32 */
33 void idle_flush_handler (void *ptr, double curT)
34 {
35   ComlibPrintf ("[%d] idle_flush_handler() invoked.\n", CkMyPe());
36
37   MeshStreamingStrategy *classptr = (MeshStreamingStrategy *) ptr;
38   classptr->FlushBuffers ();
39 }
40
41
42
43 /**************************************************************************
44 ** This handler is invoked automatically after a timeout occurs.
45 **
46 ** The periodic handler does not automatically re-register itself, so it
47 ** calls RegisterPeriodicFlush() to do so after it finishes flushing
48 ** buffers.
49 */
50 void periodic_flush_handler (void *ptr, double curT)
51 {
52   ComlibPrintf ("[%d] periodic_flush_handler() invoked.\n", CkMyPe());
53
54   MeshStreamingStrategy *classptr = (MeshStreamingStrategy *) ptr;
55   classptr->FlushBuffers ();
56   classptr->RegisterPeriodicFlush ();
57 }
58
59
60
61 /**************************************************************************
62 ** This handler is invoked automatically when a packed message for a column
63 ** is received.
64 **
65 ** The layout of the message received is shown in the diagram below.
66 **
67 **             \  /
68 ** +------------||---------------------------------------------+
69 ** |Conv| I | # || dest || size | ref || Converse  || user | ...
70 ** |hdr | D |   ||  PE  ||      | cnt ||  header   || data | ...
71 ** +------------||---------------------------------------------+
72 **             /  \
73 **
74 ** The function first retrieves the strategy ID and the number of messages
75 ** in the packed message and then uses the strategy ID to obtain a pointer
76 ** to the MeshStreamingStrategy class.  It also obtains the row length by
77 ** calling GetRowLength().
78 **
79 ** The function then iterates through the messages in the packed message.
80 ** For each message within, it allocates space by calling CmiAlloc() and
81 ** then copies the message from the packed buffer into the new message
82 ** buffer.  It is also able to obtain the destination PE for the message
83 ** because this information is included in the packed message data for
84 ** each packed message.  If the destination PE is the current PE, the
85 ** message is delivered immediately via a call to CmiSyncSendAndFree().
86 ** This routine calls CmiFree() on the message, which is appropriate
87 ** since it was allocated with CmiAlloc().  Otherwise, the message is
88 ** inserted into the row bucket for the necessary row by calling
89 ** InsertIntoRowBucket().  When messages are delivered from the row
90 ** bucket, they are freed by CmiFree().
91 */
92 void streaming_column_handler (void *msg)
93 {
94     int dest_row;
95     int my_pe;
96     //int num_msgs;
97     int row_length;
98     //int strategy_id;
99     //char *msgptr;
100     char *newmsg;
101     MeshStreamingStrategy *classptr;
102         
103     ComlibPrintf ("[%d] column_handler() invoked.\n", CkMyPe());
104     
105     my_pe = CkMyPe ();
106
107     //PUP_cmialloc mem lets us use the converse reference counting
108     //black magic in a transparent way. PUP_fromCmiAllocMem lets sub
109     //messages in a messages be used freely in the program as messages.     
110     PUP_fromCmiAllocMem fp(msg);    
111     MeshStreamingHeader mhdr;
112     
113     //Read the header from the message
114     fp | mhdr;
115     
116     //strategy_id = ((int *) (msg + CmiMsgHeaderSizeBytes))[0];
117     //num_msgs = ((int *) (msg + CmiMsgHeaderSizeBytes))[1];
118     
119     classptr = (MeshStreamingStrategy *)ConvComlibGetStrategy(mhdr.strategy_id);
120     //        CProxy_ComlibManager (CkpvAccess (cmgrID)).
121     //        ckLocalBranch()->getStrategy (mhdr.strategy_id);
122     
123     row_length = classptr->GetRowLength ();
124     
125     //msgptr = (char *) (msg + CmiMsgHeaderSizeBytes + 2 * sizeof(int));
126     
127     for (int i = 0; i < mhdr.num_msgs; i++) {
128         /*
129         dest_pe = ((int *) msgptr)[0];
130         msgsize = ((int *) msgptr)[1];
131         
132         newmsg = (char *) CmiAlloc (msgsize);
133         
134         memcpy (newmsg, (msgptr + 3 * sizeof(int)), msgsize);
135         
136         if (dest_pe == my_pe) {
137             CmiSyncSendAndFree (my_pe, msgsize, newmsg);
138         } else {
139             dest_row = dest_pe / row_length;
140             classptr->InsertIntoRowBucket (dest_row, newmsg);
141         }
142         
143         msgptr += msgsize + 3 * sizeof(int);
144         */
145
146         int dest_pe;
147         fp | dest_pe;
148
149         //Returns a part of a message as an independent message and
150         //updates the reference count of the container message.
151         fp.pupCmiAllocBuf((void **)&newmsg);
152         int msgsize = SIZEFIELD(newmsg);// ((envelope*)newmsg)->getTotalsize();
153
154         if (dest_pe == my_pe) {
155             CmiSyncSendAndFree (my_pe, msgsize, newmsg);
156         } else {
157             dest_row = dest_pe / row_length;
158             classptr->InsertIntoRowBucket (dest_row, newmsg);
159         }
160     }
161     
162     CmiFree (msg);
163 }
164
165
166
167 /**************************************************************************
168 ** This is the MeshStreamingStrategy constructor.
169 **
170 ** The period and bucket_size have default values specified in the .h file.
171 **
172 ** The constructor is invoked when the client code instantiates this
173 ** strategy.  The constructor executes on a SINGLE PROCESS in the
174 ** computation, so it cannot do things like determine an individual
175 ** process's position within the mesh.
176 **
177 ** After the constructor is invoked, the communications library creates
178 ** instances that get pup'ed and shipped to each processor in the
179 ** computation.  To that end, the process that instantiates this strategy
180 ** (most likely PE 0) will then use pup to pack copies of the strategy
181 ** and then ship them off to other processes.  They will be un-pup'ed
182 ** there.  Finally, beginProcessing() will be called on EACH instance on
183 ** its target processor.
184 */
185 MeshStreamingStrategy::MeshStreamingStrategy (int period, int bucket_size) 
186     : Strategy() 
187 {
188     ComlibPrintf ("[%d] MeshStreamingStrategy::MeshStreamingStrategy() invoked.\n", CkMyPe());
189
190     num_pe = CkNumPes ();
191     
192     num_columns = (int) (ceil (sqrt ((double) num_pe)));
193     num_rows = num_columns;
194     row_length = num_columns;
195     
196     flush_period = period;
197     max_bucket_size = bucket_size;
198     
199     column_bucket = new CkQ<char *>[num_columns];
200     column_destQ = new CkQ<int>[num_columns];
201     column_bytes = new int[num_columns];
202     row_bucket = new CkQ<char *>[num_rows];
203
204     //shortMsgPackingFlag = CmiFalse;
205 }
206
207
208
209 /**************************************************************************
210 ** This method is called when the communications library sends a message
211 ** from one PE to another PE.  This could be due to a direct message being
212 ** sent, or due to a method invocation with marshalled parameters.
213 **
214 ** The method begins by getting the destination PE from the
215 ** CharmMessageHolder that is passed in (and from this, computing the
216 ** destination column) and getting a pointer to the User data for the
217 ** message (and from this, computing the Envelope pointer and the Block
218 ** pointer).  The following diagram shows the layout of the message.
219 **
220 **     +----------------------------------------------------+
221 **     | size | refcount || Converse  ||        user        |
222 **     |      |          ||  header   ||        data        |
223 **     +----------------------------------------------------+
224 **     ^                  ^            ^
225 **     |                  |            |
226 **     blk (Block)        msg          usr (User)
227 **
228 ** All Converse messages are allocated by CmiAlloc() which prepends two ints to
229 ** all memory regions to hold a size field and a refcount field. BLKSTART() is a
230 ** macro that gets the start of a block from the envelope pointer.
231 **
232 ** If the destination PE is our current PE, we just deliver the message
233 ** immediately.
234 **
235 ** Otherwise, if the destination PE is in the same column as our PE, we
236 ** allocate a new region of memory with CmiAlloc() and copy from the
237 ** envelope pointer into the new region, and then deposit this new message
238 ** into the appropriate row bucket for our column.  (The row buckets are
239 ** queues of pointers to memory regions exactly like the diagram above.
240 ** All entries in the row bucket are allocated with CmiAlloc() and must
241 ** be deallocated with CmiFree()!)
242 **
243 ** Otherwise, the destination PE must be in a different column from our
244 ** PE.  We allocate a new region of memory with "new" that looks like
245 ** the diagram below.
246 **
247 ** +------------------------------------------------------------+
248 ** | dest || size | refcount || Converse  ||        user        |
249 ** |  PE  ||      |          ||  header   ||        data        |
250 ** +------------------------------------------------------------+
251 ** ^       ^                  ^            ^
252 ** |       |                  |            |
253 ** newmsg  blk (Block)        msg          usr (User)
254 **
255 ** We then deposit this new message into the appropriate column bucket.
256 ** (The column buckets are queues of pointers that are allocated with
257 ** "new" and must be deallocated with "delete"!)
258 */
259
260 void MeshStreamingStrategy::insertMessage (MessageHolder *cmsg)
261 {
262     int dest_pe;
263     int dest_row;
264     int dest_col;
265     int msg_size;
266     int total_size;
267     char *msg;
268     //char *env;
269     //char *blk;
270     //char *newmsg;
271     
272     ComlibPrintf ("[%d] MeshStreamingStrategy::insertMessage() invoked.\n", 
273                   CkMyPe());
274     
275     dest_pe = cmsg->dest_proc;
276     dest_col = dest_pe % num_columns;
277     msg = cmsg->getMessage ();
278     //env = (char *) UsrToEnv (usr);
279     
280     //blk = (char *) BLKSTART (env);
281     msg_size = SIZEFIELD(msg);//((envelope *)env)->getTotalsize();
282
283     //misc_size = (env - blk);
284     total_size = sizeof (int) + sizeof(CmiChunkHeader) + msg_size;
285     
286     if (dest_pe == my_pe) {
287         CmiSyncSend (my_pe, msg_size, msg);
288     } else if (dest_col == my_column) {
289         //newmsg = (char *) CmiAlloc (env_size);
290         //memcpy (newmsg, env, env_size);
291         //newmsg = env;
292         
293         dest_row = dest_pe / row_length;
294         
295         InsertIntoRowBucket (dest_row, msg);
296     } else {
297         //newmsg = new char[total_size];
298         //((int *) newmsg)[0] = dest_pe;
299         //memcpy ( (void *) &(((int *) newmsg)[1]), blk, misc_size + env_size);
300         
301         column_bucket[dest_col].enq (msg);
302         column_destQ[dest_col].enq(dest_pe);
303         column_bytes[dest_col] += total_size;
304         
305         if (column_bucket[dest_col].length() > max_bucket_size) {
306             FlushColumn (dest_col);
307         }
308     }
309     
310     delete cmsg;
311 }
312
313
314
315 /**************************************************************************
316 ** This method is not used for streaming strategies.
317 */
318 void MeshStreamingStrategy::doneInserting ()
319 {
320     ComlibPrintf ("[%d] MeshStreamingStrategy::doneInserting() invoked.\n", CkMyPe());    
321     // Empty for this strategy.
322
323     //FlushBuffers();
324     //Only want to flush local outgoing messages
325     for (int column = 0; column < num_columns; column++) {
326       FlushColumn ((column+my_column)%num_columns);
327     }
328 }
329
330
331 /* *************************************************************************
332 ** This method is invoked prior to any processing taking place in the
333 ** class.  Various initializations take place here that cannot take place
334 ** in the class constructor due to the communications library itself not
335 ** being totally initialized.
336 **
337 ** See MeshStreamingStrategy::MeshStreamingStrategy() for more details.
338 */
339 /*
340 void MeshStreamingStrategy::beginProcessing (int ignored) {
341     ComlibPrintf ("[%d] MeshStreamingStrategy::beginProcessing() invoked.\n", CkMyPe());
342     
343     //strategy_id = myInstanceID;
344     
345     my_pe = CkMyPe ();
346
347     my_column = my_pe % num_columns;
348     my_row = my_pe / row_length;
349     
350     //column_bucket = new CkQ<char *>[num_columns];
351     //column_bytes = new int[num_columns];
352     
353     for (int i = 0; i < num_columns; i++) {
354         column_bytes[i] = 0;
355     }
356     
357     row_bucket = new CkQ<char *>[num_rows];
358     
359     column_handler_id = CkRegisterHandler ((CmiHandler) column_handler);
360     
361     CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler,
362                                (void *) this, CkMyPe());
363     RegisterPeriodicFlush ();
364 }
365 */
366
367
368 /**************************************************************************
369 ** This method exists so periodic_flush_handler() can re-register itself to
370 ** be invoked periodically to flush buffers.
371 */
372 void MeshStreamingStrategy::RegisterPeriodicFlush (void)
373 {
374   ComlibPrintf ("[%d] MeshStreamingStrategy::RegisterPeriodicFlush() invoked.\n", CkMyPe());
375
376   CcdCallFnAfterOnPE(periodic_flush_handler, (void *) this, flush_period, CkMyPe());
377 }
378
379
380
381 /**************************************************************************
382 ** This method is used to flush a specified column bucket, either as the
383 ** result of the column bucket reaching its maximum capacity, as a result
384 ** of the periodic flush handler being invoked, or as a result of the
385 ** processor going idle.
386 **
387 ** The method first finds the destination PE for the column.  This is the
388 ** PE in the target column that is within the same row as the current PE.
389 **
390 ** If there are actually messages in the bucket, then space is allocated
391 ** to hold the new message which will pack all of the messages in the
392 ** column bucket together.  The layout of this message is shown below:
393 **
394 **             \  /
395 ** +------------||-------------------------------------------+
396 ** |Conv| I | # || dest || size | ref || Converse  || user | ...
397 ** |hdr | D |   ||  PE  ||      | cnt ||  header   || data | ...
398 ** +------------||-------------------------------------------+
399 **             /  \
400 **
401 ** Since the buffer represents a Converse message, it must begin with a
402 ** Converse header.  After the header is an int representing the Commlib
403 ** strategy ID for this strategy.  This is needed only so that the
404 ** column_handler() can get a pointer to the MeshStreamingStrategy class
405 ** later.  Next, comes an int containing the number of messages within
406 ** the packed message.  Finally, the messages are removed from the column
407 ** bucket and appended one after another into the buffer.
408 **
409 ** After packing, a handler is set on the message to cause it to invoke
410 ** column_handler() on the destination PE and the message is finally
411 ** sent with CmiSyncSendAndFree().
412 **
413 ** The buffer that is allocated in this message is used as a Converse
414 ** message, so it is allocated with CmiAlloc() so the send routine can
415 ** properly free it with CmiFree().  Therefore it has two ints for size
416 ** and ref count at the beginning of the buffer.  These are not shown in
417 ** the diagram above since they are basically irrelevant to this software.
418 */
419
420 void MeshStreamingStrategy::FlushColumn (int column)
421 {
422     int dest_column_pe;
423     int num_msgs;
424     int newmsgsize;
425     char *newmsg;
426
427     CmiAssert (column < num_columns);
428     
429     dest_column_pe = column + (my_row * row_length);
430     if (dest_column_pe >= num_pe) {
431       // This means that there is a hole in the mesh.
432       //dest_column_pe = column + ((my_row % (num_rows - 1) - 1) * row_length);
433       int new_row = my_column % (my_row + 1);
434       if(new_row >= my_row)
435         new_row = 0;
436
437       dest_column_pe = column + new_row * row_length;
438     }
439     
440     num_msgs = column_bucket[column].length ();
441     
442     if(num_msgs == 0)
443       return;
444     
445     ComlibPrintf ("[%d] MeshStreamingStrategy::FlushColumn() invoked. to %d\n", 
446                   CkMyPe(), dest_column_pe);    
447
448     PUP_cmiAllocSizer sp;        
449     int i = 0;
450     MeshStreamingHeader mhdr;
451     
452     mhdr.strategy_id = getInstance();
453     mhdr.num_msgs = num_msgs;
454     sp | mhdr;
455     
456     for (i = 0; i < num_msgs; i++) {
457       void *msg = column_bucket[column][i];
458       int size = SIZEFIELD(msg);//((envelope *)msg)->getTotalsize();
459       
460       int destpe = column_destQ[column][i];
461       sp | destpe;
462       sp.pupCmiAllocBuf((void **)&msg, size);
463     }
464     
465     newmsgsize = sp.size();
466     newmsg = (char *) CmiAlloc (newmsgsize);
467     
468     //((int *) (newmsg + CmiMsgHeaderSizeBytes))[0] = strategy_id;
469     //((int *) (newmsg + CmiMsgHeaderSizeBytes))[1] = num_msgs;
470     
471     PUP_toCmiAllocMem mp(newmsg);
472     //make a structure header
473     mp | mhdr;
474     
475     /*
476       newmsgptr = (char *) (newmsg + CmiMsgHeaderSizeBytes + 2 * sizeof (int));               
477       for (int i = 0; i < num_msgs; i++) {
478       msgptr = column_bucket[column].deq ();            
479       msgsize = ((int *) msgptr)[1] + (3 * sizeof (int));
480       memcpy (newmsgptr, msgptr, msgsize);
481       
482       newmsgptr += msgsize;
483       
484       delete [] msgptr;
485       }
486     */
487     
488     for (i = 0; i < num_msgs; i++) {
489       void *msg = column_bucket[column][i];
490       int destpe = column_destQ[column][i];
491       int size = SIZEFIELD(msg);//((envelope*)msg)->getTotalsize();
492       
493       mp | destpe;
494       mp.pupCmiAllocBuf((void **)&msg, size);
495     }
496     
497     for (i = 0; i < num_msgs; i++) {
498       void *msg = column_bucket[column].deq();
499       CmiFree(msg);
500       
501       column_destQ[column].deq();
502     }
503     
504     column_bytes[column] = 0;        
505     CmiSetHandler (newmsg, CkpvAccess(streaming_column_handler_id));        
506     CmiSyncSendAndFree (dest_column_pe, newmsgsize, newmsg);
507 }
508
509
510 /**************************************************************************
511 ** This method is used to flush a specified row bucket, either as the
512 ** result of the row bucket reaching its maximum capacity, as a result
513 ** of the periodic flush handler being invoked, or as a result of the
514 ** processor going idle.
515 **
516 ** The method first finds the destination PE for the row.  The method then
517 ** iterates through the messages in the row bucket and constructs an array
518 ** for sizes[] of the message sizes and an array for msgComps[] of
519 ** pointers to the messages in the row bucket.  The method also sets the
520 ** handler for each message to be "RecvmsgHandle" which is the handler
521 ** for multi-message sends.  Finally, the method calls CmiMultiSend() to
522 ** send all messages to the destination PE in one go.
523 **
524 ** After the row bucket is emptied, the method calls CmiFree() to
525 ** deallocate space for the individual messages.  Since each message was
526 ** allocated via CmiAlloc() this is appropriate.
527 **
528 ** Each message in the row bucket has the layout shown in the diagram
529 ** below.
530 **
531 **     +----------------------------------------------------+
532 **     | size | refcount || Converse  ||        user        |
533 **     |      |          ||  header   ||        data        |
534 **     +----------------------------------------------------+
535 **                        ^
536 **                        |
537 **                        msg
538 **
539 */
540
541 void MeshStreamingStrategy::FlushRow (int row)
542 {
543     int dest_pe;
544     int num_msgs;
545     int *sizes;
546     char *msg;
547     char **msgComps;
548     int i;
549     
550     ComlibPrintf ("[%d] MeshStreamingStrategy::FlushRow() invoked.\n", 
551                   CkMyPe());
552     
553     CmiAssert (row < num_rows);
554     
555     dest_pe = my_column + (row * row_length);
556     
557     num_msgs = row_bucket[row].length ();
558     if (num_msgs > 0) {
559         
560         //Strip charm++ envelopes from messages
561       /*
562         if(shortMsgPackingFlag) {
563             MsgPacker mpack(row_bucket[row], num_msgs);
564             CombinedMessage *msg; 
565             int size;
566             mpack.getMessage(msg, size);
567             
568             CmiSyncSendAndFree(dest_pe, size, (char *)msg);
569             return;
570         }
571       */
572         //Send messages without short message packing
573         sizes = new int[num_msgs];
574         msgComps = new char *[num_msgs];
575         
576         for (i = 0; i < num_msgs; i++) {
577             msg = row_bucket[row].deq ();
578             //CmiSetHandler (msg, CkpvAccess(RecvmsgHandle));
579             sizes[i] = SIZEFIELD(msg);//((envelope *)msg)->getTotalsize();
580             msgComps[i] = msg;
581         }
582         
583         CmiMultipleSend (dest_pe, num_msgs, sizes, msgComps);
584         
585         for (i = 0; i < num_msgs; i++) {
586             CmiFree (msgComps[i]);
587         }
588         
589         delete [] sizes;
590         delete [] msgComps;
591     }
592 }
593
594
595
596 /**************************************************************************
597 ** This method exists so various handlers can easily trigger all column
598 ** buckets and row buckets to flush.
599 */
600 void MeshStreamingStrategy::FlushBuffers (void)
601 {
602     ComlibPrintf ("[%d] MeshStreamingStrategy::PeriodicFlush() invoked.\n", 
603                   CkMyPe());
604
605     for (int column = 0; column < num_columns; column++) {
606       FlushColumn ((column+my_column)%num_columns);
607     }
608     
609     for (int row = 0; row < num_rows; row++) {
610       FlushRow ((row+my_row)%num_rows);
611     }
612 }
613
614
615
616 /**************************************************************************
617 ** This method exists primarily so column_handler() can insert messages
618 ** into a specified row bucket.
619 */
620 void MeshStreamingStrategy::InsertIntoRowBucket (int row, char *msg)
621 {
622   ComlibPrintf ("[%d] MeshStreamingStrategy::InsertIntoRowBucket() invoked.\n", CkMyPe());
623
624   CmiAssert (row < num_rows);
625
626   row_bucket[row].enq (msg);
627   if (row_bucket[row].length() > max_bucket_size) {
628     FlushRow (row);
629   }
630 }
631
632
633
634 /**************************************************************************
635 ** This method exists only so column_handler() can get the length of a row
636 ** in the mesh.  Since it is outside of the MeshStreamingStrategy class, it
637 ** does not have direct access to the class variables.
638 */
639 int MeshStreamingStrategy::GetRowLength (void)
640 {
641   ComlibPrintf ("[%d] MeshStreamingStrategy::GetRowLength() invoked.\n", CkMyPe());
642
643   return (row_length);
644 }
645
646
647
648 /**************************************************************************
649 ** This is a very complicated pack/unpack method.
650 **
651 ** This method must handle the column_bucket[] and row_bucket[] data
652 ** structures.  These are arrays of queues of (char *).  To pack these,
653 ** we must iterate through the data structures and pack the sizes of
654 ** each message (char *) pointed to by each queue entry.
655 */
656 void MeshStreamingStrategy::pup (PUP::er &p)
657 {
658
659   ComlibPrintf ("[%d] MeshStreamingStrategy::pup() invoked.\n", CkMyPe());
660
661   // Call the superclass method -- easy.
662   Strategy::pup (p);
663
664   // Pup the instance variables -- easy.
665   p | num_pe;
666   p | num_columns;
667   p | num_rows;
668   p | row_length;
669
670   //p | my_pe;
671   //p | my_column;
672   //p | my_row;
673
674   p | max_bucket_size;
675   p | flush_period;
676   //p | strategy_id;
677   //p | column_handler_id;
678
679   //p | shortMsgPackingFlag;
680
681   // Handle the column_bucket[] data structure.
682   // For each element in column_bucket[], pup the length of the queue
683   // at that element followed by the contents of that queue.  For each
684   // queue, pup the size of the message pointed to by the (char *)
685   // entry, followed by the memory for the (char *) entry.
686   if (p.isUnpacking ()) {
687       column_bucket = new CkQ<char *>[num_columns];
688       column_destQ = new CkQ<int>[num_columns];
689   }
690
691   /*In correct code, will only be useful for checkpointing though
692   for (i = 0; i < num_columns; i++) {
693     int length = column_bucket[i].length ();
694
695     p | length;
696
697     for (int j = 0; j < length; j++) {
698         char *msg = column_bucket[i].deq ();
699         int size = sizeof (int) + ((int *) msg)[1];
700         p | size;
701         p(msg, size);
702     }
703   }
704   */
705
706   // Handle the column_bytes[] data structure.
707   // This is a straightforward packing of an int array.
708   if (p.isUnpacking ()) {
709       column_bytes = new int[num_columns];
710   }
711
712   p(column_bytes, num_columns);
713
714   // Handle the row_bucket[] data structure.
715   // This works exactly like the column_bucket[] above.
716   if (p.isUnpacking ()) {
717     row_bucket = new CkQ<char *>[num_rows];
718   }
719   
720   /* In correct code, will only be useful for checkpointing though
721   for (i = 0; i < num_rows; i++) {
722     int length = row_bucket[i].length ();
723
724     p | length;
725
726     for (int j = 0; j < length; j++) {
727       char *msg = row_bucket[i].deq ();
728       int size = ((int *) msg)[0];
729       p | size;
730       p(msg, size);
731     }
732   }
733   */
734
735     my_pe = CkMyPe ();
736
737     my_column = my_pe % num_columns;
738     my_row = my_pe / row_length;
739     
740     //column_bucket = new CkQ<char *>[num_columns];
741     //column_bytes = new int[num_columns];
742     
743     for (int i = 0; i < num_columns; i++) {
744         column_bytes[i] = 0;
745     }
746     
747     // packing called once on processor 0, unpacking called once on all processors except 0
748     if (p.isPacking() || p.isUnpacking()) {
749       //column_handler_id = CkRegisterHandler ((CmiHandler) column_handler);
750     
751       CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler,
752                                  (void *) this, CkMyPe());
753       RegisterPeriodicFlush ();
754     }
755 }
756
757 PUPable_def(MeshStreamingStrategy)
758
759 /*@}*/