update PipeBroadcastStrategy for the new comlib interface
authorFilippo Gioachin <gioachin@illinois.edu>
Wed, 30 Jun 2004 23:51:54 +0000 (23:51 +0000)
committerFilippo Gioachin <gioachin@illinois.edu>
Wed, 30 Jun 2004 23:51:54 +0000 (23:51 +0000)
src/ck-com/PipeBroadcastStrategy.C
src/ck-com/PipeBroadcastStrategy.h

index 943a953e43ebb4d5594a0c942a5fc4773f5ad234..7f0b007e75d19adcc6d202be279bad30f39637cf 100644 (file)
 #include "PipeBroadcastStrategy.h"
 
-//PipeBcastHashKey CODE
-int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
-    return ((const PipeBcastHashKey *)k1)->
-                compare(*(const PipeBcastHashKey *)k2);
-}
-
-CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
-    return ((const PipeBcastHashKey *)v)->hash();
-}
-
-CkpvExtern(CkGroupID, cmgrID);
-
 void propagate_handler(void *message) {
-  // call the appropriate function PipeBroadcastStrategy::propagate
-  //int instid = ((envelope *)message)->getEpIdx();
-  //int instid = ((CkMcastBaseMsg*)(EnvToUsr((envelope*)message)))->_cookie.sInfo.cInfo.instId;
   int instid = CmiGetXHandler(message);
-  PipeBroadcastStrategy *myStrategy = (PipeBroadcastStrategy *)CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
-  myStrategy->propagate((envelope *)message, false);
-}
-
-void propagate_handler_frag(void *message) {
-  int instid = CmiGetXHandler(message);
-  PipeBroadcastStrategy *myStrategy = (PipeBroadcastStrategy *)CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
-  myStrategy->propagate((envelope *)message, true);
-}
-
-
-void PipeBroadcastStrategy::propagate(envelope *env, int isFragmented){
-  // find destination processors and send
-  int destination, tmp, k;
-  int num_pes, *dest_pes;
-  PipeBcastInfo *info = (PipeBcastInfo*)(((char*)env)+CmiReservedHeaderSize);
-  int srcPeNumber = isFragmented ? info->srcPe : env->getSrcPe();
-  int totalSendingSize = isFragmented ? info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo) : env->getTotalsize();
-
-  switch (topology) {
-  case USE_LINEAR:
-    if (srcPeNumber == (CkMyPe()+1)%CkNumPes()) break;
-    destination = (CkMyPe()+1) % CkNumPes();
-    ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CkMyPe(), destination);
-    CmiSyncSend(destination, totalSendingSize, (char *)env);
-    break;
-  case USE_HYPERCUBE:
-    tmp = srcPeNumber ^ CkMyPe();
-    k = int(log((double)CkNumPes()) * log_of_2_inv + 2);
-    if (tmp) {
-      do {--k;} while (!(tmp>>k));
-    }
-    ComlibPrintf("[%d] tmp=%d, k=%d\n",CkMyPe(),tmp,k);
-    // now 'k' is the last dimension in the hypercube used for exchange
-    if (isFragmented) info->srcPe = CkMyPe();
-    else env->setSrcPe(CkMyPe());  // where the message is coming from
-    dest_pes = (int *)malloc(k*sizeof(int));
-    --k;  // next dimension in the cube to be used
-    num_pes = HypercubeGetBcastDestinations(k, dest_pes);
-
-    /*
-    for ( ; k>=0; --k) {
-      // add the processor destination at level k if it exist
-      dest_pes[num_pes] = CkMyPe() ^ (1<<k);
-      if (dest_pes[num_pes] >= CkNumPes()) {
-       dest_pes[num_pes] &= (-1)<<k;
-       if (CkNumPes()>dest_pes[num_pes]) dest_pes[num_pes] += (CkMyPe() - (CkMyPe() & ((-1)<<k))) % (CkNumPes() - dest_pes[num_pes]);
-      }
-      if (dest_pes[num_pes] < CkNumPes()) {
-       ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CkMyPe(), dest_pes[num_pes]);
-       ++num_pes;
-      }
-    }
-    */
-
-    //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
-    for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], totalSendingSize, (char *)env);
-    free(dest_pes);
-    break;
-
-    // for other strategies
-
-  default:
-    // should NEVER reach here!
-    CmiPrintf("Error, topology %d not known\n",topology);
-    CkExit();
-  }
-
-  // deliver messages to local objects (i.e. send it to ComlibManager)
-  deliverer(env, isFragmented);
-  //CmiSetHandler(env, CmiGetXHandler(env));
-  //CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
-
+  PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse *)ConvComlibGetStrategy(instid);
+  envelope *env = (envelope*)message;
+  myStrategy->propagate((char*)message, false, env->getSrcPe(), env->getTotalsize(), envelope::setSrcPe);
 }
 
