bugfixes for pipelined broadcast in converse
authorFilippo Gioachin <gioachin@illinois.edu>
Tue, 3 Aug 2004 20:41:31 +0000 (20:41 +0000)
committerFilippo Gioachin <gioachin@illinois.edu>
Tue, 3 Aug 2004 20:41:31 +0000 (20:41 +0000)
src/conv-com/convcomlibstrategy.h
src/conv-com/pipebroadcastconverse.C
src/conv-com/pipebroadcastconverse.h

index be70223118a9b4e18d5b1821ddc6e2877a6ab596..d68cc420e016c6dc6967ce0830aaa02d3b34ee82 100644 (file)
@@ -51,6 +51,10 @@ class MessageHolder : public PUP::able {
         return data;
     }
 
+    inline int getSize() {
+      return size;
+    }
+
     virtual void pup(PUP::er &p);
     PUPable_decl(MessageHolder);
 };
index f7149300a8f09bc7044a284f1c3626e2ce75d95d..02bf72e3d623e0193a8b0d5766e52103fdfe24f6 100644 (file)
@@ -77,7 +77,8 @@ void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumb
     */
 
     //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
-    for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], totalSendingSize, env);
+    // !!!!!!!!for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], totalSendingSize, env);
+    for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], pipeSize, env);
     free(dest_pes);
     break;
 
@@ -85,8 +86,9 @@ void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumb
 
   default:
     // should NEVER reach here!
-    CmiPrintf("Error, topology %d not known\n",topology);
-    CkExit();
+    char *error_msg;
+    sprintf(error_msg, "Error, topology %d not known\n",topology);
+    CmiAbort(error_msg);
   }
 
   // deliver messages to local objects (i.e. send it to ComlibManager)
@@ -156,6 +158,7 @@ void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
 }
 
 void PipeBroadcastConverse::deliverer(char *msg, int dimension) {
+  ComlibPrintf("{%d} dest = %d, %d, %x\n",CmiMyPe(),destinationHandler, dimension,CmiHandlerToInfo(destinationHandler).hdlr);
   if (destinationHandler) {
     CmiSetHandler(msg, destinationHandler);
     CmiSyncSendAndFree(CmiMyPe(), dimension, msg);
@@ -165,8 +168,11 @@ void PipeBroadcastConverse::deliverer(char *msg, int dimension) {
 }
 
 PipeBroadcastConverse::PipeBroadcastConverse(int _topology, int _pipeSize, Strategy *parent) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
-  higherLevel = parent;
+  if (parent) higherLevel = parent;
+  else higherLevel = this;
   seqNumber = 0;
+  messageBuf = new CkQ<MessageHolder *>;
+  propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
   ComlibPrintf("init: %d %d\n",topology, pipeSize);
 }
 
@@ -182,7 +188,9 @@ void PipeBroadcastConverse::doneInserting(){
     MessageHolder *cmsg = messageBuf->deq();
     // modify the Handler to deliver the message to the propagator
     char *env = cmsg->getMessage();
-
+    CmiSetHandler(env, propagateHandle_frag);
+    conversePipeBcast(env, cmsg->getSize());
+    delete cmsg;
     //conversePipeBcast(env, env->getTotalsize(), false);
   }
 }
@@ -203,9 +211,10 @@ void PipeBroadcastConverse::conversePipeBcast(char *env, int totalSize) {
   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
   ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
   ComlibPrintf("sending %d chunks of size %d, total=%d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining);
+  CmiSetHandler(env, propagateHandle_frag);
+  ComlibPrintf("setting env handler to %d\n",propagateHandle_frag);
   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
     sendingMsg = (char*)CmiAlloc(pipeSize);
-    CmiSetHandler(env, propagateHandle_frag);
     memcpy (sendingMsg, env, CmiReservedHeaderSize);
     PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
     info->srcPe = CmiMyPe();
@@ -221,6 +230,7 @@ void PipeBroadcastConverse::conversePipeBcast(char *env, int totalSize) {
 
     propagate(sendingMsg, true, CmiMyPe(), totalSize, NULL);
   }
+  CmiFree(env);
 }
 
 void PipeBroadcastConverse::pup(PUP::er &p){
@@ -236,7 +246,10 @@ void PipeBroadcastConverse::pup(PUP::er &p){
   if (p.isUnpacking()) {
     //log_of_2_inv = 1/log((double)2);
     messageBuf = new CkQ<MessageHolder *>;
-    propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
+    //propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
+  }
+  if (p.isPacking()) {
+    delete messageBuf;
   }
   //p|(*messageBuf);
   //p|fragments;
index 65f00cc92183756239b82d861ffb5da752bacaae..2f2037b783ce29564c859292963982f1c0622f4e 100644 (file)
@@ -75,7 +75,7 @@ class PipeBroadcastConverse : public Strategy {
   int propagateHandle_frag;
 
  public:
-  PipeBroadcastConverse(int, int, Strategy*);
+  PipeBroadcastConverse(int top=USE_HYPERCUBE, int size=DEFAULT_PIPE, Strategy* st=NULL);
   PipeBroadcastConverse(CkMigrateMessage *) {};
   int getPipeSize() { return pipeSize; };
   void commonInit();