fixing for ft
[charm.git] / src / conv-com / pipebroadcastconverse.C
1 /**
2    @addtogroup ComlibConverseStrategy
3    @{
4    @file
5    Implementation of the PipeBroadcastConverse strategy
6 */
7
8 #include <math.h>
9 #include "pipebroadcastconverse.h"
10
11 inline int log_of_2 (int i) {
12   int m;
13   for (m=0; i>(1<<m); ++m);
14   return m;
15 }
16
17 //PipeBcastHashKey CODE
18 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
19     return ((const PipeBcastHashKey *)k1)->
20                 compare(*(const PipeBcastHashKey *)k2);
21 }
22
23 CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
24     return ((const PipeBcastHashKey *)v)->hash();
25 }
26
27 /*
28 void PipeBroadcastConverse::commonInit(){
29   //log_of_2_inv = 1/log((double)2);
30   seqNumber = 0;
31 }
32 */
33
34 //extern void propagate_handler(void *);
35
36 CkpvDeclare(int, pipeline_handler);
37 /**
38  * Converse handler for messages broadcasted through PipeBroadcastConverse and
39  * subclasses when fragmentation is needed. The message in this case has always
40  * a PipeBcastInfo structure right after the converse header.
41  */
42 void PipelineFragmentHandler(void *message) {
43   int instid = CmiGetStrategy(message);
44   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
45   ComlibPrintf("[%d] PipelineFragmentHandler: %d\n",CkMyPe(),instid);
46   //  PipeBcastInfo *info = (PipeBcastInfo*)(((char*)message)+CmiReservedHeaderSize);
47   myStrategy->propagate((char*)message, true);//, info->srcPe, info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo), NULL);
48 }
49
50 CkpvDeclare(int, pipeline_frag_handler);
51 /**
52  * Converse handler for messages broadcasted through PipeBroadcastConverse and
53  * subclasses when no fragmentation is needed (i.e the total size is less than
54  * the pipeSize)
55  */
56 void PipelineHandler(void *message) {
57   int instid = CmiGetStrategy(message);
58   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
59   ComlibPrintf("[%d] PipelineHandler: %d\n",CkMyPe(),instid);
60   //PipeBcastInfo *info = (PipeBcastInfo*)(((char*)message)+CmiReservedHeaderSize);
61   myStrategy->propagate((char*)message, false);
62 }
63
64 PipeBroadcastConverse::PipeBroadcastConverse(short _topology, int _pipeSize) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
65   seqNumber = 0;
66   //messageBuf = new CkQ<MessageHolder *>;
67   //if (!parent) propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
68   ComlibPrintf("[%d] PipeBroadcastConverse constructor: %d %d\n",CkMyPe(),topology, pipeSize);
69   //if (!parent) ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),propagateHandle_frag);
70 }
71
72 CmiFragmentHeader *PipeBroadcastConverse::getFragmentHeader(char *msg) {
73   return (CmiFragmentHeader*)(msg+CmiReservedHeaderSize);
74 }
75
76 void PipeBroadcastConverse::propagate(char *msg, int isFragmented) {//, int srcPeNumber, int totalSendingSize, setFunction setPeNumber){
77   // find destination processors and send
78   int destination, tmp, k; //, sizeToSend;
79   int num_pes, *dest_pes;
80   //int srcPeNumber = isFragmented ? info->srcPe : env->getSrcPe();
81   //int totalSendingSize = isFragmented ? info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo) : env->getTotalsize();
82
83   // get the information about sourcePe and message size
84   int srcPeNumber, totalSendingSize;
85   CmiFragmentHeader *frag = NULL;
86   PipeBcastInfo *info = NULL;
87   if (isFragmented) {
88     info = (PipeBcastInfo*)(msg+CmiReservedHeaderSize);
89     srcPeNumber = info->srcPe;
90     totalSendingSize = info->messageSize;
91   } else {
92     frag = getFragmentHeader(msg);
93     srcPeNumber = frag->senderPe;
94     totalSendingSize = frag->msgSize;
95   }
96
97   switch (topology) {
98   case USE_LINEAR:
99     if (srcPeNumber == (CkMyPe()+1)%CkNumPes()) break;
100     destination = (CkMyPe()+1) % CkNumPes();
101     ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CkMyPe(), destination);
102     CmiSyncSend(destination, totalSendingSize, msg);
103     break;
104   case USE_HYPERCUBE:
105     tmp = srcPeNumber ^ CkMyPe();
106     k = log_of_2(CkNumPes()) + 2;
107     if (tmp) {
108       do {--k;} while (!(tmp>>k));
109     }
110     ComlibPrintf("[%d] tmp=%d, k=%d\n",CkMyPe(),tmp,k);
111     // now 'k' is the last dimension in the hypercube used for exchange
112     if (isFragmented) info->srcPe = CkMyPe();
113     else frag->senderPe = CkMyPe();
114     dest_pes = (int *)malloc(k*sizeof(int));
115     --k;  // next dimension in the cube to be used
116     num_pes = HypercubeGetBcastDestinations(CkMyPe(), CkNumPes(), k, dest_pes);
117
118     /*
119     for ( ; k>=0; --k) {
120       // add the processor destination at level k if it exist
121       dest_pes[num_pes] = CkMyPe() ^ (1<<k);
122       if (dest_pes[num_pes] >= CkNumPes()) {
123         dest_pes[num_pes] &= (-1)<<k;
124         if (CkNumPes()>dest_pes[num_pes]) dest_pes[num_pes] += (CkMyPe() - (CkMyPe() & ((-1)<<k))) % (CkNumPes() - dest_pes[num_pes]);
125       }
126       if (dest_pes[num_pes] < CkNumPes()) {
127         ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CkMyPe(), dest_pes[num_pes]);
128         ++num_pes;
129       }
130     }
131     */
132
133     //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
134 #ifdef CMI_COMLIB_WITH_REFERENCE
135     for (k=0; k<num_pes; ++k) {
136       //ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CkMyPe(), dest_pes[k]);
137       CmiReference(msg);
138       CmiSyncSendAndFree(dest_pes[k], totalSendingSize, msg);
139     }
140 #else
141     CmiSyncListSend(num_pes, dest_pes, totalSendingSize, msg);
142 #endif
143     //sizeToSend = pipeSize<totalSendingSize ? pipeSize : totalSendingSize;
144     //for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], sizeToSend, env);
145     free(dest_pes);
146     break;
147
148     // for other strategies
149
150   default:
151     // should NEVER reach here!
152     char error_msg[100];
153     sprintf(error_msg, "Error, topology %d not known\n",topology);
154     CmiAbort(error_msg);
155   }
156
157   // decide what to do after with the message
158   if (isFragmented) store(msg);
159   else deliver(msg, totalSendingSize);
160
161   // deliver messages to local objects (i.e. send it to ComlibManager)
162   //storing(env, isFragmented);
163   //CmiSetHandler(env, CmiGetXHandler(env));
164   //CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
165
166 }
167
168 // this function is called only on fragmented messages
169 void PipeBroadcastConverse::store(char* fragment) {
170   //char *complete;
171   //int isFinished=0;
172   //int totalDimension;
173   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
174
175   // check if the message is fragmented
176   //if (isFragmented) {
177   // store the fragment in the hash table until completed
178   //ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
179   PipeBcastInfo *info = (PipeBcastInfo*)(fragment+CmiReservedHeaderSize);
180
181   PipeBcastHashKey key (info->bcastPe, info->seqNumber);
182   PipeBcastHashObj *position = fragments.get(key);
183
184   char *incomingMsg;
185   if (position) {
186     // the message already exist, add to it
187     ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
188     incomingMsg = position->message;
189     memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
190
191     if (--position->remaining == 0) {  // message completely received
192       // the deliver function will take care of deleting the message
193       deliver(incomingMsg, position->dimension);
194       //isFinished = 1;
195       //complete = incomingMsg;
196       //totalDimension = position->dimension;
197       // delete from the hash table
198       fragments.remove(key);
199       delete position;
200     }
201
202   } else {
203     // the message doesn't exist, create it
204     ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
205     incomingMsg = (char*)CmiAlloc(info->messageSize);
206     memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
207     memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
208     int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
209     CmiAssert(remaining > 0);
210     //if (remaining) {  // more than one chunk (it was not forced to be splitted)
211     PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
212     fragments.put(key) = object;
213     /*
214       } else {  // only one chunk, it was forces to be splitted
215       isFinished = 1;
216       complete = incomingMsg;
217       // nothing to delete from fragments since nothing has been added
218       }
219     */
220   }
221   CmiFree(fragment);
222
223   /*
224     } else {  // message not fragmented
225     ComlibPrintf("[%d] deliverer: received message in single chunk\n",CkMyPe());
226     isFinished = 1;
227     complete = fragment;
228     }
229   */
230
231   //if (isFinished) {
232   //}
233 }
234
235 void PipeBroadcastConverse::deliver(char *msg, int dimension) {
236   //ComlibPrintf("{%d} dest = %d, %d, %x\n",CkMyPe(),destinationHandler, dimension,CmiHandlerToInfo(destinationHandler).hdlr);
237   CmiFragmentHeader *info = (CmiFragmentHeader*)(msg+CmiReservedHeaderSize);
238   CmiSetHandler(msg, info->destination);
239   CmiSyncSendAndFree(CkMyPe(), dimension, msg);
240   /*
241   if (destinationHandler) {
242     CmiSetHandler(msg, destinationHandler);
243     CmiSyncSendAndFree(CkMyPe(), dimension, msg);
244   } else {
245     CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!");
246   }
247   */
248 }
249
250 void PipeBroadcastConverse::insertMessage(MessageHolder *cmsg){
251   ComlibPrintf("[%d] PipeBroadcastConverse::insertMessage %d\n",CkMyPe(),topology);
252   char *msg = cmsg->getMessage();
253   int size = cmsg->getSize();
254   if (size < pipeSize) {
255     // sending message in a single chunk
256     CmiSetHandler(msg, CkpvAccess(pipeline_handler));
257     CmiFragmentHeader *frag = getFragmentHeader(msg);
258     frag->senderPe = CkMyPe();
259     frag->msgSize = size;
260     propagate(msg, false);
261
262   } else {
263     // sending message in multiple chunk: message doesn't fit into the pipe:
264     // split it into chunks and propagate them individually
265     ++seqNumber;
266     ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),size);
267
268     char *sendingMsg;
269     char *nextChunk = msg+CmiReservedHeaderSize;
270     int remaining = size-CmiReservedHeaderSize;
271     int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
272     int sendingMsgSize;
273     CmiSetHandler(msg, CkpvAccess(pipeline_frag_handler));
274
275     // send all the chunks one after the other
276     for (int i=0; i<(int)ceil(((double)size-CmiReservedHeaderSize)/reducedPipe); ++i) {
277       sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipeBcastInfo);
278       sendingMsg = (char*)CmiAlloc(sendingMsgSize);
279       memcpy (sendingMsg, msg, CmiReservedHeaderSize);
280       PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
281       info->srcPe = CkMyPe();
282       info->bcastPe = CkMyPe();
283       info->seqNumber = seqNumber;
284       info->chunkNumber = i;
285       info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
286       info->messageSize = size;
287       memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, info->chunkSize);
288
289       remaining -= info->chunkSize;
290       nextChunk += info->chunkSize;
291
292       propagate(sendingMsg, true);
293     }
294
295   }
296   //CmiSetHandler(msg, CsvAccess(pipeBcastPropagateHandle_frag));
297   //conversePipeBcast(msg, cmsg->getSize());
298   delete cmsg;
299 }
300
301 /*
302 void PipeBroadcastConverse::doneInserting(){
303   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
304   while (!messageBuf->isEmpty()) {
305     MessageHolder *cmsg = messageBuf->deq();
306     // modify the Handler to deliver the message to the propagator
307     char *env = cmsg->getMessage();
308     CmiSetHandler(env, CsvAccess(pipeBcastPropagateHandle_frag));
309     conversePipeBcast(env, cmsg->getSize());
310     delete cmsg;
311     //conversePipeBcast(env, env->getTotalsize(), false);
312   }
313 }
314 */
315
316 /*
317 // routine for interfacing with converse.
318 // Require only the converse reserved header if forceSplit is true
319 void PipeBroadcastConverse::conversePipeBcast(char *msg, int totalSize) {
320   // set the instance ID to be used by the receiver using the XHandler variable
321   //CmiSetXHandler(env, myInstanceID);
322
323   ++seqNumber;
324   // message doesn't fit into the pipe: split it into chunks and propagate them individually
325   ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),totalSize);
326
327   char *sendingMsg;
328   char *nextChunk = msg+CmiReservedHeaderSize;
329   int remaining = totalSize-CmiReservedHeaderSize;
330   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
331   int sendingMsgSize;
332   //ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
333   //ComlibPrintf("sending %d chunks of size %d, total=%d to handle %d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining,CsvAccess(pipeBcastPropagateHandle_frag));
334   CmiSetHandler(msg, CsvAccess(pipeline_frag_handler));
335   //ComlibPrintf("setting env handler to %d\n",CsvAccess(pipeBcastPropagateHandle_frag));
336
337   // send all the chunks one after the other
338   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
339     sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipeBcastInfo);
340     sendingMsg = (char*)CmiAlloc(sendingMsgSize);
341     memcpy (sendingMsg, env, CmiReservedHeaderSize);
342     PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
343     info->srcPe = CkMyPe();
344     info->bcastPe = CkMyPe();
345     info->seqNumber = seqNumber;
346     info->chunkNumber = i;
347     info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
348     info->messageSize = totalSize;
349     memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, info->chunkSize);
350
351     remaining -= info->chunkSize;
352     nextChunk += info->chunkSize;
353
354     propagate(sendingMsg, true);//, CkMyPe(), sendingMsgSize, NULL);
355   }
356   CmiFree(msg);
357 }
358 */
359
360 void PipeBroadcastConverse::pup(PUP::er &p){
361   Strategy::pup(p);
362   ComlibPrintf("[%d] initial of PipeBroadcastConverse::pup %s\n",CkMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
363
364   p | pipeSize;
365   p | topology;
366   p | seqNumber;
367
368   //ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d, topology=%d\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize,topology);
369
370   /*
371   if (p.isUnpacking()) {
372     //log_of_2_inv = 1/log((double)2);
373     messageBuf = new CkQ<MessageHolder *>;
374     //propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
375     ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),CsvAccess(pipeBcastPropagateHandle_frag));
376   }
377   if (p.isPacking()) {
378     delete messageBuf;
379   }
380   //p|(*messageBuf);
381   //p|fragments;
382   */
383 }
384
385 PUPable_def(PipeBroadcastConverse)
386
387 /*@}*/