doc: Add serial to list of ci file reserved words
[charm.git] / src / ck-com / BroadcastStrategy.C
index 63ea02fcafb7e65cc1002435beb7242f77205947..96435d92a9c956212884bb5cc211c8b4a95f8848 100644 (file)
@@ -1,58 +1,99 @@
-//Broadcast strategy for charm++ programs using the net version
-//This stategy will wonly work for groups.
-//This strategy implements a tree based broadcast
-//I will extent it for arrays later.
-//Developed by Sameer Kumar 04/10/04
+/**
+   @addtogroup ComlibCharmStrategy
+   @{
+   @file
+*/
 
 #include "BroadcastStrategy.h"
+//#include "ComlibManager.h"
 
 CkpvExtern(CkGroupID, cmgrID);
 extern int sfactor;
 
+/*
 static void recv_bcast_handler(void *msg) {
-    int instid = CmiGetXHandler(msg);
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
+    int instid = conv_header->stratid;
+
     BroadcastStrategy *bstrat = (BroadcastStrategy *)
         CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
     
     bstrat->handleMessage((char *)msg);    
 }
+*/
+
+//Initialize the hypercube variables
+void BroadcastStrategy::initHypercube() {
+    logp = log((double) CkNumPes())/log(2.0);
+    logp = ceil(logp);
+}
+
 
 //Constructor, 
 //Can read spanning factor from command line
 BroadcastStrategy::BroadcastStrategy(int topology) : 
-    CharmStrategy(), _topology(topology) {
+    Strategy(), CharmStrategy(), _topology(topology) {
     spanning_factor = DEFAULT_BROADCAST_SPANNING_FACTOR;
     if(sfactor > 0)
         spanning_factor = sfactor;
     
+    setType(GROUP_STRATEGY);
+    initHypercube();
+}
+
+//Array Constructor
+//Can read spanning factor from command line
+BroadcastStrategy::BroadcastStrategy(CkArrayID aid, int topology) : 
+    Strategy(), CharmStrategy(), _topology(topology) {
+        
+       CkAbort("BroadcastStrategy currently works only for groups");
+    setType(ARRAY_STRATEGY);
+    ainfo.setDestinationArray(aid);
+    
+    spanning_factor = DEFAULT_BROADCAST_SPANNING_FACTOR;
+    if(sfactor > 0)
+        spanning_factor = sfactor;    
+
+    initHypercube();
+    //if(topology == USE_HYPERCUBE)
+    //  CkPrintf("Warning: hypercube only works on powers of two PES\n");
 }
 
 
 //Receives the message and sends it along the spanning tree.
 void BroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
-    CkPrintf("[%d] BRAODCASTING\n", CkMyPe());
+    //CkPrintf("[%d] BROADCASTING\n", CkMyPe());
 
     char *msg = cmsg->getCharmMessage();
-    if(_topology == USE_HYPERCUBE) {
-        envelope *env = UsrToEnv(msg);
-        env->setSrcPe(0);    
-    }
-    handleMessage((char *)UsrToEnv(msg));
+
+    envelope *env = UsrToEnv(msg);
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) env;
+
+    conv_header->root = 0;        //Use root later
+    if(_topology == USE_HYPERCUBE) 
+        conv_header->xhdl = 0;
+    else
+        //conv_header->root = CkMyPe();
+        conv_header->xhdl = CkMyPe();
+    
+    CkPackMessage(&env);
+    handleMessage((char *) env);
     
     delete cmsg;
 }
 
 //not implemented here because no bracketing is required for this strategy
-void BroadcastStrategy::doneInserting(){
-}
-
+//void BroadcastStrategy::doneInserting(){
+//}
 
+/*
 //register the converse handler to recieve the broadcast message
 void BroadcastStrategy::beginProcessing(int nelements) {
     handlerId = CkRegisterHandler((CmiHandler)recv_bcast_handler);
 }
+*/
 