-void PipeBroadcastStrategy::deliverer(envelope *env_frag, int isFragmented) {
-  envelope *env;
-  int isFinished=0;
+void PipeBroadcastStrategy::deliverer(char *msg) {
+  envelope *env = (envelope*)msg;
   ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
 
-  // check if the message is fragmented
-  if (isFragmented) {
-    // store the fragment in the hash table until completed
-    ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
-    PipeBcastInfo *info = (PipeBcastInfo*)(((char*)env_frag)+CmiReservedHeaderSize);
-
-    PipeBcastHashKey key (info->bcastPe, info->seqNumber);
-    PipeBcastHashObj *position = fragments.get(key);
-
-    char *incomingMsg;
-    if (position) {
-      // the message already exist, add to it
-      ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
-      incomingMsg = position->message;
-      memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), ((char*)env_frag)+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
-
-      if (--position->remaining == 0) {  // message completely received
-       isFinished = 1;
-       env = (envelope*)incomingMsg;
-       // delete from the hash table
-       fragments.remove(key);
-      }
-
-    } else {
-      // the message doesn't exist, create it
-      ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
-      incomingMsg = (char*)CmiAlloc(info->messageSize);
-      memcpy (incomingMsg, env_frag, CmiReservedHeaderSize);
-      memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), ((char*)env_frag)+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
-      int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
-      if (remaining) {  // more than one chunk (it was not forced to be splitted)
-       PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
-       fragments.put(key) = object;
-      } else {  // only one chunk, it was forces to be splitted
-       isFinished = 1;
-       env = (envelope*)incomingMsg;
-       // nothing to delete from fragments since nothing has been added
-      }
-    }
-    CmiFree(env_frag);
-
-  } else {  // message not fragmented
-    ComlibPrintf("[%d] deliverer: received message in single chunk\n",CkMyPe());
-    isFinished = 1;
-    env = env_frag;
+  if (getType() == ARRAY_STRATEGY) {
+    // deliver the message to the predefined group "ainfo"
+    ComlibPrintf("[%d] deliverer: delivering a finished message\n",CkMyPe());
+    ainfo.localBroadcast(env);
   }
 
-  if (isFinished) {
-    if (getType() == ARRAY_STRATEGY) {
-      CkArray *dest_array = CkArrayID::CkLocalBranch(aid);
-      localDest = new CkVec<CkArrayIndexMax>;
-      dest_array->getComlibArrayListener()->getLocalIndices(*localDest);
-      void *msg = EnvToUsr(env);
-      CkArrayIndexMax idx;
-      ArrayElement *elem;
-      int ep = env->getsetArrayEp();
-      CkUnpackMessage(&env);
-
-      ComlibPrintf("[%d] deliverer: delivering a finished message\n",CkMyPe());
-      for (int count = 0; count < localDest->size(); ++count) {
-       idx = (*localDest)[count];
-       ComlibPrintf("[%d] Sending message to ",CkMyPe());
-       if (comm_debug) idx.print();
-
-       CProxyElement_ArrayBase ap(aid, idx);
-       elem = ap.ckLocal();
-       CkDeliverMessageReadonly (ep, msg, elem);
-      }
-      delete localDest;
-      // the envelope env should be deleted only if the message is delivered
-      CmiFree(env);
-    }
-
-    if (getType() == GROUP_STRATEGY) {
-      // deliver the message to the predifined group "gid"
-      CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), gid);
-    }
+  if (getType() == GROUP_STRATEGY) {
+    // deliver the message to the predifined group "ginfo"
+    CkGroupID gid;
+    ginfo.getSourceGroup(gid);
+    CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), gid);
   }
 }
 
-PipeBroadcastStrategy::PipeBroadcastStrategy()
-  :topology(USE_HYPERCUBE), pipeSize(DEFAULT_PIPE), CharmStrategy() {
-
-    //isArray = 0;
-    commonInit();
-}
-
-PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology)
-  :topology(_topology), pipeSize(DEFAULT_PIPE), CharmStrategy() {
-    //isArray = 0;
-  commonInit();
+void PipeBroadcastStrategy::commonInit(int _topology, int _pipeSize) {
+  converseStrategy = new PipeBroadcastConverse(_topology, _pipeSize, this);
 }
 
 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, int _pipeSize)
