Fixing the log problem, plus a small modification to the interface to allow ...
[charm.git] / src / conv-com / pipebroadcastconverse.C
1 #include <math.h>
2 #include "pipebroadcastconverse.h"
3
/**
 * Returns the smallest m such that (1 << m) >= i, i.e. ceil(log2(i)).
 *
 * Any i <= 1 (including zero and negative values) yields 0.
 *
 * The comparison is carried out on unsigned values: a signed `1 << m`
 * overflows once m reaches the sign bit, which made the previous loop
 * undefined (and in practice non-terminating) for i > 2^30.
 */
inline int log_of_2 (int i) {
  if (i <= 1) return 0;
  const unsigned int target = static_cast<unsigned int>(i);
  int m = 0;
  // target <= INT_MAX, so the loop stops no later than the top bit of
  // unsigned int and the shift never overflows.
  while (target > (1u << m)) ++m;
  return m;
}
9
10 //PipeBcastHashKey CODE
11 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
12     return ((const PipeBcastHashKey *)k1)->
13                 compare(*(const PipeBcastHashKey *)k2);
14 }
15
16 CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
17     return ((const PipeBcastHashKey *)v)->hash();
18 }
19
20 void PipeBroadcastConverse::commonInit(){
21   //log_of_2_inv = 1/log((double)2);
22   seqNumber = 0;
23 }
24
25 extern void propagate_handler(void *);
26
27 void propagate_handler_frag(void *message) {
28   int instid = CmiGetXHandler(message);
29   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
30   //CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
31   PipeBcastInfo *info = (PipeBcastInfo*)(((char*)message)+CmiReservedHeaderSize);
32   myStrategy->propagate((char*)message, true, info->srcPe, info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo), NULL);
33 }
34
35 void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumber, int totalSendingSize, setFunction setPeNumber){
36   // find destination processors and send
37   int destination, tmp, k;
38   int num_pes, *dest_pes;
39   PipeBcastInfo *info = (PipeBcastInfo*)(env+CmiReservedHeaderSize);
40   //int srcPeNumber = isFragmented ? info->srcPe : env->getSrcPe();
41   //int totalSendingSize = isFragmented ? info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo) : env->getTotalsize();
42
43   switch (topology) {
44   case USE_LINEAR:
45     if (srcPeNumber == (CmiMyPe()+1)%CmiNumPes()) break;
46     destination = (CmiMyPe()+1) % CmiNumPes();
47     ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CmiMyPe(), destination);
48     CmiSyncSend(destination, totalSendingSize, env);
49     break;
50   case USE_HYPERCUBE:
51     tmp = srcPeNumber ^ CmiMyPe();
52     k = log_of_2(CmiNumPes()) + 2;
53     if (tmp) {
54       do {--k;} while (!(tmp>>k));
55     }
56     ComlibPrintf("[%d] tmp=%d, k=%d\n",CmiMyPe(),tmp,k);
57     // now 'k' is the last dimension in the hypercube used for exchange
58     if (isFragmented) info->srcPe = CmiMyPe();
59     else setPeNumber(env,CmiMyPe());  // where the message is coming from
60     dest_pes = (int *)malloc(k*sizeof(int));
61     --k;  // next dimension in the cube to be used
62     num_pes = HypercubeGetBcastDestinations(k, dest_pes);
63
64     /*
65     for ( ; k>=0; --k) {
66       // add the processor destination at level k if it exist
67       dest_pes[num_pes] = CmiMyPe() ^ (1<<k);
68       if (dest_pes[num_pes] >= CmiNumPes()) {
69         dest_pes[num_pes] &= (-1)<<k;
70         if (CmiNumPes()>dest_pes[num_pes]) dest_pes[num_pes] += (CmiMyPe() - (CmiMyPe() & ((-1)<<k))) % (CmiNumPes() - dest_pes[num_pes]);
71       }
72       if (dest_pes[num_pes] < CmiNumPes()) {
73         ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CmiMyPe(), dest_pes[num_pes]);
74         ++num_pes;
75       }
76     }
77     */
78
79     //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
80     for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], totalSendingSize, env);
81     free(dest_pes);
82     break;
83
84     // for other strategies
85
86   default:
87     // should NEVER reach here!
88     CmiPrintf("Error, topology %d not known\n",topology);
89     CkExit();
90   }
91
92   // deliver messages to local objects (i.e. send it to ComlibManager)
93   storing(env, isFragmented);
94   //CmiSetHandler(env, CmiGetXHandler(env));
95   //CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
96
97 }
98
99 void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
100   char *complete;
101   int isFinished=0;
102   int totalDimension;
103   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
104
105   // check if the message is fragmented
106   if (isFragmented) {
107     // store the fragment in the hash table until completed
108     ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
109     PipeBcastInfo *info = (PipeBcastInfo*)(fragment+CmiReservedHeaderSize);
110
111     PipeBcastHashKey key (info->bcastPe, info->seqNumber);
112     PipeBcastHashObj *position = fragments.get(key);
113
114     char *incomingMsg;
115     if (position) {
116       // the message already exist, add to it
117       ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CmiMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
118       incomingMsg = position->message;
119       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
120
121       if (--position->remaining == 0) {  // message completely received
122         isFinished = 1;
123         complete = incomingMsg;
124         totalDimension = position->dimension;
125         // delete from the hash table
126         fragments.remove(key);
127       }
128
129     } else {
130       // the message doesn't exist, create it
131       ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CmiMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
132       incomingMsg = (char*)CmiAlloc(info->messageSize);
133       memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
134       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
135       int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
136       if (remaining) {  // more than one chunk (it was not forced to be splitted)
137         PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
138         fragments.put(key) = object;
139       } else {  // only one chunk, it was forces to be splitted
140         isFinished = 1;
141         complete = incomingMsg;
142         // nothing to delete from fragments since nothing has been added
143       }
144     }
145     CmiFree(fragment);
146
147   } else {  // message not fragmented
148     ComlibPrintf("[%d] deliverer: received message in single chunk\n",CmiMyPe());
149     isFinished = 1;
150     complete = fragment;
151   }
152
153   if (isFinished) {
154     higherLevel->deliverer(complete, totalDimension);
155   }
156 }
157
158 void PipeBroadcastConverse::deliverer(char *msg, int dimension) {
159   if (destinationHandler) {
160     CmiSetHandler(msg, destinationHandler);
161     CmiSyncSendAndFree(CmiMyPe(), dimension, msg);
162   } else {
163     CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!");
164   }
165 }
166
167 PipeBroadcastConverse::PipeBroadcastConverse(int _topology, int _pipeSize, Strategy *parent) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
168   higherLevel = parent;
169   seqNumber = 0;
170   ComlibPrintf("init: %d %d\n",topology, pipeSize);
171 }
172
173 void PipeBroadcastConverse::insertMessage(MessageHolder *cmsg){
174   ComlibPrintf("[%d] Pipelined Broadcast with converse strategy %d\n",CkMyPe(),topology);
175   messageBuf->enq(cmsg);
176   doneInserting();
177 }
178
179 void PipeBroadcastConverse::doneInserting(){
180   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
181   while (!messageBuf->isEmpty()) {
182     MessageHolder *cmsg = messageBuf->deq();
183     // modify the Handler to deliver the message to the propagator
184     char *env = cmsg->getMessage();
185
186     //conversePipeBcast(env, env->getTotalsize(), false);
187   }
188 }
189
190 // routine for interfacing with converse.
191 // Require only the converse reserved header if forceSplit is true
192 void PipeBroadcastConverse::conversePipeBcast(char *env, int totalSize) {
193   // set the instance ID to be used by the receiver using the XHandler variable
194   CmiSetXHandler(env, myInstanceID);
195
196   ++seqNumber;
197   // message doesn't fit into the pipe: split it into chunks and propagate them individually
198   ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CmiMyPe(),totalSize);
199
200   char *sendingMsg;
201   char *nextChunk = env+CmiReservedHeaderSize;
202   int remaining = totalSize-CmiReservedHeaderSize;
203   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
204   ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
205   ComlibPrintf("sending %d chunks of size %d, total=%d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining);
206   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
207     sendingMsg = (char*)CmiAlloc(pipeSize);
208     CmiSetHandler(env, propagateHandle_frag);
209     memcpy (sendingMsg, env, CmiReservedHeaderSize);
210     PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
211     info->srcPe = CmiMyPe();
212     info->bcastPe = CmiMyPe();
213     info->seqNumber = seqNumber;
214     info->chunkNumber = i;
215     info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
216     info->messageSize = totalSize;
217     memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, reducedPipe);
218
219     remaining -= reducedPipe;
220     nextChunk += reducedPipe;
221
222     propagate(sendingMsg, true, CmiMyPe(), totalSize, NULL);
223   }
224 }
225
226 void PipeBroadcastConverse::pup(PUP::er &p){
227   Strategy::pup(p);
228   ComlibPrintf("[%d] initial of Pipeconverse pup %s\n",CmiMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
229
230   p | pipeSize;
231   p | topology;
232   p | seqNumber;
233
234   ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d, topology=%d\n",CmiMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize,topology);
235
236   if (p.isUnpacking()) {
237     //log_of_2_inv = 1/log((double)2);
238     messageBuf = new CkQ<MessageHolder *>;
239     propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
240   }
241   //p|(*messageBuf);
242   //p|fragments;
243
244 }
245
246 PUPable_def(PipeBroadcastConverse);