Checking in a new broadcast strategy which broadcasts to arrays along a load-balanced...
author    Sameer Kumar <skumar2@uiuc.edu>
Thu, 17 Feb 2005 01:19:55 +0000 (01:19 +0000)
committer Sameer Kumar <skumar2@uiuc.edu>
Thu, 17 Feb 2005 01:19:55 +0000 (01:19 +0000)
Streaming does not have any functional changes. A new scheme to pack messages is now checked in, but it is commented out.
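
The load-balanced part of the new hypercube broadcast is the way a partner that falls beyond CkNumPes() is folded back into the lower subcube and then offset by a remainder term, so the overflow traffic is spread over the PEs that do exist. Below is a minimal stand-alone sketch of that destination computation, mirroring the loop added to BroadcastStrategy::handleHypercube; the function name hypercubeDests and the mype/numpes/curcycle parameters are illustrative, not part of the patch.

#include <cmath>
#include <cstdio>
#include <vector>

// Destinations for one hypercube broadcast step, mirroring the folding and
// load-balancing logic added to BroadcastStrategy::handleHypercube.
std::vector<int> hypercubeDests(int mype, int numpes, int curcycle) {
    int logp = (int) std::ceil(std::log((double) numpes) / std::log(2.0));
    std::vector<int> dests;
    for (int i = logp - curcycle - 1; i >= 0; i--) {
        int p = mype ^ (1 << i);              // partner in dimension i
        ++curcycle;                           // forwarded copy carries the next cycle
        if (p >= numpes) {
            p &= (-1) << i;                   // fold back into the existing subcube
            // spread the folded-over traffic across the surviving PEs
            if (p < numpes)
                p += (mype - (mype & ((-1) << i))) % (numpes - p);
        }
        if (p < numpes)
            dests.push_back(p);
    }
    return dests;
}

int main() {
    // On 6 PEs the root (PE 0, cycle 0) fans out to PEs 4, 2 and 1.
    for (int d : hypercubeDests(0, 6, 0))
        std::printf("send to %d\n", d);
    return 0;
}

Each receiver repeats the same computation with the cycle count carried in the message header, so later steps only cover the remaining lower dimensions.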

src/ck-com/BroadcastStrategy.C
src/ck-com/BroadcastStrategy.h
src/ck-com/StreamingStrategy.C
src/ck-com/StreamingStrategy.h

diff --git a/src/ck-com/BroadcastStrategy.C b/src/ck-com/BroadcastStrategy.C
index 581eb3d8cb5899bc6a955f52d28168d65f840f61..9931d5b094cfbe95de1899ce04af3f98f86deb50 100644
@@ -1,22 +1,34 @@
+
 //Broadcast strategy for charm++ programs using the net version
-//This stategy will wonly work for groups.
 //This strategy implements a tree based broadcast
-//I will extent it for arrays later.
 //Developed by Sameer Kumar 04/10/04
 
+//Extend for array sections later
+
 #include "BroadcastStrategy.h"
+#include "ComlibManager.h"
 
 CkpvExtern(CkGroupID, cmgrID);
 extern int sfactor;
 
 static void recv_bcast_handler(void *msg) {
-    int instid = CmiGetXHandler(msg);
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
+    int instid = conv_header->stratid;
+
     BroadcastStrategy *bstrat = (BroadcastStrategy *)
         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
     
     bstrat->handleMessage((char *)msg);    
 }
 
+
+//Initialize the hypercube variables
+void BroadcastStrategy::initHypercube() {
+    logp = log((double) CkNumPes())/log(2.0);
+    logp = ceil(logp);
+}
+
+
 //Constructor, 
 //Can read spanning factor from command line
 BroadcastStrategy::BroadcastStrategy(int topology) : 
@@ -25,18 +37,44 @@ BroadcastStrategy::BroadcastStrategy(int topology) :
     if(sfactor > 0)
         spanning_factor = sfactor;
     
+    setType(GROUP_STRATEGY);
+    initHypercube();
+}
+
+//Array Constructor
+//Can read spanning factor from command line
+BroadcastStrategy::BroadcastStrategy(CkArrayID aid, int topology) : 
+    CharmStrategy(), _topology(topology) {
+        
+    setType(ARRAY_STRATEGY);
+    ainfo.setDestinationArray(aid);
+    
+    spanning_factor = DEFAULT_BROADCAST_SPANNING_FACTOR;
+    if(sfactor > 0)
+        spanning_factor = sfactor;    
+
+    initHypercube();
+    //if(topology == USE_HYPERCUBE)
+    //  CkPrintf("Warning: hypercube only works on powers of two PES\n");
 }
 
 
 //Receives the message and sends it along the spanning tree.
 void BroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
-    CkPrintf("[%d] BROADCASTING\n", CkMyPe());
+    //CkPrintf("[%d] BROADCASTING\n", CkMyPe());
 
     char *msg = cmsg->getCharmMessage();
-    if(_topology == USE_HYPERCUBE) {
-        envelope *env = UsrToEnv(msg);
-        env->setSrcPe(0);    
-    }
+
+    envelope *env = UsrToEnv(msg);
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) env;
+
+    conv_header->root = 0;        //Use root later
+    if(_topology == USE_HYPERCUBE) 
+        conv_header->xhdl = 0;
+    else
+        //conv_header->root = CkMyPe();
+        conv_header->xhdl = CkMyPe();
+    
     handleMessage((char *)UsrToEnv(msg));
     
     delete cmsg;
@@ -63,13 +101,16 @@ void BroadcastStrategy::handleMessage(char *msg) {
 void BroadcastStrategy::handleTree(char *msg){
     
     envelope *env = (envelope *)msg;
-    int startpe = env->getSrcPe();
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
+
+    int startpe = conv_header->xhdl;
     int size = env->getTotalsize();
     
     CkAssert(startpe>=0 && startpe < CkNumPes());
     
     CmiSetHandler(msg, handlerId);
-    CmiSetXHandler(msg, getInstance());    
+    
+    conv_header->stratid = getInstance();
     
     //Sending along the spanning tree
     //Gengbins tree building code stolen from the MPI machine layer    
@@ -92,47 +133,68 @@ void BroadcastStrategy::handleTree(char *msg){
         CmiSyncSend(p, size, msg);
     }
 
-    CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
-                    env->getGroupNum());
+    if(getType() == GROUP_STRATEGY)
+        CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
+                        env->getGroupNum());
+    else if(getType() == ARRAY_STRATEGY)
+        ainfo.localBroadcast(env);        
 }
 
 
 void BroadcastStrategy::handleHypercube(char *msg){
     envelope *env = (envelope *)msg;
-    int curcycle = env->getSrcPe();
+
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
+    //int curcycle = conv_header->root;
+    int curcycle = conv_header->xhdl;
+
     int i;
     int size = env->getTotalsize();
-    
-    double logp = CkNumPes();
-    logp = log(logp)/log(2.0);
-    logp = ceil(logp);
-    
+        
     //CkPrintf("In hypercube %d, %d\n", (int)logp, curcycle); 
     
     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
     CmiSetHandler(msg, handlerId);
-    CmiSetXHandler(msg, getInstance());    
+
+    conv_header->stratid = getInstance();
+
+    //Copied from system hypercube message passing
 
     for (i = logp - curcycle - 1; i >= 0; i--) {
         int p = CkMyPe() ^ (1 << i);
 
         int newcycle = ++curcycle;
         //CkPrintf("%d --> %d, %d\n", CkMyPe(), p, newcycle); 
-
-        env->setSrcPe(newcycle);
-        if(p < CkNumPes()) {
-            CmiSyncSendFn(p, size, msg);
-        }
+        
+        //conv_header->root = newcycle;
+        conv_header->xhdl = newcycle;
+
+        if(p >= CkNumPes()) {
+            p &= (-1) << i;
+            
+            //loadbalancing
+            if (p < CkNumPes())
+                p += (CkMyPe() - 
+                      (CkMyPe() & ((-1) << i))) % (CkNumPes() - p);
+        }     
+        
+        if(p < CkNumPes())
+            CmiSyncSendFn(p, size, msg);                    
     }
-
-    CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
-                    env->getGroupNum());
+    
+    if(getType() == GROUP_STRATEGY)
+        CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
+                        env->getGroupNum());
+    else if(getType() == ARRAY_STRATEGY)
+        ainfo.localBroadcast(env);        
 }
 
 
 //Pack the group id and the entry point of the user message
 void BroadcastStrategy::pup(PUP::er &p){
-    Strategy::pup(p);    
+    CharmStrategy::pup(p);    
+
     p | spanning_factor;
     p | _topology;
+    p | logp;
 }
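
The tree topology relies on spanning-tree code borrowed from the MPI machine layer, which sits outside the context lines of the hunk above. For orientation only, here is a sketch of the usual way the children of a PE are enumerated in a spanning tree of a given factor rooted at startpe; it is a generic illustration of the technique, assuming a simple k-ary numbering, and not the exact code in BroadcastStrategy.C.

#include <cstdio>
#include <vector>

// Children of 'mype' in a spanning tree of factor k over 'numpes' PEs rooted
// at 'startpe'.  Generic k-ary numbering for illustration; not the exact
// machine-layer routine used by BroadcastStrategy::handleTree.
std::vector<int> spanningTreeChildren(int mype, int numpes, int startpe, int k) {
    int rel = (mype - startpe + numpes) % numpes;   // rank relative to the root
    std::vector<int> kids;
    for (int c = 1; c <= k; c++) {
        int child = rel * k + c;
        if (child < numpes)
            kids.push_back((child + startpe) % numpes);
    }
    return kids;
}

int main() {
    // PE 1 in an 8-PE tree rooted at PE 0 with spanning factor 4
    for (int child : spanningTreeChildren(1, 8, 0, 4))
        std::printf("forward to %d\n", child);
    return 0;
}

For PE 1 in an 8-PE tree rooted at 0 with factor 4, the sketch forwards to PEs 5, 6 and 7; the real code additionally delivers the message locally via CkSendMsgBranch or ainfo.localBroadcast, as the hunk shows.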
diff --git a/src/ck-com/BroadcastStrategy.h b/src/ck-com/BroadcastStrategy.h
index eeda41b1f0e92c9367fd6221c0f121f27f49fb5b..adb54b689604b909b59b94353a009c9e3f5e58d9 100644
 
 class BroadcastStrategy : public CharmStrategy {
 
-    int handlerId;
-    int spanning_factor;
-    int _topology;
+    int _topology;         //Topology to use Tree or Hypercube
+
+    int handlerId;          //broadcast handler id
+    int spanning_factor;    //the spanning factor of the tree
+
+    double logp;       //ceil of log of number of processors
+
+    void initHypercube();      
 
     void handleTree(char *msg);
-    void handleHypercube(char *msg);
+    void handleHypercube(char *msg);    
 
  public:
     BroadcastStrategy(int topology = USE_HYPERCUBE);
+    BroadcastStrategy(CkArrayID aid, int topology = USE_HYPERCUBE);
     BroadcastStrategy(CkMigrateMessage *){}
     void insertMessage(CharmMessageHolder *msg);
     void doneInserting();
diff --git a/src/ck-com/StreamingStrategy.C b/src/ck-com/StreamingStrategy.C
index f2c8c1f9108b87399da01820934ad61c37aff369..c306ca63f59396d75937e6a74cdb4d3565a362c5 100644
@@ -1,6 +1,24 @@
 #include "StreamingStrategy.h"
 #include "MsgPacker.h"
 
+void StreamingHandlerFn(void *msg) {
+    CombinedMessage hdr;
+    
+    CkPrintf("In streaming handler fn\n");
+
+    PUP_fromCmiAllocMem fp(msg);
+    fp | hdr;
+    
+    for(int count = 0; count < hdr.nmsgs; count ++) {
+        char *msg;
+        fp.pupCmiAllocBuf((void **)&msg);
+        int size = SIZEFIELD(msg);
+        CmiSyncSendAndFree(CkMyPe(), size, msg);
+    }
+    CmiFree(msg);
+    return;
+}
+
 StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
     : PERIOD(periodMs), bufferMax(bufferMax_), CharmStrategy()
 {
@@ -8,6 +26,8 @@ StreamingStrategy::StreamingStrategy(int periodMs,int bufferMax_)
     streamingMsgCount=NULL;
     shortMsgPackingFlag = CmiFalse;
     idleFlush = CmiTrue;
+    streaming_handler_id = 0;
+    setType(ARRAY_STRATEGY);
 }
 
 void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
@@ -18,10 +38,10 @@ void StreamingStrategy::insertMessage(CharmMessageHolder *cmsg) {
     int size = env->getTotalsize();
 
     if(size > MAX_STREAMING_MESSAGE_SIZE) {//AVOID COPYING
-      ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
-      CmiSyncSendAndFree(pe, size, (char *)env);
-      delete cmsg;
-      return;
+        ComlibPrintf("StreamingStrategy::insertMessage: direct send\n");
+        CmiSyncSendAndFree(pe, size, (char *)env);
+        delete cmsg;
+        return;
     }
 
     ComlibPrintf("StreamingStrategy::insertMessage: buffering t=%d, n=%d, s=%d\n",  
@@ -39,57 +59,61 @@ void StreamingStrategy::doneInserting() {
 
 /// Send off all accumulated messages for this PE:
 void StreamingStrategy::flushPE(int pe) {
+
+  //CkPrintf("Checking %d\n", pe);
+
   if(streamingMsgCount[pe] == 0)
-    return; //Nothing to do.
+      return; //Nothing to do.
   
   CharmMessageHolder *cmsg, *toBeDeleted = NULL;
+  int size = 0;
   if(shortMsgPackingFlag){
-    MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
-    CombinedMessage *msg; 
-    int size;
-    mpack.getMessage(msg, size);
-    ComlibPrintf("[%d] StreamingStrategy::flushPE: packed %d short messages to %d\n", 
-                CkMyPe(), streamingMsgCount[pe], pe); 
-    CmiSyncSendAndFree(pe, size, (char *)msg);
-    streamingMsgCount[pe] = 0;
+      MsgPacker mpack(streamingMsgBuf[pe], streamingMsgCount[pe]);
+      CombinedMessage *msg; 
+      mpack.getMessage(msg, size);
+      ComlibPrintf("[%d] StreamingStrategy::flushPE: packed %d short messages to %d\n", 
+                   CkMyPe(), streamingMsgCount[pe], pe); 
+      CmiSyncSendAndFree(pe, size, (char *)msg);
+      streamingMsgCount[pe] = 0;
   }
   else {
     // Build a CmiMultipleSend list of messages to be sent off:
     int msg_count=streamingMsgCount[pe], msg_pe=0;
     if(msg_count == 1) {
-      cmsg = streamingMsgBuf[pe].deq();
-      char *msg = cmsg->getCharmMessage();
-      envelope *env = UsrToEnv(msg);
-      int size = env->getTotalsize();
-      CmiSyncSendAndFree(pe, size, (char *)env);
-      ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
-                  CkMyPe(), pe);            
-      delete cmsg;
-      streamingMsgCount[pe] = 0;
-      return;
+        cmsg = streamingMsgBuf[pe].deq();
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        int size = env->getTotalsize();
+        CmiSyncSendAndFree(pe, size, (char *)env);
+        ComlibPrintf("[%d] StreamingStrategy::flushPE: one message to %d\n", 
+                     CkMyPe(), pe);            
+        delete cmsg;
+        streamingMsgCount[pe] = 0;
+        return;
     }
+    
     char **msgComps = new char*[msg_count];
     int *sizes = new int[msg_count];
     ComlibPrintf("[%d] StreamingStrategy::flushPE: %d messages to %d\n", 
-                CkMyPe(), msg_count, pe);            
+                 CkMyPe(), msg_count, pe);            
     while (!streamingMsgBuf[pe].isEmpty()) {
-      cmsg = streamingMsgBuf[pe].deq();
-      char *msg = cmsg->getCharmMessage();
-      envelope *env = UsrToEnv(msg);
-      sizes[msg_pe] = env->getTotalsize();
-      msgComps[msg_pe] = (char *)env;
-      msg_pe++;
-      
-      // Link cmsg into the toBeDeleted list:
-      cmsg->next = toBeDeleted;
-      toBeDeleted = cmsg;            
+        cmsg = streamingMsgBuf[pe].deq();
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        sizes[msg_pe] = env->getTotalsize();
+        msgComps[msg_pe] = (char *)env;
+        msg_pe++;
+        
+        // Link cmsg into the toBeDeleted list:
+        cmsg->next = toBeDeleted;
+        toBeDeleted = cmsg;            
     }
     
     if (msg_count!=msg_pe) 
-      CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
-
+        CkAbort("streamingMsgCount doesn't match streamingMsgBuf!\n");
+    
     ComlibPrintf("--> Sending %d Messages to PE %d\n", msg_count, pe);
-
+    
     CmiMultipleSend(pe, msg_count, sizes, msgComps);
     delete [] msgComps;
     delete [] sizes;
@@ -103,38 +127,111 @@ void StreamingStrategy::flushPE(int pe) {
         delete cmsg;
         cmsg = toBeDeleted;            
     }     
+    
+    /*
+    PUP_cmiAllocSizer sp;
+    CombinedMessage hdr;
+    
+    sp | hdr;
+
+    int nmsgs = streamingMsgCount[pe];
+    for(int count = 0; count < nmsgs; count++) {
+        cmsg = streamingMsgBuf[pe][count];
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        size = env->getTotalsize();
+        
+        sp.pupCmiAllocBuf((void **)&env, size);
+    }
+    
+    char *newmsg = (char *)CmiAlloc(sp.size());
+    PUP_toCmiAllocMem mp(newmsg);
+    
+    hdr.aid.setZero();
+    hdr.srcPE = CkMyPe();
+    hdr.nmsgs = nmsgs;
+    mp | hdr;
+    
+    for(int count = 0; count < nmsgs; count++) {
+        cmsg = streamingMsgBuf[pe][count];
+        char *msg = cmsg->getCharmMessage();
+        envelope *env = UsrToEnv(msg);
+        size = env->getTotalsize();
+        
+        mp.pupCmiAllocBuf((void **)&env, size);
+    }
+
+    for(int count = 0; count < nmsgs; count++) {
+        cmsg = streamingMsgBuf[pe][count];
+        delete cmsg;
+    }    
+    
+    streamingMsgCount[pe] = 0;
+    CmiSetHandler(newmsg, streaming_handler_id);
+    CmiSyncSendAndFree(pe, sp.size(), newmsg); 
+    */
   }
 }
 
 void StreamingStrategy::periodicFlush() {
-  for (int pe=0; pe<CkNumPes(); pe++) flushPE(pe);
+    for (int proc = 0; proc < CkNumPes(); proc++) 
+        flushPE(proc);
+}
+
+struct MsgStruct {
+    char header[CmiReservedHeaderSize];
+    void *addr;
+};
+
+
+void testHandler(void *msg) {
+    StreamingStrategy *s;
+
+    MsgStruct *mstruct = (MsgStruct *)msg;
+
+    s = (StreamingStrategy *) (mstruct->addr);
+    s->periodicFlush();
+
+    CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
 }
 
 /// This routine is called via CcdCallFnAfter to flush all messages:
 static void call_delayFlush(void *arg,double curWallTime) {
-  StreamingStrategy *s=(StreamingStrategy *)arg;
-  s->periodicFlush();
-  s->registerFlush(); //Set ourselves up to be called again
+    StreamingStrategy *s=(StreamingStrategy *)arg;
+    s->periodicFlush();
+    s->registerFlush(); //Set ourselves up to be called again
 }
 
 void StreamingStrategy::registerFlush(void) {
-  // CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
-  CcdCallFnAfterOnPE((CcdVoidFn)call_delayFlush, (void *)this, PERIOD, CkMyPe());
+    //CkPrintf("[%d] Will call function again every %d ms\n",CkMyPe(),PERIOD);
+    CcdCallFnAfterOnPE(call_delayFlush, (void *)this, PERIOD, CkMyPe());
 }
 
 /// This routine is called via CcdCallOnCondition to flush all messages:
 static void call_idleFlush(void *arg,double curWallTime) {
-  StreamingStrategy *s=(StreamingStrategy *)arg;
-  s->periodicFlush();
+    StreamingStrategy *s=(StreamingStrategy *)arg;
+    s->periodicFlush();
 }
 
 // When we're finally ready to go, register for timeout and idle flush.
 void StreamingStrategy::beginProcessing(int ignored) {
-  registerFlush();
-  if(idleFlush)
-    CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
-                              (CcdVoidFn)call_idleFlush, 
-                              (void *)this, CkMyPe());
+    registerFlush();
+    //if(idleFlush)
+    //  CcdCallOnConditionKeepOnPE(CcdPROCESSOR_BEGIN_IDLE,
+    //                             (CcdVoidFn)call_idleFlush, 
+    //                             (void *)this, CkMyPe());
+    
+    streaming_handler_id = CkRegisterHandler(StreamingHandlerFn);
+    
+    /*
+      int handler = CkRegisterHandler(testHandler);
+      
+      MsgStruct *msg = (MsgStruct *)CmiAlloc(sizeof(MsgStruct));
+      msg->addr = this;
+      CmiSetHandler(msg, handler);
+      
+      CmiSyncSendAndFree(CkMyPe(), sizeof(MsgStruct), (char *)msg);
+    */
 }
 
 void StreamingStrategy::pup(PUP::er &p){
@@ -144,12 +241,13 @@ void StreamingStrategy::pup(PUP::er &p){
   p | bufferMax;
   p | shortMsgPackingFlag;
   p | idleFlush;
+  p | streaming_handler_id;
 
   if(p.isUnpacking()) {
-    streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
-    streamingMsgCount = new int[CkNumPes()];
-    for(int count = 0; count < CkNumPes(); count ++)
-      streamingMsgCount[count] = 0;
+      streamingMsgBuf = new CkQ<CharmMessageHolder *>[CkNumPes()];
+      streamingMsgCount = new int[CkNumPes()];
+      for(int count = 0; count < CkNumPes(); count ++)
+          streamingMsgCount[count] = 0;
   }
 }
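
The commented-out packing scheme above, together with the StreamingHandlerFn that would consume it, boils down to concatenating whole messages into one count-prefixed buffer, shipping that buffer once, and re-delivering each piece on the receiver. A minimal plain-C++ sketch of that idea, using the illustrative names packMessages and unpackAndDeliver instead of the PUP_cmiAlloc* classes and CmiSyncSendAndFree:

#include <cstdio>
#include <cstring>
#include <string>
#include <vector>

// Pack several messages into one buffer: [count][len0][bytes0][len1][bytes1]...
std::vector<char> packMessages(const std::vector<std::string> &msgs) {
    size_t total = sizeof(int);
    for (const std::string &m : msgs)
        total += sizeof(int) + m.size();
    std::vector<char> buf(total);
    char *p = buf.data();
    int n = (int) msgs.size();
    std::memcpy(p, &n, sizeof(int));
    p += sizeof(int);
    for (const std::string &m : msgs) {
        int len = (int) m.size();
        std::memcpy(p, &len, sizeof(int));
        p += sizeof(int);
        std::memcpy(p, m.data(), m.size());
        p += m.size();
    }
    return buf;
}

// Unpack the combined buffer and hand each piece to a delivery callback,
// the way StreamingHandlerFn re-sends each extracted message locally.
void unpackAndDeliver(const char *p, void (*deliver)(const char *, int)) {
    int n;
    std::memcpy(&n, p, sizeof(int));
    p += sizeof(int);
    for (int i = 0; i < n; i++) {
        int len;
        std::memcpy(&len, p, sizeof(int));
        p += sizeof(int);
        deliver(p, len);
        p += len;
    }
}

int main() {
    std::vector<char> buf = packMessages({"hello", "streaming", "world"});
    unpackAndDeliver(buf.data(), [](const char *m, int len) {
        std::printf("delivered %d bytes: %.*s\n", len, len, m);
    });
    return 0;
}

The Charm++ version keeps the CombinedMessage header and per-message CmiAlloc framing so the extracted pieces can go straight back to the scheduler, but the buffer layout follows the same count-then-payload idea.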
 
diff --git a/src/ck-com/StreamingStrategy.h b/src/ck-com/StreamingStrategy.h
index 8650457b8e957784f7ea54330d85278ccada7d61..15fd17b611416a3d6a0563608f5911f75674beba 100644
@@ -4,7 +4,6 @@
 
 #define MAX_STREAMING_MESSAGE_SIZE 2048*2
 
-
 class StreamingStrategy : public CharmStrategy {
  protected:
     CkQ<CharmMessageHolder *> *streamingMsgBuf;
@@ -12,6 +11,8 @@ class StreamingStrategy : public CharmStrategy {
     int PERIOD, bufferMax;
     CmiBool shortMsgPackingFlag, idleFlush;
 
+    int streaming_handler_id; //Getting rid of multiple send
+
     /// Flush all messages destined for this processor:
     void flushPE(int destPE);