added a version which uses CmiReference (compiling with option -DCMI_COMLIB_WITH_REFE...
[charm.git] / src / conv-com / pipebroadcastconverse.C
#include <math.h>
#include <stdio.h>
#include "pipebroadcastconverse.h"
3
/**
 * Returns the smallest m >= 0 such that (1 << m) >= i, i.e. ceil(log2(i)).
 * Any i <= 1 (including zero and negative values) yields 0.
 */
inline int log_of_2 (int i) {
  if (i <= 1) return 0;
  // Compare in unsigned arithmetic: a signed (1<<m) overflows once m reaches
  // the sign bit, which made the loop undefined/non-terminating for i > 2^30.
  // Every positive int is smaller than the unsigned value with only the top
  // bit set, so the loop always stops before the shift count gets too large.
  const unsigned int target = static_cast<unsigned int>(i);
  int m = 0;
  while (target > (1u << m)) ++m;
  return m;
}
9
10 //PipeBcastHashKey CODE
11 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
12     return ((const PipeBcastHashKey *)k1)->
13                 compare(*(const PipeBcastHashKey *)k2);
14 }
15
16 CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
17     return ((const PipeBcastHashKey *)v)->hash();
18 }
19
20 void PipeBroadcastConverse::commonInit(){
21   //log_of_2_inv = 1/log((double)2);
22   seqNumber = 0;
23 }
24
25 extern void propagate_handler(void *);
26
27 void propagate_handler_frag(void *message) {
28   int instid = CmiGetXHandler(message);
29   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
30   ComlibPrintf("[%d] propagate_handler_frag: calling on instid %d %x\n",CkMyPe(),instid,myStrategy);
31   //CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
32   PipeBcastInfo *info = (PipeBcastInfo*)(((char*)message)+CmiReservedHeaderSize);
33   myStrategy->propagate((char*)message, true, info->srcPe, info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo), NULL);
34 }
35
36 void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumber, int totalSendingSize, setFunction setPeNumber){
37   // find destination processors and send
38   int destination, tmp, k, sizeToSend;
39   int num_pes, *dest_pes;
40   PipeBcastInfo *info = (PipeBcastInfo*)(env+CmiReservedHeaderSize);
41   //int srcPeNumber = isFragmented ? info->srcPe : env->getSrcPe();
42   //int totalSendingSize = isFragmented ? info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo) : env->getTotalsize();
43
44   switch (topology) {
45   case USE_LINEAR:
46     if (srcPeNumber == (CkMyPe()+1)%CkNumPes()) break;
47     destination = (CkMyPe()+1) % CkNumPes();
48     ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CkMyPe(), destination);
49     CmiSyncSend(destination, totalSendingSize, env);
50     break;
51   case USE_HYPERCUBE:
52     tmp = srcPeNumber ^ CkMyPe();
53     k = log_of_2(CkNumPes()) + 2;
54     if (tmp) {
55       do {--k;} while (!(tmp>>k));
56     }
57     ComlibPrintf("[%d] tmp=%d, k=%d\n",CkMyPe(),tmp,k);
58     // now 'k' is the last dimension in the hypercube used for exchange
59     if (isFragmented) info->srcPe = CkMyPe();
60     else setPeNumber(env,CkMyPe());  // where the message is coming from
61     dest_pes = (int *)malloc(k*sizeof(int));
62     --k;  // next dimension in the cube to be used
63     num_pes = HypercubeGetBcastDestinations(k, dest_pes);
64
65     /*
66     for ( ; k>=0; --k) {
67       // add the processor destination at level k if it exist
68       dest_pes[num_pes] = CkMyPe() ^ (1<<k);
69       if (dest_pes[num_pes] >= CkNumPes()) {
70         dest_pes[num_pes] &= (-1)<<k;
71         if (CkNumPes()>dest_pes[num_pes]) dest_pes[num_pes] += (CkMyPe() - (CkMyPe() & ((-1)<<k))) % (CkNumPes() - dest_pes[num_pes]);
72       }
73       if (dest_pes[num_pes] < CkNumPes()) {
74         ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CkMyPe(), dest_pes[num_pes]);
75         ++num_pes;
76       }
77     }
78     */
79
80     //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
81 #ifdef CMI_COMLIB_WITH_REFERENCE
82     for (k=0; k<num_pes; ++k) CmiReference(env);
83 #endif
84     for (k=0; k<num_pes; ++k) {
85       ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CkMyPe(), dest_pes[k]);
86 #ifdef CMI_COMLIB_WITH_REFERENCE
87       CmiSyncSendAndFree(dest_pes[k], totalSendingSize, env);
88 #else
89       CmiSyncSend(dest_pes[k], totalSendingSize, env);
90 #endif
91     }
92     //sizeToSend = pipeSize<totalSendingSize ? pipeSize : totalSendingSize;
93     //for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], sizeToSend, env);
94     free(dest_pes);
95     break;
96
97     // for other strategies
98
99   default:
100     // should NEVER reach here!
101     char *error_msg;
102     sprintf(error_msg, "Error, topology %d not known\n",topology);
103     CmiAbort(error_msg);
104   }
105
106   // deliver messages to local objects (i.e. send it to ComlibManager)
107   storing(env, isFragmented);
108   //CmiSetHandler(env, CmiGetXHandler(env));
109   //CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
110
111 }
112
113 void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
114   char *complete;
115   int isFinished=0;
116   int totalDimension;
117   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
118
119   // check if the message is fragmented
120   if (isFragmented) {
121     // store the fragment in the hash table until completed
122     ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
123     PipeBcastInfo *info = (PipeBcastInfo*)(fragment+CmiReservedHeaderSize);
124
125     PipeBcastHashKey key (info->bcastPe, info->seqNumber);
126     PipeBcastHashObj *position = fragments.get(key);
127
128     char *incomingMsg;
129     if (position) {
130       // the message already exist, add to it
131       ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
132       incomingMsg = position->message;
133       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
134
135       if (--position->remaining == 0) {  // message completely received
136         isFinished = 1;
137         complete = incomingMsg;
138         totalDimension = position->dimension;
139         // delete from the hash table
140         fragments.remove(key);
141       }
142
143     } else {
144       // the message doesn't exist, create it
145       ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
146       incomingMsg = (char*)CmiAlloc(info->messageSize);
147       memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
148       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
149       int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
150       if (remaining) {  // more than one chunk (it was not forced to be splitted)
151         PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
152         fragments.put(key) = object;
153       } else {  // only one chunk, it was forces to be splitted
154         isFinished = 1;
155         complete = incomingMsg;
156         // nothing to delete from fragments since nothing has been added
157       }
158     }
159     CmiFree(fragment);
160
161   } else {  // message not fragmented
162     ComlibPrintf("[%d] deliverer: received message in single chunk\n",CkMyPe());
163     isFinished = 1;
164     complete = fragment;
165   }
166
167   if (isFinished) {
168     higherLevel->deliverer(complete, totalDimension);
169   }
170 }
171
172 void PipeBroadcastConverse::deliverer(char *msg, int dimension) {
173   ComlibPrintf("{%d} dest = %d, %d, %x\n",CkMyPe(),destinationHandler, dimension,CmiHandlerToInfo(destinationHandler).hdlr);
174   if (destinationHandler) {
175     CmiSetHandler(msg, destinationHandler);
176     CmiSyncSendAndFree(CkMyPe(), dimension, msg);
177   } else {
178     CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!");
179   }
180 }
181
182 PipeBroadcastConverse::PipeBroadcastConverse(int _topology, int _pipeSize, Strategy *parent) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
183   if (parent) higherLevel = parent;
184   else higherLevel = this;
185   seqNumber = 0;
186   messageBuf = new CkQ<MessageHolder *>;
187   //if (!parent) propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
188   ComlibPrintf("init: %d %d (%x)\n",topology, pipeSize,this);
189   //if (!parent) ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),propagateHandle_frag);
190 }
191
192 void PipeBroadcastConverse::insertMessage(MessageHolder *cmsg){
193   ComlibPrintf("[%d] Pipelined Broadcast with converse strategy %d\n",CkMyPe(),topology);
194   messageBuf->enq(cmsg);
195   doneInserting();
196 }
197
198 void PipeBroadcastConverse::doneInserting(){
199   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
200   while (!messageBuf->isEmpty()) {
201     MessageHolder *cmsg = messageBuf->deq();
202     // modify the Handler to deliver the message to the propagator
203     char *env = cmsg->getMessage();
204     CmiSetHandler(env, CsvAccess(pipeBcastPropagateHandle_frag));
205     conversePipeBcast(env, cmsg->getSize());
206     delete cmsg;
207     //conversePipeBcast(env, env->getTotalsize(), false);
208   }
209 }
210
211 // routine for interfacing with converse.
212 // Require only the converse reserved header if forceSplit is true
213 void PipeBroadcastConverse::conversePipeBcast(char *env, int totalSize) {
214   // set the instance ID to be used by the receiver using the XHandler variable
215   CmiSetXHandler(env, myInstanceID);
216
217   ++seqNumber;
218   // message doesn't fit into the pipe: split it into chunks and propagate them individually
219   ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),totalSize);
220
221   char *sendingMsg;
222   char *nextChunk = env+CmiReservedHeaderSize;
223   int remaining = totalSize-CmiReservedHeaderSize;
224   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
225   int sendingMsgSize;
226   ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
227   ComlibPrintf("sending %d chunks of size %d, total=%d to handle %d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining,CsvAccess(pipeBcastPropagateHandle_frag));
228   CmiSetHandler(env, CsvAccess(pipeBcastPropagateHandle_frag));
229   ComlibPrintf("setting env handler to %d\n",CsvAccess(pipeBcastPropagateHandle_frag));
230   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
231     sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipeBcastInfo);
232     sendingMsg = (char*)CmiAlloc(sendingMsgSize);
233     memcpy (sendingMsg, env, CmiReservedHeaderSize);
234     PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
235     info->srcPe = CkMyPe();
236     info->bcastPe = CkMyPe();
237     info->seqNumber = seqNumber;
238     info->chunkNumber = i;
239     info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
240     info->messageSize = totalSize;
241     memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, info->chunkSize);
242
243     remaining -= info->chunkSize;
244     nextChunk += info->chunkSize;
245
246     propagate(sendingMsg, true, CkMyPe(), sendingMsgSize, NULL);
247   }
248   CmiFree(env);
249 }
250
251 void PipeBroadcastConverse::pup(PUP::er &p){
252   Strategy::pup(p);
253   ComlibPrintf("[%d] initial of Pipeconverse pup %s\n",CkMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
254
255   p | pipeSize;
256   p | topology;
257   p | seqNumber;
258
259   ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d, topology=%d\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize,topology);
260
261   if (p.isUnpacking()) {
262     //log_of_2_inv = 1/log((double)2);
263     messageBuf = new CkQ<MessageHolder *>;
264     //propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
265     ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),CsvAccess(pipeBcastPropagateHandle_frag));
266   }
267   if (p.isPacking()) {
268     delete messageBuf;
269   }
270   //p|(*messageBuf);
271   //p|fragments;
272
273 }
274
275 PUPable_def(PipeBroadcastConverse);