- Moved 4 files (ComlibStrategy.*, ComlibArrayListener.*) from ck-core to the
[charm.git] / src / ck-com / PipeBroadcastStrategy.C
1 #include "PipeBroadcastStrategy.h"
2
3 void propagate_handler(void *message) {
4   int instid = CmiGetXHandler(message);
5   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse *)ConvComlibGetStrategy(instid);
6   ComlibPrintf("[%d] propagate_handler: calling on instid %d (%x)\n",CmiMyPe(),instid,myStrategy);
7   envelope *env = (envelope*)message;
8   myStrategy->propagate((char*)message, false, env->getSrcPe(), env->getTotalsize(), &envelope::setSrcPe);
9 }
10
11 void PipeBroadcastStrategy::deliverer(char *msg, int dim) {
12   envelope *env = (envelope*)msg;
13   ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
14
15   if (getType() == ARRAY_STRATEGY) {
16     // deliver the message to the predefined group "ainfo"
17     ComlibPrintf("[%d] deliverer: delivering a finished message\n",CkMyPe());
18     ainfo.localBroadcast(env);
19   }
20
21   if (getType() == GROUP_STRATEGY) {
22     // deliver the message to the predifined group "ginfo"
23     CkGroupID gid;
24     ginfo.getSourceGroup(gid);
25     CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), gid);
26   }
27 }
28
29 void PipeBroadcastStrategy::commonInit(int _topology, int _pipeSize) {
30   converseStrategy = new PipeBroadcastConverse(_topology, _pipeSize, this);
31 }
32
33 /*PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, int _pipeSize)
34   : CharmStrategy() {
35   //isArray = 0;
36   commonInit(_topology, _pipeSize);
37   }*/
38
39 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize)
40   : CharmStrategy() {
41   ComlibPrintf("Creating charm pipebcast (%x)\n",this);
42   setType(ARRAY_STRATEGY);
43   ainfo.setDestinationArray(_aid);
44   commonInit(_topology, _pipeSize);
45 }
46
47 PipeBroadcastStrategy::PipeBroadcastStrategy(CkGroupID _gid, int _topology, int _pipeSize)
48   : CharmStrategy() {
49   setType(GROUP_STRATEGY);
50   ginfo.setSourceGroup(_gid);
51   commonInit(_topology, _pipeSize);
52 }
53
54 void PipeBroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
55   messageBuf->enq(cmsg);
56   doneInserting();
57 }
58
59 // routine for interfacing with converse.
60 // Require only the converse reserved header if forceSplit is true
61 void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize) {
62   // set the instance ID to be used by the receiver using the XHandler variable
63   CmiSetXHandler(env, myInstanceID);
64   ComlibPrintf("[%d] PipeBroadcast charm, setting instid to %d\n",CkMyPe(),myInstanceID);
65
66   if (totalSize > ((PipeBroadcastConverse*)converseStrategy)->getPipeSize()) {
67     ((PipeBroadcastConverse*)converseStrategy)->conversePipeBcast((char*)env, totalSize);
68   } else {
69     // the message fit into the pipe, so send it in a single chunk
70     ComlibPrintf("[%d] Propagating message in one single chunk (%d)\n",CkMyPe(),propagateHandle);
71     CmiSetHandler(env, propagateHandle);
72     env->setSrcPe(CkMyPe());
73     ((PipeBroadcastConverse*)converseStrategy)->propagate((char*)env, false, CkMyPe(), totalSize, &envelope::setSrcPe);
74   }
75 }
76
77 void PipeBroadcastStrategy::doneInserting(){
78   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
79   while (!messageBuf->isEmpty()) {
80     CharmMessageHolder *cmsg = messageBuf->deq();
81     // modify the Handler to deliver the message to the propagator
82     envelope *env = UsrToEnv(cmsg->getCharmMessage());
83
84     conversePipeBcast(env, env->getTotalsize());
85   }
86 }
87
88 void PipeBroadcastStrategy::pup(PUP::er &p){
89   ComlibPrintf("[%d] PipeBroadcast pupping %s\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
90   CharmStrategy::pup(p);
91
92   if (p.isUnpacking()) {
93     converseStrategy = new PipeBroadcastConverse(0,0,this);
94   }
95   p | *converseStrategy;
96
97   if (p.isUnpacking()) {
98     propagateHandle = CmiRegisterHandler((CmiHandler)propagate_handler);
99     ComlibPrintf("[%d] registered handler single to %d\n",CmiMyPe(),propagateHandle);
100     messageBuf = new CkQ<CharmMessageHolder *>;
101     converseStrategy->setHigherLevel(this);
102   }
103 }
104
105 //PUPable_def(PipeBroadcastStrategy);