Cleaned up Mesh Streaming strategy. Also added some more optimizations to MesgPacker
authorSameer Kumar <skumar2@uiuc.edu>
Fri, 19 Nov 2004 20:34:02 +0000 (20:34 +0000)
committerSameer Kumar <skumar2@uiuc.edu>
Fri, 19 Nov 2004 20:34:02 +0000 (20:34 +0000)
src/ck-com/ComlibStrategy.C
src/ck-com/ComlibStrategy.h
src/ck-com/MeshStreamingStrategy.C
src/ck-com/MeshStreamingStrategy.h
src/ck-com/MsgPacker.C
src/ck-com/MsgPacker.h

index d7081314b42fc2bd87653419b558f55da49fe42a..ca6e76646148d742d24a3468a63dc7c049138171 100644 (file)
@@ -35,10 +35,6 @@ CharmMessageHolder::CharmMessageHolder(char * msg, int proc)
 CharmMessageHolder::~CharmMessageHolder() { 
 }
 
-char * CharmMessageHolder::getCharmMessage(){
-    return (char *)EnvToUsr((envelope *) data);
-}
-
 void CharmMessageHolder::pup(PUP::er &p) {
 
     //    CkPrintf("In CharmMessageHolder::pup \n"); 
index 7ee83b91b2e5dc0748af88b43800315b7ce5ec94..c81c3c81c3a00ac90f572145a5f09c12e67c0559 100644 (file)
@@ -5,6 +5,7 @@
 #include "ckhashtable.h"
 #include "convcomlibstrategy.h"
 #include "ComlibLearner.h"
+#include "envelope.h"
 
 CkpvExtern(int, migrationDoneHandlerID);
 
@@ -20,7 +21,9 @@ class CharmMessageHolder : public MessageHolder{
     CharmMessageHolder(char * msg, int dest_proc);
     ~CharmMessageHolder();
 
-    char * getCharmMessage();
+    inline char * getCharmMessage() {
+        return (char *)EnvToUsr((envelope *) data);
+    }
     
     virtual void pup(PUP::er &p);
     PUPable_decl(CharmMessageHolder);
index f9cb46d76d1e72ffb9726ecb7a01be8ae037fe84..9d2b1fc75f645c1e950feffc1f1f68e205da82ad 100644 (file)
@@ -38,6 +38,8 @@
 */
 
 #include "MeshStreamingStrategy.h"
+#include "pup_cmialloc.h"
+#include "MsgPacker.h"
 
 // These macros are taken directly from convcore.c.
 #define SIZEFIELD(m) (((CmiChunkHeader *)(m))[-1].size)
@@ -119,51 +121,77 @@ void periodic_flush_handler (void *ptr, double curT)
 */
 void column_handler (char *msg)
 {
-  int dest_pe;
-  int dest_row;
-  int msgsize;
-  int my_pe;
-  int num_msgs;
-  int row_length;
-  int strategy_id;
-  char *msgptr;
-  char *newmsg;
-  MeshStreamingStrategy *classptr;
-
-
-  ComlibPrintf ("[%d] column_handler() invoked.\n", CkMyPe());
-
-  my_pe = CkMyPe ();
-
-  strategy_id = ((int *) (msg + CmiMsgHeaderSizeBytes))[0];
-  num_msgs = ((int *) (msg + CmiMsgHeaderSizeBytes))[1];
-
-  classptr = (MeshStreamingStrategy *)
-             CProxy_ComlibManager (CkpvAccess (cmgrID)).
-             ckLocalBranch()->getStrategy (strategy_id);
-
-  row_length = classptr->GetRowLength ();
-
-  msgptr = (char *) (msg + CmiMsgHeaderSizeBytes + 2 * sizeof(int));
-  for (int i = 0; i < num_msgs; i++) {
-    dest_pe = ((int *) msgptr)[0];
-    msgsize = ((int *) msgptr)[1];
-
-    newmsg = (char *) CmiAlloc (msgsize);
-
-    memcpy (newmsg, (msgptr + 3 * sizeof(int)), msgsize);
-
-    if (dest_pe == my_pe) {
-      CmiSyncSendAndFree (my_pe, msgsize, newmsg);
-    } else {
-      dest_row = dest_pe / row_length;
-      classptr->InsertIntoRowBucket (dest_row, newmsg);
+    int dest_pe;
+    int dest_row;
+    int msgsize;
+    int my_pe;
+    //int num_msgs;
+    int row_length;
+    //int strategy_id;
+    //char *msgptr;
+    char *newmsg;
+    MeshStreamingStrategy *classptr;
+        
+    ComlibPrintf ("[%d] column_handler() invoked.\n", CkMyPe());
+    
+    my_pe = CkMyPe ();
+
+    //PUP_cmialloc mem lets us use the converse reference counting
+    //black magic in a transparent way. PUP_fromCmiAllocMem lets sub
+    //messages in a messages be used freely in the program as messages.     
+    PUP_fromCmiAllocMem fp(msg);    
+    MeshStreamingHeader mhdr;
+    
+    //Read the header from the message
+    fp | mhdr;
+    
+    //strategy_id = ((int *) (msg + CmiMsgHeaderSizeBytes))[0];
+    //num_msgs = ((int *) (msg + CmiMsgHeaderSizeBytes))[1];
+    
+    classptr = (MeshStreamingStrategy *)
+        CProxy_ComlibManager (CkpvAccess (cmgrID)).
+        ckLocalBranch()->getStrategy (mhdr.strategy_id);
+    
+    row_length = classptr->GetRowLength ();
+    
+    //msgptr = (char *) (msg + CmiMsgHeaderSizeBytes + 2 * sizeof(int));
+    
+    for (int i = 0; i < mhdr.num_msgs; i++) {
+        /*
+        dest_pe = ((int *) msgptr)[0];
+        msgsize = ((int *) msgptr)[1];
+        
+        newmsg = (char *) CmiAlloc (msgsize);
+        
+        memcpy (newmsg, (msgptr + 3 * sizeof(int)), msgsize);
+        
+        if (dest_pe == my_pe) {
+            CmiSyncSendAndFree (my_pe, msgsize, newmsg);
+        } else {
+            dest_row = dest_pe / row_length;
+            classptr->InsertIntoRowBucket (dest_row, newmsg);
+        }
+        
+        msgptr += msgsize + 3 * sizeof(int);
+        */
+
+        int dest_pe;
+        fp | dest_pe;
+
+        //Returns a part of a message as an independent message and
+        //updates the reference count of the container message.
+        fp.pupCmiAllocBuf((void **)&newmsg);
+        int msgsize = SIZEFIELD(newmsg);
+
+        if (dest_pe == my_pe) {
+            CmiSyncSendAndFree (my_pe, msgsize, newmsg);
+        } else {
+            dest_row = dest_pe / row_length;
+            classptr->InsertIntoRowBucket (dest_row, newmsg);
+        }
     }
-
-    msgptr += msgsize + 3 * sizeof(int);
-  }
-
-  CmiFree (msg);
+    
+    CmiFree (msg);
 }
 
 
@@ -189,20 +217,23 @@ void column_handler (char *msg)
 MeshStreamingStrategy::MeshStreamingStrategy (int period, int bucket_size) 
     : CharmStrategy() 
 {
-  ComlibPrintf ("[%d] MeshStreamingStrategy::MeshStreamingStrategy() invoked.\n", CkMyPe());
-
-  num_pe = CkNumPes ();
-
-  num_columns = (int) (ceil (sqrt ((double) num_pe)));
-  num_rows = num_columns;
-  row_length = num_columns;
-
-  flush_period = period;
-  max_bucket_size = bucket_size;
+    ComlibPrintf ("[%d] MeshStreamingStrategy::MeshStreamingStrategy() invoked.\n", CkMyPe());
+
+    num_pe = CkNumPes ();
+    
+    num_columns = (int) (ceil (sqrt ((double) num_pe)));
+    num_rows = num_columns;
+    row_length = num_columns;
+    
+    flush_period = period;
+    max_bucket_size = bucket_size;
+    
+    column_bucket = new CkQ<char *>[num_columns];
+    column_destQ = new CkQ<int>[num_columns];
+    column_bytes = new int[num_columns];
+    row_bucket = new CkQ<char *>[num_rows];
 
-  column_bucket = new CkQ<char *>[num_columns];
-  column_bytes = new int[num_columns];
-  row_bucket = new CkQ<char *>[num_rows];
+    shortMsgPackingFlag = CmiFalse;
 }
 
 
@@ -260,54 +291,57 @@ MeshStreamingStrategy::MeshStreamingStrategy (int period, int bucket_size)
 ** (The column buckets are queues of pointers that are allocated with
 ** "new" and must be deallocated with "delete"!)
 */
+
 void MeshStreamingStrategy::insertMessage (CharmMessageHolder *cmsg)
 {
-  int dest_pe;
-  int dest_row;
-  int dest_col;
-  int env_size;
-  int misc_size;
-  int total_size;
-  char *usr;
-  char *env;
-  char *blk;
-  char *newmsg;
-
-
-  ComlibPrintf ("[%d] MeshStreamingStrategy::insertMessage() invoked.\n", CkMyPe());
-
-  dest_pe = cmsg->dest_proc;
-  dest_col = dest_pe % num_columns;
-  usr = cmsg->getCharmMessage ();
-  env = (char *) UsrToEnv (usr);
-  blk = (char *) BLKSTART (env);
-  env_size = SIZEFIELD (env);
-  misc_size = (env - blk);
-  total_size = sizeof (int) + misc_size + env_size;
-
-  if (dest_pe == my_pe) {
-    CmiSyncSend (my_pe, env_size, env);
-  } else if (dest_col == my_column) {
-    newmsg = (char *) CmiAlloc (env_size);
-    memcpy (newmsg, env, env_size);
-
-    dest_row = dest_pe / row_length;
-
-    InsertIntoRowBucket (dest_row, newmsg);
-  } else {
-    newmsg = new char[total_size];
-    ((int *) newmsg)[0] = dest_pe;
-    memcpy ( (void *) &(((int *) newmsg)[1]), blk, misc_size + env_size);
-
-    column_bucket[dest_col].enq (newmsg);
-    column_bytes[dest_col] += total_size;
-
-    if (column_bucket[dest_col].length() > max_bucket_size) {
-      FlushColumn (dest_col);
+    int dest_pe;
+    int dest_row;
+    int dest_col;
+    int env_size;
+    int misc_size;
+    int total_size;
+    char *usr;
+    char *env;
+    //char *blk;
+    //char *newmsg;
+    
+    ComlibPrintf ("[%d] MeshStreamingStrategy::insertMessage() invoked.\n", 
+                  CkMyPe());
+    
+    dest_pe = cmsg->dest_proc;
+    dest_col = dest_pe % num_columns;
+    usr = cmsg->getCharmMessage ();
+    env = (char *) UsrToEnv (usr);
+    //blk = (char *) BLKSTART (env);
+    env_size = SIZEFIELD (env);
+    //misc_size = (env - blk);
+    total_size = sizeof (int) + sizeof(CmiChunkHeader) + env_size;
+    
+    if (dest_pe == my_pe) {
+        CmiSyncSend (my_pe, env_size, env);
+    } else if (dest_col == my_column) {
+        //newmsg = (char *) CmiAlloc (env_size);
+        //memcpy (newmsg, env, env_size);
+        //newmsg = env;
+        
+        dest_row = dest_pe / row_length;
+        
+        InsertIntoRowBucket (dest_row, env);
+    } else {
+        //newmsg = new char[total_size];
+        //((int *) newmsg)[0] = dest_pe;
+        //memcpy ( (void *) &(((int *) newmsg)[1]), blk, misc_size + env_size);
+        
+        column_bucket[dest_col].enq (env);
+        column_destQ[dest_col].enq(dest_pe);
+        column_bytes[dest_col] += total_size;
+        
+        if (column_bucket[dest_col].length() > max_bucket_size) {
+            FlushColumn (dest_col);
+        }
     }
-  }
-
-  delete cmsg;
+    
+    delete cmsg;
 }
 
 
@@ -317,13 +351,11 @@ void MeshStreamingStrategy::insertMessage (CharmMessageHolder *cmsg)
 */
 void MeshStreamingStrategy::doneInserting ()
 {
-  ComlibPrintf ("[%d] MeshStreamingStrategy::doneInserting() invoked.\n", CkMyPe());
-
-  // Empty for this strategy.
+    ComlibPrintf ("[%d] MeshStreamingStrategy::doneInserting() invoked.\n", CkMyPe());    
+    // Empty for this strategy.
 }
 
 
-
 /**************************************************************************
 ** This method is invoked prior to any processing taking place in the
 ** class.  Various initializations take place here that cannot take place
@@ -334,28 +366,29 @@ void MeshStreamingStrategy::doneInserting ()
 */
 void MeshStreamingStrategy::beginProcessing (int ignored)
 {
-  ComlibPrintf ("[%d] MeshStreamingStrategy::beginProcessing() invoked.\n", CkMyPe());
-
-  strategy_id = myInstanceID;
-
-  my_pe = CkMyPe ();
-
-  my_column = my_pe % num_columns;
-  my_row = my_pe / row_length;
-
-  column_bucket = new CkQ<char *>[num_columns];
-  column_bytes = new int[num_columns];
-  for (int i = 0; i < num_columns; i++) {
-    column_bytes[i] = 0;
-  }
-
-  row_bucket = new CkQ<char *>[num_rows];
-
-  column_handler_id = CkRegisterHandler ((CmiHandler) column_handler);
-
-  CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler,
-                         (void *) this, CkMyPe());
-  RegisterPeriodicFlush ();
+    ComlibPrintf ("[%d] MeshStreamingStrategy::beginProcessing() invoked.\n", CkMyPe());
+    
+    strategy_id = myInstanceID;
+    
+    my_pe = CkMyPe ();
+
+    my_column = my_pe % num_columns;
+    my_row = my_pe / row_length;
+    
+    //column_bucket = new CkQ<char *>[num_columns];
+    //column_bytes = new int[num_columns];
+    
+    for (int i = 0; i < num_columns; i++) {
+        column_bytes[i] = 0;
+    }
+    
+    row_bucket = new CkQ<char *>[num_rows];
+    
+    column_handler_id = CkRegisterHandler ((CmiHandler) column_handler);
+    
+    CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE, idle_flush_handler,
+                               (void *) this, CkMyPe());
+    RegisterPeriodicFlush ();
 }
 
 
@@ -411,58 +444,88 @@ void MeshStreamingStrategy::RegisterPeriodicFlush (void)
 ** and ref count at the beginning of the buffer.  These are not shown in
 ** the diagram above since they are basically irrelevant to this software.
 */
+
 void MeshStreamingStrategy::FlushColumn (int column)
 {
-  int dest_pe;
-  int num_msgs;
-  int newmsgsize;
-  int msgsize;
-  char *newmsg;
-  char *newmsgptr;
-  char *msgptr;
-
-
-  ComlibPrintf ("[%d] MeshStreamingStrategy::FlushColumn() invoked.\n", CkMyPe());
-
-  assert (column < num_columns);
-
-  dest_pe = column + (my_row * row_length);
-  if (dest_pe >= num_pe) {
-    // This means that there is a hole in the mesh.
-    dest_pe = column + ((my_row % (num_rows - 1) - 1) * row_length);
-  }
-
-  num_msgs = column_bucket[column].length ();
-
-  if (num_msgs > 0) {
-    newmsgsize = CmiMsgHeaderSizeBytes + 2 * sizeof (int) +
-                 column_bytes[column];
-    newmsg = (char *) CmiAlloc (newmsgsize);
-
-    ((int *) (newmsg + CmiMsgHeaderSizeBytes))[0] = strategy_id;
-    ((int *) (newmsg + CmiMsgHeaderSizeBytes))[1] = num_msgs;
-
-    newmsgptr = (char *) (newmsg + CmiMsgHeaderSizeBytes + 2 * sizeof (int));
-    for (int i = 0; i < num_msgs; i++) {
-      msgptr = column_bucket[column].deq ();
-      msgsize = ((int *) msgptr)[1] + (3 * sizeof (int));
-      memcpy (newmsgptr, msgptr, msgsize);
-
-      newmsgptr += msgsize;
-
-      delete [] msgptr;
+    int dest_pe;
+    int num_msgs;
+    int newmsgsize;
+    int msgsize;
+    char *newmsg;
+    char *newmsgptr;
+    char *msgptr;
+    
+    ComlibPrintf ("[%d] MeshStreamingStrategy::FlushColumn() invoked.\n", 
+                  CkMyPe());
+
+    assert (column < num_columns);
+    
+    dest_pe = column + (my_row * row_length);
+    if (dest_pe >= num_pe) {
+        // This means that there is a hole in the mesh.
+        dest_pe = column + ((my_row % (num_rows - 1) - 1) * row_length);
+    }
+    
+    num_msgs = column_bucket[column].length ();
+        
+    if (num_msgs > 0) {
+        PUP_cmiAllocSizer sp;        
+        int i = 0;
+        MeshStreamingHeader mhdr;
+
+        mhdr.strategy_id = strategy_id;
+        mhdr.num_msgs = num_msgs;
+        sp | mhdr;
+        
+        for (i = 0; i < num_msgs; i++) {
+            void *msg = column_bucket[column][i];
+            int size = SIZEFIELD(msg);
+            
+            int dest_pe = column_destQ[column][i];
+            sp | dest_pe;
+            sp.pupCmiAllocBuf((void **)&msg, size);
+        }
+
+        newmsgsize = sp.size();
+        newmsg = (char *) CmiAlloc (newmsgsize);
+        
+        //((int *) (newmsg + CmiMsgHeaderSizeBytes))[0] = strategy_id;
+        //((int *) (newmsg + CmiMsgHeaderSizeBytes))[1] = num_msgs;
+
+        PUP_toCmiAllocMem mp(newmsg);
+        //make a structure header
+        mp | mhdr;
+        
+        /*
+          newmsgptr = (char *) (newmsg + CmiMsgHeaderSizeBytes + 2 * sizeof (int));               
+          for (int i = 0; i < num_msgs; i++) {
+          msgptr = column_bucket[column].deq ();            
+          msgsize = ((int *) msgptr)[1] + (3 * sizeof (int));
+          memcpy (newmsgptr, msgptr, msgsize);
+          
+          newmsgptr += msgsize;
+          
+            delete [] msgptr;
+            }
+        */
+        
+        for (i = 0; i < num_msgs; i++) {
+            void *msg = column_bucket[column].deq();
+            int destpe = column_destQ[column].deq();
+            int size = SIZEFIELD(msg);
+            
+            mp | destpe;
+            mp.pupCmiAllocBuf((void **)&msg, size);
+            CmiFree(msg);
+        }
+        
+        column_bytes[column] = 0;        
+        CmiSetHandler (newmsg, column_handler_id);        
+        CmiSyncSendAndFree (dest_pe, newmsgsize, newmsg);
     }
-
-    column_bytes[column] = 0;
-
-    CmiSetHandler (newmsg, column_handler_id);
-
-    CmiSyncSendAndFree (dest_pe, newmsgsize, newmsg);
-  }
 }
 
 
-
 /**************************************************************************
 ** This method is used to flush a specified row bucket, either as the
 ** result of the row bucket reaching its maximum capacity, as a result
@@ -493,42 +556,57 @@ void MeshStreamingStrategy::FlushColumn (int column)
 **                        msg
 **
 */
+
 void MeshStreamingStrategy::FlushRow (int row)
 {
-  int dest_pe;
-  int num_msgs;
-  int *sizes;
-  char *msg;
-  char **msgComps;
-  int i;
-
-  ComlibPrintf ("[%d] MeshStreamingStrategy::FlushRow() invoked.\n", CkMyPe());
-
-  assert (row < num_rows);
-
-  dest_pe = my_column + (row * row_length);
-
-  num_msgs = row_bucket[row].length ();
-  if (num_msgs > 0) {
-    sizes = new int[num_msgs];
-    msgComps = new char *[num_msgs];
-
-    for (i = 0; i < num_msgs; i++) {
-      msg = row_bucket[row].deq ();
-      CmiSetHandler (msg, CkpvAccess(RecvmsgHandle));
-      sizes[i] = SIZEFIELD (msg);
-      msgComps[i] = msg;
-    }
-
-    CmiMultipleSend (dest_pe, num_msgs, sizes, msgComps);
-
-    for (i = 0; i < num_msgs; i++) {
-      CmiFree (msgComps[i]);
+    int dest_pe;
+    int num_msgs;
+    int *sizes;
+    char *msg;
+    char **msgComps;
+    int i;
+    
+    ComlibPrintf ("[%d] MeshStreamingStrategy::FlushRow() invoked.\n", 
+                  CkMyPe());
+    
+    assert (row < num_rows);
+    
+    dest_pe = my_column + (row * row_length);
+    
+    num_msgs = row_bucket[row].length ();
+    if (num_msgs > 0) {
+        
+        //Strip charm++ envelopes from messages
+        if(shortMsgPackingFlag) {
+            MsgPacker mpack(row_bucket[row], num_msgs);
+            CombinedMessage *msg; 
+            int size;
+            mpack.getMessage(msg, size);
+            
+            CmiSyncSendAndFree(dest_pe, size, (char *)msg);
+            return;
+        }
+        
+        //Send messages without short message packing
+        sizes = new int[num_msgs];
+        msgComps = new char *[num_msgs];
+        
+        for (i = 0; i < num_msgs; i++) {
+            msg = row_bucket[row].deq ();
+            CmiSetHandler (msg, CkpvAccess(RecvmsgHandle));
+            sizes[i] = SIZEFIELD (msg);
+            msgComps[i] = msg;
+        }
+        
+        CmiMultipleSend (dest_pe, num_msgs, sizes, msgComps);
+        
+        for (i = 0; i < num_msgs; i++) {
+            CmiFree (msgComps[i]);
+        }
+        
+        delete [] sizes;
+        delete [] msgComps;
     }
-
-    delete [] sizes;
-    delete [] msgComps;
-  }
 }
 
 
@@ -539,15 +617,16 @@ void MeshStreamingStrategy::FlushRow (int row)
 */
 void MeshStreamingStrategy::FlushBuffers (void)
 {
-  ComlibPrintf ("[%d] MeshStreamingStrategy::PeriodicFlush() invoked.\n", CkMyPe());
-
-  for (int column = 0; column < num_columns; column++) {
-    FlushColumn (column);
-  }
+    ComlibPrintf ("[%d] MeshStreamingStrategy::PeriodicFlush() invoked.\n", 
+                  CkMyPe());
 
-  for (int row = 0; row < num_rows; row++) {
-    FlushRow (row);
-  }
+    for (int column = 0; column < num_columns; column++) {
+        FlushColumn (column);
+    }
+    
+    for (int row = 0; row < num_rows; row++) {
+        FlushRow (row);
+    }
 }
 
 
@@ -615,32 +694,37 @@ void MeshStreamingStrategy::pup (PUP::er &p)
   p | strategy_id;
   p | column_handler_id;
 
+  p | shortMsgPackingFlag;
+
   // Handle the column_bucket[] data structure.
   // For each element in column_bucket[], pup the length of the queue
   // at that element followed by the contents of that queue.  For each
   // queue, pup the size of the message pointed to by the (char *)
   // entry, followed by the memory for the (char *) entry.
   if (p.isUnpacking ()) {
-    column_bucket = new CkQ<char *>[num_columns];
+      column_bucket = new CkQ<char *>[num_columns];
+      column_destQ = new CkQ<int>[num_columns];
   }
 
+  /*In correct code, will only be useful for checkpointing though
   for (i = 0; i < num_columns; i++) {
     int length = column_bucket[i].length ();
 
     p | length;
 
     for (int j = 0; j < length; j++) {
-      char *msg = column_bucket[i].deq ();
-      int size = sizeof (int) + ((int *) msg)[1];
-      p | size;
-      p(msg, size);
+        char *msg = column_bucket[i].deq ();
+        int size = sizeof (int) + ((int *) msg)[1];
+        p | size;
+        p(msg, size);
     }
   }
+  */
 
   // Handle the column_bytes[] data structure.
   // This is a straightforward packing of an int array.
   if (p.isUnpacking ()) {
-    column_bytes = new int[num_columns];
+      column_bytes = new int[num_columns];
   }
 
   p(column_bytes, num_columns);
@@ -650,7 +734,8 @@ void MeshStreamingStrategy::pup (PUP::er &p)
   if (p.isUnpacking ()) {
     row_bucket = new CkQ<char *>[num_rows];
   }
-
+  
+  /* In correct code, will only be useful for checkpointing though
   for (i = 0; i < num_rows; i++) {
     int length = row_bucket[i].length ();
 
@@ -663,4 +748,5 @@ void MeshStreamingStrategy::pup (PUP::er &p)
       p(msg, size);
     }
   }
+  */
 }
index f5cfae1cbdae6afe4918daf37d1aa17d09071f77..3664669151444b070917efbb16092eaca6390440 100644 (file)
 #define DEFAULT_FLUSH_PERIOD 10        // milliseconds
 #define DEFAULT_MAX_BUCKET_SIZE 1000   // number of messages
 
+//Passed along with every row message header in the first iteration of
+//the MesgStreamingStrategy
+struct MeshStreamingHeader {
+    char conv_hdr[CmiMsgHeaderSizeBytes];
+    int strategy_id;
+    int num_msgs;
+};
+
+PUPbytes(MeshStreamingHeader);
+
 class MeshStreamingStrategy : public CharmStrategy
 {
-  public:
+    CmiBool shortMsgPackingFlag;
+ public:
     MeshStreamingStrategy (int period=DEFAULT_FLUSH_PERIOD,
                           int bucket_size=DEFAULT_MAX_BUCKET_SIZE);
     MeshStreamingStrategy (CkMigrateMessage *m) : CharmStrategy(m){ }
-
+        
     void insertMessage (CharmMessageHolder *msg);
     void doneInserting ();
     void beginProcessing (int ignored);
@@ -35,7 +46,12 @@ class MeshStreamingStrategy : public CharmStrategy
     virtual void pup (PUP::er &p);
     PUPable_decl (MeshStreamingStrategy);
 
+    //Should be used only for array messages
+    virtual void enableShortArrayMessagePacking()
+    {shortMsgPackingFlag=CmiTrue;} 
+
   private:
+
     int num_pe;
     int num_columns;
     int num_rows;
@@ -53,6 +69,8 @@ class MeshStreamingStrategy : public CharmStrategy
     int column_handler_id;
 
     CkQ<char *> *column_bucket;
+    CkQ<int> *column_destQ;
+
     int *column_bytes;
     CkQ<char *> *row_bucket;
 };
index 8f326e515ae314385ce8f24e6476a265859b22c9..6866c97cfdf4f836641d5a4a3b04ef0239fc7795 100644 (file)
@@ -16,7 +16,8 @@ MsgPacker::MsgPacker(CkQ<CharmMessageHolder *> &msgq, int n_msgs){
 
     for(int count = 0; count < n_msgs; count ++){
         CharmMessageHolder *cmsg = msgq.deq();
-        envelope *env = (envelope *)UsrToEnv(cmsg->getCharmMessage());
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = (envelope *)UsrToEnv(msg);
         CkPackMessage(&env);
 
         if(count == 0) {
@@ -28,15 +29,42 @@ MsgPacker::MsgPacker(CkQ<CharmMessageHolder *> &msgq, int n_msgs){
         msgList[count].epIdx = env->getsetArrayEp();
         msgList[count].size = env->getTotalsize() - sizeof(envelope);
         msgList[count].idx = env->getsetArrayIndex();
-        msgList[count].data = cmsg->getCharmMessage();
-
-        if(msgList[count].size >= MAX_MESSAGE_SIZE-1)
-            CkAbort("Can't send messges larger than 64KB\n");
+        msgList[count].data = msg;
 
+        CkAssert(msgList[count].size < MAX_MESSAGE_SIZE);
         delete cmsg;
     }
 }
 
+//Takes a queue of envelopes as char* ptrs and not charm message holders
+//Used by mesh streaming strategy
+MsgPacker::MsgPacker(CkQ<char *> &msgq, int n_msgs){
+    
+    CkAssert(n_msgs < 65536);  //16 bit field for num messages
+    
+    nShortMsgs = n_msgs;
+    msgList = new short_envelope[n_msgs];    
+    
+    for(int count = 0; count < n_msgs; count ++){
+        envelope *env = (envelope *)msgq.deq();
+        char *msg = (char *)EnvToUsr(env);
+        CkPackMessage(&env);
+
+        if(count == 0) {
+            aid = env->getsetArrayMgr();
+            if(aid.isZero()) 
+                CkAbort("Array packing set and ArrayID is zero");
+        }        
+        
+        msgList[count].epIdx = env->getsetArrayEp();
+        msgList[count].size = env->getTotalsize() - sizeof(envelope);
+        msgList[count].idx = env->getsetArrayIndex();
+        msgList[count].data = msg;
+        
+        CkAssert(msgList[count].size < MAX_MESSAGE_SIZE);
+    }
+}
+
 MsgPacker::~MsgPacker(){
     if(nShortMsgs > 0 && msgList != NULL) {
         for(int count = 0; count < nShortMsgs; count ++)
index 749f2e882d465d1af7a6637c1e15979beec62dea..103ab7ca396245ac80b3909cf4b6558b21561d85 100644 (file)
@@ -37,11 +37,18 @@ inline short_envelope::~short_envelope(){
     */
 }
 
-inline void short_envelope::pup(PUP::er &p){
+inline void short_envelope::pup(PUP::er &p){    
+
     p | epIdx;
-    p | size;    
-    p | idx;
+    p | size;        
+    //p | idx;
     
+    if(p.isUnpacking())
+        idx.nInts = 0;
+
+    p((char *)&(idx.nInts), 1);
+    p((int *)(idx.data()), idx.nInts);
+
     p.pupCmiAllocBuf((void **)&data, size);
 }
 
@@ -63,8 +70,14 @@ class MsgPacker {
  public:
     MsgPacker();
     ~MsgPacker();    
+    
+    //Makes a message out of a queue of CharmMessageHolders
     MsgPacker(CkQ<CharmMessageHolder*> &cmsg_list, int n_msgs);
     
+    //Takes a queue of envelopes as char* ptrs and not charm message holders
+    //Used by mesh streaming strategy
+    MsgPacker::MsgPacker(CkQ<char *> &msgq, int n_msgs);
+    
     void getMessage(CombinedMessage *&msg, int &size);
     static void deliver(CombinedMessage *cmb_msg);
 };