-  :topology(_topology), pipeSize(_pipeSize), CharmStrategy() {
-    //isArray = 0;
-  commonInit();
-}
-
-PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid)
-  :topology(_topology), pipeSize(DEFAULT_PIPE), CharmStrategy() {
-    //isArray = 1;
-  setType(ARRAY_STRATEGY);
-  aid = _aid;
-  CmiPrintf("init: %d %d\n",topology, pipeSize);
-  commonInit();
+  : CharmStrategy() {
+  //isArray = 0;
+  commonInit(_topology, _pipeSize);
 }
 
 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize)
-  :topology(_topology), pipeSize(_pipeSize), CharmStrategy() {
-    setType(ARRAY_STRATEGY);
-  aid = _aid;
-  commonInit();
+  : CharmStrategy() {
+  setType(ARRAY_STRATEGY);
+  ainfo.setDestinationArray(_aid);
+  commonInit(_topology, _pipeSize);
 }
 
 PipeBroadcastStrategy::PipeBroadcastStrategy(CkGroupID _gid, int _topology, int _pipeSize)
-  :topology(_topology), pipeSize(_pipeSize), CharmStrategy() {
-    setType(GROUP_STRATEGY);
-  gid = _gid;
-  commonInit();
-}
-
-void PipeBroadcastStrategy::commonInit(){
-  log_of_2_inv = 1/log((double)2);
-  seqNumber = 0;
+  : CharmStrategy() {
+  setType(GROUP_STRATEGY);
+  ginfo.setSourceGroup(_gid);
+  commonInit(_topology, _pipeSize);
 }
 
 void PipeBroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
-  ComlibPrintf("[%d] Pipelined Broadcast with strategy %d\n",CkMyPe(),topology);
   messageBuf->enq(cmsg);
   doneInserting();
 }
 
 // routine for interfacing with converse.
 // Require only the converse reserved header if forceSplit is true
-void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize, int forceSplit) {
+void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize) {
   // set the instance ID to be used by the receiver using the XHandler variable
   CmiSetXHandler(env, myInstanceID);
 
-  if (totalSize > pipeSize || forceSplit) {
-    ++seqNumber;
-    // message doesn't fit into the pipe: split it into chunks and propagate them individually
-    ComlibPrintf("[%d] Propagating message in multiple chunks\n",CkMyPe());
-
-    char *sendingMsg;
-    char *nextChunk = ((char*)env)+CmiReservedHeaderSize;
-    int remaining = totalSize-CmiReservedHeaderSize;
-    int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
-    ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
-    ComlibPrintf("sending %d chunks of size %d, total=%d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining);
-    for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
-      sendingMsg = (char*)CmiAlloc(pipeSize);
-      CmiSetHandler(env, propagateHandle_frag);
-      memcpy (sendingMsg, env, CmiReservedHeaderSize);
-      PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
-      info->srcPe = CkMyPe();
-      info->bcastPe = CkMyPe();
-      info->seqNumber = seqNumber;
-      info->chunkNumber = i;
-      info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
-      info->messageSize = totalSize;
-      memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, reducedPipe);
-
-      remaining -= reducedPipe;
-      nextChunk += reducedPipe;
-
-      propagate((envelope*)sendingMsg, true);
-    }
-
+  if (totalSize > ((PipeBroadcastConverse*)converseStrategy)->getPipeSize()) {
+    ((PipeBroadcastConverse*)converseStrategy)->conversePipeBcast((char*)env, totalSize);
   } else {
     // the message fit into the pipe, so send it in a single chunk
     ComlibPrintf("[%d] Propagating message in one single chunk\n",CkMyPe());
     CmiSetHandler(env, propagateHandle);
     env->setSrcPe(CkMyPe());
-    //env->setEpIdx(myInstanceID);
-    propagate(env, false);
+    ((PipeBroadcastConverse*)converseStrategy)->propagate((char*)env, false, CkMyPe(), totalSize, envelope::setSrcPe);
   }
 }
 
@@ -283,7 +78,7 @@ void PipeBroadcastStrategy::doneInserting(){
     // modify the Handler to deliver the message to the propagator
     envelope *env = UsrToEnv(cmsg->getCharmMessage());
 
-    conversePipeBcast(env, env->getTotalsize(), false);
+    conversePipeBcast(env, env->getTotalsize());
   }
 }
 
