remove unused variables and other cleanup to get a clean compilation.
[charm.git] / src / conv-com / MeshStreamingStrategy.C
1 /**
2    @addtogroup ComlibConverseStrategy   
3    @{
4    @file 
5 */
6
7 #include "MeshStreamingStrategy.h"
8 #include "pup_cmialloc.h"
9 //#include "MsgPacker.h"
10
11 /**** not needed any-more after pup_CmiAlloc 
12 // These macros are taken directly from convcore.c.
13 #define SIZEFIELD(m) (((CmiChunkHeader *)(m))[-1].size)
14 #define REFFIELD(m) (((CmiChunkHeader *)(m))[-1].ref)
15 #define BLKSTART(m) (((CmiChunkHeader *)(m))-1)
16 ***/
17
18 // These externs are defined inside ComlibManager.C.
19 //CkpvExtern(CkGroupID, cmgrID);
20 //CkpvExtern(int, RecvmsgHandle);
21
22 CkpvDeclare(int, streaming_column_handler_id);
23
24 /**************************************************************************
25 ** This handler is invoked automatically when the processor goes idle.
26 **
27 ** The idle handler automatically re-registers itself, so there is no need
28 ** to re-register it from here.
29 **
30 ** If nothing else is going on anyway, we might as well flush the buffers
31 ** now instead of waiting for the flush period.
32 */
33 void idle_flush_handler (void *ptr, double curT)
34 {
35   ComlibPrintf ("[%d] idle_flush_handler() invoked.\n", CkMyPe());
36
37   MeshStreamingStrategy *classptr = (MeshStreamingStrategy *) ptr;
38   classptr->FlushBuffers ();
39 }
40
41
42
43 /**************************************************************************
44 ** This handler is invoked automatically after a timeout occurs.
45 **
46 ** The periodic handler does not automatically re-register itself, so it
47 ** calls RegisterPeriodicFlush() to do so after it finishes flushing
48 ** buffers.
49 */
50 void periodic_flush_handler (void *ptr, double curT)
51 {
52   ComlibPrintf ("[%d] periodic_flush_handler() invoked.\n", CkMyPe());
53
54   MeshStreamingStrategy *classptr = (MeshStreamingStrategy *) ptr;
55   classptr->FlushBuffers ();
56   classptr->RegisterPeriodicFlush ();
57 }
58
59
60
61 /**************************************************************************
62 ** This handler is invoked automatically when a packed message for a column
63 ** is received.
64 **
65 ** The layout of the message received is shown in the diagram below.
66 **
67 **             \  /
68 ** +------------||---------------------------------------------+
69 ** |Conv| I | # || dest || size | ref || Converse  || user | ...
70 ** |hdr | D |   ||  PE  ||      | cnt ||  header   || data | ...
71 ** +------------||---------------------------------------------+
72 **             /  \
73 **
74 ** The function first retrieves the strategy ID and the number of messages
75 ** in the packed message and then uses the strategy ID to obtain a pointer
76 ** to the MeshStreamingStrategy class.  It also obtains the row length by
77 ** calling GetRowLength().
78 **
79 ** The function then iterates through the messages in the packed message.
80 ** For each message within, it allocates space by calling CmiAlloc() and
81 ** then copies the message from the packed buffer into the new message
82 ** buffer.  It is also able to obtain the destination PE for the message
83 ** because this information is included in the packed message data for
84 ** each packed message.  If the destination PE is the current PE, the
85 ** message is delivered immediately via a call to CmiSyncSendAndFree().
86 ** This routine calls CmiFree() on the message, which is appropriate
87 ** since it was allocated with CmiAlloc().  Otherwise, the message is
88 ** inserted into the row bucket for the necessary row by calling
89 ** InsertIntoRowBucket().  When messages are delivered from the row
90 ** bucket, they are freed by CmiFree().
91 */
92 void streaming_column_handler (void *msg)
93 {
94     int dest_pe;
95     int dest_row;
96     int msgsize;
97     int my_pe;
98     //int num_msgs;
99     int row_length;
100     //int strategy_id;
101     //char *msgptr;
102     char *newmsg;
103     MeshStreamingStrategy *classptr;
104         
105     ComlibPrintf ("[%d] column_handler() invoked.\n", CkMyPe());
106     
107     my_pe = CkMyPe ();
108
109     //PUP_cmialloc mem lets us use the converse reference counting
110     //black magic in a transparent way. PUP_fromCmiAllocMem lets sub
111     //messages in a messages be used freely in the program as messages.     
112     PUP_fromCmiAllocMem fp(msg);    
113     MeshStreamingHeader mhdr;
114     
115     //Read the header from the message
116     fp | mhdr;
117     
118     //strategy_id = ((int *) (msg + CmiMsgHeaderSizeBytes))[0];
119     //num_msgs = ((int *) (msg + CmiMsgHeaderSizeBytes))[1];
120     
121     classptr = (MeshStreamingStrategy *)ConvComlibGetStrategy(mhdr.strategy_id);
122     //        CProxy_ComlibManager (CkpvAccess (cmgrID)).
123     //        ckLocalBranch()->getStrategy (mhdr.strategy_id);
124     
125     row_length = classptr->GetRowLength ();
126     
127     //msgptr = (char *) (msg + CmiMsgHeaderSizeBytes + 2 * sizeof(int));
128     
129     for (int i = 0; i < mhdr.num_msgs; i++) {
130         /*
131         dest_pe = ((int *) msgptr)[0];
132         msgsize = ((int *) msgptr)[1];
133         
134         newmsg = (char *) CmiAlloc (msgsize);
135         
136         memcpy (newmsg, (msgptr + 3 * sizeof(int)), msgsize);
137         
138         if (dest_pe == my_pe) {
139             CmiSyncSendAndFree (my_pe, msgsize, newmsg);
140         } else {
141             dest_row = dest_pe / row_length;
142             classptr->InsertIntoRowBucket (dest_row, newmsg);
143         }
144         
145         msgptr += msgsize + 3 * sizeof(int);
146         */
147
148         int dest_pe;
149         fp | dest_pe;
150
151         //Returns a part of a message as an independent message and
152         //updates the reference count of the container message.
153         fp.pupCmiAllocBuf((void **)&newmsg);
154         int msgsize = SIZEFIELD(newmsg);// ((envelope*)newmsg)->getTotalsize();
155
156         if (dest_pe == my_pe) {
157             CmiSyncSendAndFree (my_pe, msgsize, newmsg);
158         } else {
159             dest_row = dest_pe / row_length;
160             classptr->InsertIntoRowBucket (dest_row, newmsg);
161         }
162     }
163     
164     CmiFree (msg);
165 }
166
167
168
169 /**************************************************************************
170 ** This is the MeshStreamingStrategy constructor.
171 **
172 ** The period and bucket_size have default values specified in the .h file.
173 **
174 ** The constructor is invoked when the client code instantiates this
175 ** strategy.  The constructor executes on a SINGLE PROCESS in the
176 ** computation, so it cannot do things like determine an individual
177 ** process's position within the mesh.
178 **
179 ** After the constructor is invoked, the communications library creates
180 ** instances that get pup'ed and shipped to each processor in the
181 ** computation.  To that end, the process that instantiates this strategy
182 ** (most likely PE 0) will then use pup to pack copies of the strategy
183 ** and then ship them off to other processes.  They will be un-pup'ed
184 ** there.  Finally, beginProcessing() will be called on EACH instance on
185 ** its target processor.
186 */
187 MeshStreamingStrategy::MeshStreamingStrategy (int period, int bucket_size) 
188     : Strategy() 
189 {
190     ComlibPrintf ("[%d] MeshStreamingStrategy::MeshStreamingStrategy() invoked.\n", CkMyPe());
191
192     num_pe = CkNumPes ();
193     
194     num_columns = (int) (ceil (sqrt ((double) num_pe)));
195     num_rows = num_columns;
196     row_length = num_columns;
197     
198     flush_period = period;
199     max_bucket_size = bucket_size;
200     
201     column_bucket = new CkQ<char *>[num_columns];
202     column_destQ = new CkQ<int>[num_columns];
203     column_bytes = new int[num_columns];
204     row_bucket = new CkQ<char *>[num_rows];
205
206     //shortMsgPackingFlag = CmiFalse;
207 }
208
209
210
211 /**************************************************************************
212 ** This method is called when the communications library sends a message
213 ** from one PE to another PE.  This could be due to a direct message being
214 ** sent, or due to a method invocation with marshalled parameters.
215 **
216 ** The method begins by getting the destination PE from the
217 ** CharmMessageHolder that is passed in (and from this, computing the
218 ** destination column) and getting a pointer to the User data for the
219 ** message (and from this, computing the Envelope pointer and the Block
220 ** pointer).  The following diagram shows the layout of the message.
221 **
222 **     +----------------------------------------------------+
223 **     | size | refcount || Converse  ||        user        |
224 **     |      |          ||  header   ||        data        |
225 **     +----------------------------------------------------+
226 **     ^                  ^            ^
227 **     |                  |            |
228 **     blk (Block)        msg          usr (User)
229 **
230 ** All Converse messages are allocated by CmiAlloc() which prepends two ints to
231 ** all memory regions to hold a size field and a refcount field. BLKSTART() is a
232 ** macro that gets the start of a block from the envelope pointer.
233 **
234 ** If the destination PE is our current PE, we just deliver the message
235 ** immediately.
236 **
237 ** Otherwise, if the destination PE is in the same column as our PE, we
238 ** allocate a new region of memory with CmiAlloc() and copy from the
239 ** envelope pointer into the new region, and then deposit this new message
240 ** into the appropriate row bucket for our column.  (The row buckets are
241 ** queues of pointers to memory regions exactly like the diagram above.
242 ** All entries in the row bucket are allocated with CmiAlloc() and must
243 ** be deallocated with CmiFree()!)
244 **
245 ** Otherwise, the destination PE must be in a different column from our
246 ** PE.  We allocate a new region of memory with "new" that looks like
247 ** the diagram below.
248 **
249 ** +------------------------------------------------------------+
250 ** | dest || size | refcount || Converse  ||        user        |
251 ** |  PE  ||      |          ||  header   ||        data        |
252 ** +------------------------------------------------------------+
253 ** ^       ^                  ^            ^
254 ** |       |                  |            |
255 ** newmsg  blk (Block)        msg          usr (User)
256 **
257 ** We then deposit this new message into the appropriate column bucket.
258 ** (The column buckets are queues of pointers that are allocated with
259 ** "new" and must be deallocated with "delete"!)
260 */
261
262 void MeshStreamingStrategy::insertMessage (MessageHolder *cmsg)
263 {
264     int dest_pe;
265     int dest_row;
266     int dest_col;
267     int msg_size;
268     int misc_size;
269     int total_size;
270     char *msg;
271     //char *env;
272     //char *blk;
273     //char *newmsg;
274     
275     ComlibPrintf ("[%d] MeshStreamingStrategy::insertMessage() invoked.\n", 
276                   CkMyPe());
277     
278     dest_pe = cmsg->dest_proc;
279     dest_col = dest_pe % num_columns;
280     msg = cmsg->getMessage ();
281     //env = (char *) UsrToEnv (usr);
282     
283     //blk = (char *) BLKSTART (env);
284     msg_size = SIZEFIELD(msg);//((envelope *)env)->getTotalsize();
285
286     //misc_size = (env - blk);
287     total_size = sizeof (int) + sizeof(CmiChunkHeader) + msg_size;
288     
289     if (dest_pe == my_pe) {
290         CmiSyncSend (my_pe, msg_size, msg);
291     } else if (dest_col == my_column) {
292         //newmsg = (char *) CmiAlloc (env_size);
293         //memcpy (newmsg, env, env_size);
294         //newmsg = env;
295         
296         dest_row = dest_pe / row_length;
297         
298         InsertIntoRowBucket (dest_row, msg);
299     } else {
300         //newmsg = new char[total_size];
301         //((int *) newmsg)[0] = dest_pe;
302         //memcpy ( (void *) &(((int *) newmsg)[1]), blk, misc_size + env_size);
303         
304         column_bucket[dest_col].enq (msg);
305         column_destQ[dest_col].enq(dest_pe);
306         column_bytes[dest_col] += total_size;
307         
308         if (column_bucket[dest_col].length() > max_bucket_size) {
309             FlushColumn (dest_col);
310         }
311     }
312     
313     delete cmsg;
314 }
315
316
317
318 /**************************************************************************
319 ** This method is not used for streaming strategies.
320 */
321 void MeshStreamingStrategy::doneInserting ()
322 {
323     ComlibPrintf ("[%d] MeshStreamingStrategy::doneInserting() invoked.\n", CkMyPe());    
324     // Empty for this strategy.
325
326     //FlushBuffers();
327     //Only want to flush local outgoing messages
328     for (int column = 0; column < num_columns; column++) {
329       FlushColumn ((column+my_column)%num_columns);
330     }
331 }
332
333
334 /* *************************************************************************
335 ** This method is invoked prior to any processing taking place in the
336 ** class.  Various initializations take place here that cannot take place
337 ** in the class constructor due to the communications library itself not
338 ** being totally initialized.
339 **
340 ** See MeshStreamingStrategy::MeshStreamingStrategy() for more details.
341 */
342 /*
343 void MeshStreamingStrategy::beginProcessing (int ignored) {
344     ComlibPrintf ("[%d] MeshStreamingStrategy::beginProcessing() invoked.\n", CkMyPe());
345     
346     //strategy_id = myInstanceID;
347     
348     my_pe = CkMyPe ();
349
350     my_column = my_pe % num_columns;
351     my_row = my_pe / row_length;
352     
353     //column_bucket = new CkQ<char *>[num_columns];
354     //column_bytes = new int[num_columns];
355     
356     for (int i = 0; i < num_columns; i++) {
357         column_bytes[i] = 0;
358     }
359     
360     row_bucket = new CkQ<char *>[num_rows];
361     
362     column_handler_id = CkRegisterHandler ((CmiHandler) column_handler);
363     
364     CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler,
365                                (void *) this, CkMyPe());
366     RegisterPeriodicFlush ();
367 }
368 */
369
370
371 /**************************************************************************
372 ** This method exists so periodic_flush_handler() can re-register itself to
373 ** be invoked periodically to flush buffers.
374 */
375 void MeshStreamingStrategy::RegisterPeriodicFlush (void)
376 {
377   ComlibPrintf ("[%d] MeshStreamingStrategy::RegisterPeriodicFlush() invoked.\n", CkMyPe());
378
379   CcdCallFnAfterOnPE(periodic_flush_handler, (void *) this, flush_period, CkMyPe());
380 }
381
382
383
384 /**************************************************************************
385 ** This method is used to flush a specified column bucket, either as the
386 ** result of the column bucket reaching its maximum capacity, as a result
387 ** of the periodic flush handler being invoked, or as a result of the
388 ** processor going idle.
389 **
390 ** The method first finds the destination PE for the column.  This is the
391 ** PE in the target column that is within the same row as the current PE.
392 **
393 ** If there are actually messages in the bucket, then space is allocated
394 ** to hold the new message which will pack all of the messages in the
395 ** column bucket together.  The layout of this message is shown below:
396 **
397 **             \  /
398 ** +------------||-------------------------------------------+
399 ** |Conv| I | # || dest || size | ref || Converse  || user | ...
400 ** |hdr | D |   ||  PE  ||      | cnt ||  header   || data | ...
401 ** +------------||-------------------------------------------+
402 **             /  \
403 **
404 ** Since the buffer represents a Converse message, it must begin with a
405 ** Converse header.  After the header is an int representing the Commlib
406 ** strategy ID for this strategy.  This is needed only so that the
407 ** column_handler() can get a pointer to the MeshStreamingStrategy class
408 ** later.  Next, comes an int containing the number of messages within
409 ** the packed message.  Finally, the messages are removed from the column
410 ** bucket and appended one after another into the buffer.
411 **
412 ** After packing, a handler is set on the message to cause it to invoke
413 ** column_handler() on the destination PE and the message is finally
414 ** sent with CmiSyncSendAndFree().
415 **
416 ** The buffer that is allocated in this message is used as a Converse
417 ** message, so it is allocated with CmiAlloc() so the send routine can
418 ** properly free it with CmiFree().  Therefore it has two ints for size
419 ** and ref count at the beginning of the buffer.  These are not shown in
420 ** the diagram above since they are basically irrelevant to this software.
421 */
422
423 void MeshStreamingStrategy::FlushColumn (int column)
424 {
425     int dest_column_pe;
426     int num_msgs;
427     int newmsgsize;
428     int msgsize;
429     char *newmsg;
430     char *newmsgptr;
431     char *msgptr;
432     
433
434     CmiAssert (column < num_columns);
435     
436     dest_column_pe = column + (my_row * row_length);
437     if (dest_column_pe >= num_pe) {
438       // This means that there is a hole in the mesh.
439       //dest_column_pe = column + ((my_row % (num_rows - 1) - 1) * row_length);
440       int new_row = my_column % (my_row + 1);
441       if(new_row >= my_row)
442         new_row = 0;
443
444       dest_column_pe = column + new_row * row_length;
445     }
446     
447     num_msgs = column_bucket[column].length ();
448     
449     if(num_msgs == 0)
450       return;
451     
452     ComlibPrintf ("[%d] MeshStreamingStrategy::FlushColumn() invoked. to %d\n", 
453                   CkMyPe(), dest_column_pe);    
454
455     PUP_cmiAllocSizer sp;        
456     int i = 0;
457     MeshStreamingHeader mhdr;
458     
459     mhdr.strategy_id = getInstance();
460     mhdr.num_msgs = num_msgs;
461     sp | mhdr;
462     
463     for (i = 0; i < num_msgs; i++) {
464       void *msg = column_bucket[column][i];
465       int size = SIZEFIELD(msg);//((envelope *)msg)->getTotalsize();
466       
467       int destpe = column_destQ[column][i];
468       sp | destpe;
469       sp.pupCmiAllocBuf((void **)&msg, size);
470     }
471     
472     newmsgsize = sp.size();
473     newmsg = (char *) CmiAlloc (newmsgsize);
474     
475     //((int *) (newmsg + CmiMsgHeaderSizeBytes))[0] = strategy_id;
476     //((int *) (newmsg + CmiMsgHeaderSizeBytes))[1] = num_msgs;
477     
478     PUP_toCmiAllocMem mp(newmsg);
479     //make a structure header
480     mp | mhdr;
481     
482     /*
483       newmsgptr = (char *) (newmsg + CmiMsgHeaderSizeBytes + 2 * sizeof (int));               
484       for (int i = 0; i < num_msgs; i++) {
485       msgptr = column_bucket[column].deq ();            
486       msgsize = ((int *) msgptr)[1] + (3 * sizeof (int));
487       memcpy (newmsgptr, msgptr, msgsize);
488       
489       newmsgptr += msgsize;
490       
491       delete [] msgptr;
492       }
493     */
494     
495     for (i = 0; i < num_msgs; i++) {
496       void *msg = column_bucket[column][i];
497       int destpe = column_destQ[column][i];
498       int size = SIZEFIELD(msg);//((envelope*)msg)->getTotalsize();
499       
500       mp | destpe;
501       mp.pupCmiAllocBuf((void **)&msg, size);
502     }
503     
504     for (i = 0; i < num_msgs; i++) {
505       void *msg = column_bucket[column].deq();
506       CmiFree(msg);
507       
508       column_destQ[column].deq();
509     }
510     
511     column_bytes[column] = 0;        
512     CmiSetHandler (newmsg, CkpvAccess(streaming_column_handler_id));        
513     CmiSyncSendAndFree (dest_column_pe, newmsgsize, newmsg);
514 }
515
516
517 /**************************************************************************
518 ** This method is used to flush a specified row bucket, either as the
519 ** result of the row bucket reaching its maximum capacity, as a result
520 ** of the periodic flush handler being invoked, or as a result of the
521 ** processor going idle.
522 **
523 ** The method first finds the destination PE for the row.  The method then
524 ** iterates through the messages in the row bucket and constructs an array
525 ** for sizes[] of the message sizes and an array for msgComps[] of
526 ** pointers to the messages in the row bucket.  The method also sets the
527 ** handler for each message to be "RecvmsgHandle" which is the handler
528 ** for multi-message sends.  Finally, the method calls CmiMultiSend() to
529 ** send all messages to the destination PE in one go.
530 **
531 ** After the row bucket is emptied, the method calls CmiFree() to
532 ** deallocate space for the individual messages.  Since each message was
533 ** allocated via CmiAlloc() this is appropriate.
534 **
535 ** Each message in the row bucket has the layout shown in the diagram
536 ** below.
537 **
538 **     +----------------------------------------------------+
539 **     | size | refcount || Converse  ||        user        |
540 **     |      |          ||  header   ||        data        |
541 **     +----------------------------------------------------+
542 **                        ^
543 **                        |
544 **                        msg
545 **
546 */
547
548 void MeshStreamingStrategy::FlushRow (int row)
549 {
550     int dest_pe;
551     int num_msgs;
552     int *sizes;
553     char *msg;
554     char **msgComps;
555     int i;
556     
557     ComlibPrintf ("[%d] MeshStreamingStrategy::FlushRow() invoked.\n", 
558                   CkMyPe());
559     
560     CmiAssert (row < num_rows);
561     
562     dest_pe = my_column + (row * row_length);
563     
564     num_msgs = row_bucket[row].length ();
565     if (num_msgs > 0) {
566         
567         //Strip charm++ envelopes from messages
568       /*
569         if(shortMsgPackingFlag) {
570             MsgPacker mpack(row_bucket[row], num_msgs);
571             CombinedMessage *msg; 
572             int size;
573             mpack.getMessage(msg, size);
574             
575             CmiSyncSendAndFree(dest_pe, size, (char *)msg);
576             return;
577         }
578       */
579         //Send messages without short message packing
580         sizes = new int[num_msgs];
581         msgComps = new char *[num_msgs];
582         
583         for (i = 0; i < num_msgs; i++) {
584             msg = row_bucket[row].deq ();
585             //CmiSetHandler (msg, CkpvAccess(RecvmsgHandle));
586             sizes[i] = SIZEFIELD(msg);//((envelope *)msg)->getTotalsize();
587             msgComps[i] = msg;
588         }
589         
590         CmiMultipleSend (dest_pe, num_msgs, sizes, msgComps);
591         
592         for (i = 0; i < num_msgs; i++) {
593             CmiFree (msgComps[i]);
594         }
595         
596         delete [] sizes;
597         delete [] msgComps;
598     }
599 }
600
601
602
603 /**************************************************************************
604 ** This method exists so various handlers can easily trigger all column
605 ** buckets and row buckets to flush.
606 */
607 void MeshStreamingStrategy::FlushBuffers (void)
608 {
609     ComlibPrintf ("[%d] MeshStreamingStrategy::PeriodicFlush() invoked.\n", 
610                   CkMyPe());
611
612     for (int column = 0; column < num_columns; column++) {
613       FlushColumn ((column+my_column)%num_columns);
614     }
615     
616     for (int row = 0; row < num_rows; row++) {
617       FlushRow ((row+my_row)%num_rows);
618     }
619 }
620
621
622
623 /**************************************************************************
624 ** This method exists primarily so column_handler() can insert messages
625 ** into a specified row bucket.
626 */
627 void MeshStreamingStrategy::InsertIntoRowBucket (int row, char *msg)
628 {
629   ComlibPrintf ("[%d] MeshStreamingStrategy::InsertIntoRowBucket() invoked.\n", CkMyPe());
630
631   CmiAssert (row < num_rows);
632
633   row_bucket[row].enq (msg);
634   if (row_bucket[row].length() > max_bucket_size) {
635     FlushRow (row);
636   }
637 }
638
639
640
641 /**************************************************************************
642 ** This method exists only so column_handler() can get the length of a row
643 ** in the mesh.  Since it is outside of the MeshStreamingStrategy class, it
644 ** does not have direct access to the class variables.
645 */
646 int MeshStreamingStrategy::GetRowLength (void)
647 {
648   ComlibPrintf ("[%d] MeshStreamingStrategy::GetRowLength() invoked.\n", CkMyPe());
649
650   return (row_length);
651 }
652
653
654
655 /**************************************************************************
656 ** This is a very complicated pack/unpack method.
657 **
658 ** This method must handle the column_bucket[] and row_bucket[] data
659 ** structures.  These are arrays of queues of (char *).  To pack these,
660 ** we must iterate through the data structures and pack the sizes of
661 ** each message (char *) pointed to by each queue entry.
662 */
663 void MeshStreamingStrategy::pup (PUP::er &p)
664 {
665   int i;
666   ComlibPrintf ("[%d] MeshStreamingStrategy::pup() invoked.\n", CkMyPe());
667
668   // Call the superclass method -- easy.
669   Strategy::pup (p);
670
671   // Pup the instance variables -- easy.
672   p | num_pe;
673   p | num_columns;
674   p | num_rows;
675   p | row_length;
676
677   //p | my_pe;
678   //p | my_column;
679   //p | my_row;
680
681   p | max_bucket_size;
682   p | flush_period;
683   //p | strategy_id;
684   //p | column_handler_id;
685
686   //p | shortMsgPackingFlag;
687
688   // Handle the column_bucket[] data structure.
689   // For each element in column_bucket[], pup the length of the queue
690   // at that element followed by the contents of that queue.  For each
691   // queue, pup the size of the message pointed to by the (char *)
692   // entry, followed by the memory for the (char *) entry.
693   if (p.isUnpacking ()) {
694       column_bucket = new CkQ<char *>[num_columns];
695       column_destQ = new CkQ<int>[num_columns];
696   }
697
698   /*In correct code, will only be useful for checkpointing though
699   for (i = 0; i < num_columns; i++) {
700     int length = column_bucket[i].length ();
701
702     p | length;
703
704     for (int j = 0; j < length; j++) {
705         char *msg = column_bucket[i].deq ();
706         int size = sizeof (int) + ((int *) msg)[1];
707         p | size;
708         p(msg, size);
709     }
710   }
711   */
712
713   // Handle the column_bytes[] data structure.
714   // This is a straightforward packing of an int array.
715   if (p.isUnpacking ()) {
716       column_bytes = new int[num_columns];
717   }
718
719   p(column_bytes, num_columns);
720
721   // Handle the row_bucket[] data structure.
722   // This works exactly like the column_bucket[] above.
723   if (p.isUnpacking ()) {
724     row_bucket = new CkQ<char *>[num_rows];
725   }
726   
727   /* In correct code, will only be useful for checkpointing though
728   for (i = 0; i < num_rows; i++) {
729     int length = row_bucket[i].length ();
730
731     p | length;
732
733     for (int j = 0; j < length; j++) {
734       char *msg = row_bucket[i].deq ();
735       int size = ((int *) msg)[0];
736       p | size;
737       p(msg, size);
738     }
739   }
740   */
741
742     my_pe = CkMyPe ();
743
744     my_column = my_pe % num_columns;
745     my_row = my_pe / row_length;
746     
747     //column_bucket = new CkQ<char *>[num_columns];
748     //column_bytes = new int[num_columns];
749     
750     for (int i = 0; i < num_columns; i++) {
751         column_bytes[i] = 0;
752     }
753     
754     // packing called once on processor 0, unpacking called once on all processors except 0
755     if (p.isPacking() || p.isUnpacking()) {
756       //column_handler_id = CkRegisterHandler ((CmiHandler) column_handler);
757     
758       CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler,
759                                  (void *) this, CkMyPe());
760       RegisterPeriodicFlush ();
761     }
762 }
763
764 PUPable_def(MeshStreamingStrategy)
765
766 /*@}*/