use CkRegisterHandler instead of CmiRegisterHandler
[charm.git] / src / ck-com / PipeBroadcastStrategy.C
1 #include "PipeBroadcastStrategy.h"
2
3 void propagate_handler(void *message) {
4   int instid = CmiGetXHandler(message);
5   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse *)ConvComlibGetStrategy(instid);
6   ComlibPrintf("[%d] propagate_handler: calling on instid %d (%x)\n",CmiMyPe(),instid,myStrategy);
7   envelope *env = (envelope*)message;
8   myStrategy->propagate((char*)message, false, env->getSrcPe(), env->getTotalsize(), &envelope::setSrcPe);
9 }
10
11 void PipeBroadcastStrategy::deliverer(char *msg, int dim) {
12   envelope *env = (envelope*)msg;
13   ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
14
15   if (getType() == ARRAY_STRATEGY) {
16     // deliver the message to the predefined group "ainfo"
17     ComlibPrintf("[%d] deliverer: delivering a finished message\n",CkMyPe());
18     ainfo.localBroadcast(env);
19   }
20
21   if (getType() == GROUP_STRATEGY) {
22     // deliver the message to the predifined group "ginfo"
23     CkGroupID gid;
24     ginfo.getSourceGroup(gid);
25     CkSendMsgBranchInline(env->getEpIdx(), EnvToUsr(env), CkMyPe(), gid);
26   }
27 }
28
29 void PipeBroadcastStrategy::commonInit(int _topology, int _pipeSize) {
30   converseStrategy = new PipeBroadcastConverse(_topology, _pipeSize, this);
31 }
32
33 /*PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, int _pipeSize)
34   : CharmStrategy() {
35   //isArray = 0;
36   commonInit(_topology, _pipeSize);
37   }*/
38
39 PipeBroadcastStrategy::PipeBroadcastStrategy(int _topology, CkArrayID _aid, int _pipeSize)
40   : CharmStrategy() {
41   ComlibPrintf("Creating charm pipebcast (%x)\n",this);
42   setType(ARRAY_STRATEGY);
43   ainfo.setDestinationArray(_aid);
44   commonInit(_topology, _pipeSize);
45 }
46
47 PipeBroadcastStrategy::PipeBroadcastStrategy(CkGroupID _gid, int _topology, int _pipeSize)
48   : CharmStrategy() {
49   setType(GROUP_STRATEGY);
50   ginfo.setSourceGroup(_gid);
51   commonInit(_topology, _pipeSize);
52 }
53
54 void PipeBroadcastStrategy::insertMessage(CharmMessageHolder *cmsg){
55   messageBuf->enq(cmsg);
56   doneInserting();
57 }
58
59 // routine for interfacing with converse.
60 // Require only the converse reserved header if forceSplit is true
61 void PipeBroadcastStrategy::conversePipeBcast(envelope *env, int totalSize) {
62   // set the instance ID to be used by the receiver using the XHandler variable
63   CmiSetXHandler(env, myInstanceID);
64   ComlibPrintf("[%d] PipeBroadcast charm, setting instid to %d\n",CkMyPe(),myInstanceID);
65
66   if (totalSize > ((PipeBroadcastConverse*)converseStrategy)->getPipeSize()) {
67     ((PipeBroadcastConverse*)converseStrategy)->conversePipeBcast((char*)env, totalSize);
68   } else {
69     // the message fit into the pipe, so send it in a single chunk
70     ComlibPrintf("[%d] Propagating message in one single chunk (%d)\n",CkMyPe(),CsvAccess(pipeBcastPropagateHandle));
71     CmiSetHandler(env, CsvAccess(pipeBcastPropagateHandle));
72     env->setSrcPe(CkMyPe());
73     ((PipeBroadcastConverse*)converseStrategy)->propagate((char*)env, false, CkMyPe(), totalSize, &envelope::setSrcPe);
74   }
75 }
76
77 void PipeBroadcastStrategy::doneInserting(){
78   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
79   while (!messageBuf->isEmpty()) {
80     CharmMessageHolder *cmsg = messageBuf->deq();
81     // modify the Handler to deliver the message to the propagator
82     envelope *env = UsrToEnv(cmsg->getCharmMessage());
83
84     delete cmsg;
85     conversePipeBcast(env, env->getTotalsize());
86   }
87 }
88
89 void PipeBroadcastStrategy::pup(PUP::er &p){
90   ComlibPrintf("[%d] PipeBroadcast pupping %s\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
91   CharmStrategy::pup(p);
92
93   if (p.isUnpacking()) {
94     converseStrategy = new PipeBroadcastConverse(0,0,this);
95   }
96   p | *converseStrategy;
97
98   if (p.isUnpacking()) {
99     //propagateHandle = CmiRegisterHandler((CmiHandler)propagate_handler);
100
101     ComlibPrintf("[%d] registered handler single to %d\n",CmiMyPe(),CsvAccess(pipeBcastPropagateHandle));
102     messageBuf = new CkQ<CharmMessageHolder *>;
103     converseStrategy->setHigherLevel(this);
104   }
105 }
106
107
108 void PipeBroadcastStrategy::beginProcessing(int x){ 
109   CsvAccess(pipeBcastPropagateHandle) = CkRegisterHandler((CmiHandler)propagate_handler);
110
111 }
112
113 //PUPable_def(PipeBroadcastStrategy);