@@ -291,21 +86,16 @@ void PipeBroadcastStrategy::pup(PUP::er &p){
   ComlibPrintf("[%d] PipeBroadcast pupping %s\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
   CharmStrategy::pup(p);
 
-  p | aid;
-  p | gid;
-  p | pipeSize;
-  p | topology;
-  p | seqNumber;
+  if (p.isUnpacking()) {
+    converseStrategy = new PipeBroadcastConverse(0,0,this);
+  }
+  p | *converseStrategy;
 
   if (p.isUnpacking()) {
-    log_of_2_inv = 1/log((double)2);
+    propagateHandle = CmiRegisterHandler((CmiHandler)propagate_handler);
     messageBuf = new CkQ<CharmMessageHolder *>;
-    propagateHandle = CkRegisterHandler((CmiHandler)propagate_handler);
-    propagateHandle_frag = CkRegisterHandler((CmiHandler)propagate_handler_frag);
+    converseStrategy->setHigherLevel(this);
   }
-  //p|(*messageBuf);
-  //p|fragments;
-
 }
 
 //PUPable_def(PipeBroadcastStrategy);
index af1d0d20fc8223593a0de3a4cae9517368677247..b13a8f6f315fcb2e14e57f28e885325ed34ca5a0 100644 (file)
@@ -1,92 +1,27 @@
 #ifndef PIPE_BROADCAST_STRATEGY
 #define PIPE_BROADCAST_STRATEGY
 #include "ComlibManager.h"
-
-#define DEFAULT_PIPE   8196
-
-struct PipeBcastInfo {
-  short bcastPe;     // pe who is doing the broadcast, used for the hash key
-  short seqNumber;
-  int chunkSize;   // it is the size of the data part of the message (without the converse header)
-  int chunkNumber;
-  int messageSize;   // the entire message size, all included
-  short srcPe;       // pe from whom I'm receiving the message
-};
-
-class PipeBcastHashKey{
- public:
-
-    int srcPe;
-    int seq;
-    PipeBcastHashKey(int _pe, int _seq):srcPe(_pe), seq(_seq){};
-
-    //These routines allow PipeBcastHashKey to be used in
-    //  a CkHashtableT
-    CkHashCode hash(void) const;
-    static CkHashCode staticHash(const void *a,size_t);
-    int compare(const PipeBcastHashKey &ind) const;
-    static int staticCompare(const void *a,const void *b,size_t);
-};
-
-// sequential numbers must be below 2^16, so the number of processors must
-inline CkHashCode PipeBcastHashKey::hash(void) const
-{
-    register int _seq = seq;
-    register int _pe = srcPe;
-    
-    register CkHashCode ret = (_seq << 16) + _pe;
-    return ret;
-}
-
-inline int PipeBcastHashKey::compare(const PipeBcastHashKey &k2) const
-{
-    if(seq == k2.seq && srcPe == k2.srcPe)
-        return 1;
-    
-    return 0;
-}
-
-class PipeBcastHashObj{
- public:
-  char *message;
-  int dimension;
-  int remaining;
-  PipeBcastHashObj (int dim, int rem, char *msg) :dimension(dim),remaining(rem),message(msg) {};
-
-};
+#include "pipebroadcaststrategy.h"
 
 class PipeBroadcastStrategy : public CharmStrategy {
  protected:
 
-  int pipeSize; // this is the size of the splitted messages, including the converse header
-  int topology;
-  double log_of_2_inv;
-  int seqNumber;
-  CkQ <CharmMessageHolder*> *messageBuf;
-  CkHashtableT<PipeBcastHashKey, PipeBcastHashObj *> fragments;
   int propagateHandle;
-  int propagateHandle_frag;
+  CkQ <CharmMessageHolder*> *messageBuf;
   CkVec<CkArrayIndexMax> *localDest;
 
-  CkArrayID aid;
-  CkGroupID gid;
-
-  void commonInit();
-  void deliverer(envelope *env, int isFrag);
+  void commonInit(int _top, int _pipe);
 
  public:
-  PipeBroadcastStrategy();
-  PipeBroadcastStrategy(int _topology);
-  PipeBroadcastStrategy(int _topology, int _pipeSize);
-  PipeBroadcastStrategy(int _topology, CkArrayID _aid);
-  PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize);
+  PipeBroadcastStrategy(int _topology=USE_HYPERCUBE, int _pipeSize=DEFAULT_PIPE);
+  PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize=DEFAULT_PIPE);
   PipeBroadcastStrategy(CkGroupID _gid, int _topology=USE_HYPERCUBE, int _pipeSize=DEFAULT_PIPE);
   PipeBroadcastStrategy(CkMigrateMessage *){}
   void insertMessage(CharmMessageHolder *msg);
   void doneInserting();
-  void conversePipeBcast(envelope *env, int size, int forceSplit);
+  void conversePipeBcast(envelope *env, int size);
 
-  void propagate(envelope *env, int isFrag);
+  void deliverer(char *msg);
 
   virtual void pup(PUP::er &p);
   PUPable_decl(PipeBroadcastStrategy);