update PipeBroadcastStrategy for the new comlib interface
[charm.git] / src / ck-com / PipeBroadcastStrategy.C
1 #include "PipeBroadcastStrategy.h"
2
3 void propagate_handler(void *message) {
4   int instid = CmiGetXHandler(message);
5   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse *)ConvComlibGetStrategy(instid);
6   envelope *env = (envelope*)message;
7   myStrategy->propagate((char*)message, false, env->getSrcPe(), env->getTotalsize(), envelope::setSrcPe);
8 }
9
10 void PipeBroadcastStrategy::deliverer(char *msg) {
11   envelope *env = (envelope*)msg;
12   ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
13
14   if (getType() == ARRAY_STRATEGY) {
15     // deliver the message to the predefined group "ainfo"
16     ComlibPrintf("[%d] deliverer: delivering a finished message\n",CkMyPe());
17     ainfo.localBroadcast(env);
18   }
19
20   if (getType() == GROUP_STRATEGY) {
21     // deliver the message to the predifined group "ginfo"
22     CkGroupID gid;
23     ginfo.getSourceGroup(gid);
24     CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), gid);
25   }
26 }
27
28 void PipeBroadcastStrategy::commonInit(int _topology, int _pipeSize) {
29   converseStrategy = new PipeBroadcastConverse(_topology, _pipeSize, this);
30 }
31
32 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, int _pipeSize)
33   : CharmStrategy() {
34   //isArray = 0;
35   commonInit(_topology, _pipeSize);
36 }
37
38 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize)
39   : CharmStrategy() {
40   setType(ARRAY_STRATEGY);
41   ainfo.setDestinationArray(_aid);
42   commonInit(_topology, _pipeSize);
43 }
44
45 PipeBroadcastStrategy::PipeBroadcastStrategy(CkGroupID _gid, int _topology, int _pipeSize)
46   : CharmStrategy() {
47   setType(GROUP_STRATEGY);
48   ginfo.setSourceGroup(_gid);
49   commonInit(_topology, _pipeSize);
50 }
51
52 void PipeBroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
53   messageBuf->enq(cmsg);
54   doneInserting();
55 }
56
57 // routine for interfacing with converse.
58 // Require only the converse reserved header if forceSplit is true
59 void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize) {
60   // set the instance ID to be used by the receiver using the XHandler variable
61   CmiSetXHandler(env, myInstanceID);
62
63   if (totalSize > ((PipeBroadcastConverse*)converseStrategy)->getPipeSize()) {
64     ((PipeBroadcastConverse*)converseStrategy)->conversePipeBcast((char*)env, totalSize);
65   } else {
66     // the message fit into the pipe, so send it in a single chunk
67     ComlibPrintf("[%d] Propagating message in one single chunk\n",CkMyPe());
68     CmiSetHandler(env, propagateHandle);
69     env->setSrcPe(CkMyPe());
70     ((PipeBroadcastConverse*)converseStrategy)->propagate((char*)env, false, CkMyPe(), totalSize, envelope::setSrcPe);
71   }
72 }
73
74 void PipeBroadcastStrategy::doneInserting(){
75   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
76   while (!messageBuf->isEmpty()) {
77     CharmMessageHolder *cmsg = messageBuf->deq();
78     // modify the Handler to deliver the message to the propagator
79     envelope *env = UsrToEnv(cmsg->getCharmMessage());
80
81     conversePipeBcast(env, env->getTotalsize());
82   }
83 }
84
85 void PipeBroadcastStrategy::pup(PUP::er &p){
86   ComlibPrintf("[%d] PipeBroadcast pupping %s\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
87   CharmStrategy::pup(p);
88
89   if (p.isUnpacking()) {
90     converseStrategy = new PipeBroadcastConverse(0,0,this);
91   }
92   p | *converseStrategy;
93
94   if (p.isUnpacking()) {
95     propagateHandle = CmiRegisterHandler((CmiHandler)propagate_handler);
96     messageBuf = new CkQ<CharmMessageHolder *>;
97     converseStrategy->setHigherLevel(this);
98   }
99 }
100
101 //PUPable_def(PipeBroadcastStrategy);