fixing log problem, and little modification to the interface to allow
[charm.git] / src / conv-com / pipebroadcastconverse.C
index 01824f32eedcb88d7e2fe6c58001989cf7c33428..f7149300a8f09bc7044a284f1c3626e2ce75d95d 100644 (file)
@@ -1,5 +1,12 @@
+#include <math.h>
 #include "pipebroadcastconverse.h"
 
+inline int log_of_2 (int i) {
+  int m;
+  for (m=0; i>(1<<m); ++m);
+  return m;
+}
+
 //PipeBcastHashKey CODE
 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
     return ((const PipeBcastHashKey *)k1)->
@@ -11,7 +18,7 @@ CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
 }
 
 void PipeBroadcastConverse::commonInit(){
-  log_of_2_inv = 1/log((double)2);
+  //log_of_2_inv = 1/log((double)2);
   seqNumber = 0;
 }
 
@@ -42,7 +49,7 @@ void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumb
     break;
   case USE_HYPERCUBE:
     tmp = srcPeNumber ^ CmiMyPe();
-    k = int(log((double)CmiNumPes()) * log_of_2_inv + 2);
+    k = log_of_2(CmiNumPes()) + 2;
     if (tmp) {
       do {--k;} while (!(tmp>>k));
     }
@@ -92,6 +99,7 @@ void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumb
 void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
   char *complete;
   int isFinished=0;
+  int totalDimension;
   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
 
   // check if the message is fragmented
@@ -113,6 +121,7 @@ void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
       if (--position->remaining == 0) {  // message completely received
        isFinished = 1;
        complete = incomingMsg;
+       totalDimension = position->dimension;
        // delete from the hash table
        fragments.remove(key);
       }
@@ -142,12 +151,17 @@ void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
   }
 
   if (isFinished) {
-    higherLevel->deliverer(complete);
+    higherLevel->deliverer(complete, totalDimension);
   }
 }
 
-void PipeBroadcastConverse::deliverer(char *msg) {
-  // TO BE DONE
+void PipeBroadcastConverse::deliverer(char *msg, int dimension) {
+  if (destinationHandler) {
+    CmiSetHandler(msg, destinationHandler);
+    CmiSyncSendAndFree(CmiMyPe(), dimension, msg);
+  } else {
+    CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!");
+  }
 }
 
 PipeBroadcastConverse::PipeBroadcastConverse(int _topology, int _pipeSize, Strategy *parent) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
@@ -220,7 +234,7 @@ void PipeBroadcastConverse::pup(PUP::er &p){
   ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d, topology=%d\n",CmiMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize,topology);
 
   if (p.isUnpacking()) {
-    log_of_2_inv = 1/log((double)2);
+    //log_of_2_inv = 1/log((double)2);
     messageBuf = new CkQ<MessageHolder *>;
     propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
   }