-void BroadcastStrategy::handleMessage(char *msg) {
+void BroadcastStrategy::handleMessage(void *msg) {
     if(_topology == USE_TREE)
         handleTree(msg);
     else if(_topology == USE_HYPERCUBE) 
@@ -60,16 +101,19 @@ void BroadcastStrategy::handleMessage(char *msg) {
     else CkAbort("Unknown Topology");
 }
 
-void BroadcastStrategy::handleTree(char *msg){
+void BroadcastStrategy::handleTree(void *msg){
     
     envelope *env = (envelope *)msg;
-    int startpe = env->getSrcPe();
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
+
+    int startpe = conv_header->xhdl;
     int size = env->getTotalsize();
     
     CkAssert(startpe>=0 && startpe < CkNumPes());
     
-    CmiSetHandler(msg, handlerId);
-    CmiSetXHandler(msg, getInstance());    
+    CmiSetHandler(msg, CkpvAccess(comlib_handler));
+    
+    conv_header->stratid = getInstance();
     
     //Sending along the spanning tree
     //Gengbins tree building code stolen from the MPI machine layer    
@@ -89,50 +133,78 @@ void BroadcastStrategy::handleTree(char *msg){
 
         CkAssert(p>=0 && p < CkNumPes() && p != CkMyPe());
 
-        CmiSyncSend(p, size, msg);
+        CmiSyncSend(p, size, (char*)msg);
     }
 
-    CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
-                    env->getGroupNum());
+    if(getType() == GROUP_STRATEGY) {
+      ComlibPrintf("BroadcastStrategy: delivering message\n");
+        CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
+                        env->getGroupNum());
+    }
+    else if(getType() == ARRAY_STRATEGY)
+        ainfo.localBroadcast(env);        
 }
 
 
-void BroadcastStrategy::handleHypercube(char *msg){
+void BroadcastStrategy::handleHypercube(void *msg){
     envelope *env = (envelope *)msg;
-    int curcycle = env->getSrcPe();
+
+    CmiMsgHeaderExt *conv_header = (CmiMsgHeaderExt *) msg;
+    //int curcycle = conv_header->root;
+    int curcycle = conv_header->xhdl;
+
     int i;
     int size = env->getTotalsize();
-    
-    double logp = CmiNumPes();
-    logp = log(logp)/log(2.0);
-    logp = ceil(logp);
-    
+        
     //CkPrintf("In hypercube %d, %d\n", (int)logp, curcycle); 
     
     /* assert(startpe>=0 && startpe<_Cmi_numpes); */
-    CmiSetHandler(msg, handlerId);
-    CmiSetXHandler(msg, getInstance());    
+    CmiSetHandler(msg, CkpvAccess(comlib_handler));
+
+    conv_header->stratid = getInstance();
+
+    //Copied from system hypercube message passing
 
     for (i = logp - curcycle - 1; i >= 0; i--) {
         int p = CkMyPe() ^ (1 << i);
 
         int newcycle = ++curcycle;
         //CkPrintf("%d --> %d, %d\n", CkMyPe(), p, newcycle); 
-
-        env->setSrcPe(newcycle);
-        if(p < CmiNumPes()) {
-            CmiSyncSendFn(p, size, msg);
-        }
+        
+        //conv_header->root = newcycle;
+        conv_header->xhdl = newcycle;
+
+        if(p >= CkNumPes()) {
+            p &= (-1) << i;
+            
+            //loadbalancing
+            if (p < CkNumPes())
+                p += (CkMyPe() - 
+                      (CkMyPe() & ((-1) << i))) % (CkNumPes() - p);
+        }     
+        
+        if(p < CkNumPes())
+            CmiSyncSendFn(p, size, (char*)msg);                    
     }
-
-    CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
-                    env->getGroupNum());
+    
+    if(getType() == GROUP_STRATEGY) {
+      ComlibPrintf("BroadcastStrategy: delivering message\n");
+        CkSendMsgBranch(env->getEpIdx(), EnvToUsr(env), CkMyPe(), 
+                        env->getGroupNum());
+    }
+    else if(getType() == ARRAY_STRATEGY)
+        ainfo.localBroadcast(env);        
 }
 
 
 //Pack the group id and the entry point of the user message
 void BroadcastStrategy::pup(PUP::er &p){
-    Strategy::pup(p);    
+    Strategy::pup(p);
+    CharmStrategy::pup(p);    
+
     p | spanning_factor;
     p | _topology;
+    p | logp;
 }
+
+/*@}*/