2f631deedec4ec88153b059dd8abbb4327042819
[charm.git] / src / conv-com / pipebroadcastconverse.C
1 #include <math.h>
2 #include "pipebroadcastconverse.h"
3
4 //PipeBcastHashKey CODE
5 int PipeBcastHashKey::staticCompare(const void *k1,const void *k2,size_t ){
6     return ((const PipeBcastHashKey *)k1)->
7                 compare(*(const PipeBcastHashKey *)k2);
8 }
9
10 CkHashCode PipeBcastHashKey::staticHash(const void *v,size_t){
11     return ((const PipeBcastHashKey *)v)->hash();
12 }
13
14 void PipeBroadcastConverse::commonInit(){
15   log_of_2_inv = 1/log((double)2);
16   seqNumber = 0;
17 }
18
19 extern void propagate_handler(void *);
20
21 void propagate_handler_frag(void *message) {
22   int instid = CmiGetXHandler(message);
23   PipeBroadcastConverse *myStrategy = (PipeBroadcastConverse*)ConvComlibGetStrategy(instid);
24   //CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
25   PipeBcastInfo *info = (PipeBcastInfo*)(((char*)message)+CmiReservedHeaderSize);
26   myStrategy->propagate((char*)message, true, info->srcPe, info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo), NULL);
27 }
28
29 void PipeBroadcastConverse::propagate(char *env, int isFragmented, int srcPeNumber, int totalSendingSize, setFunction setPeNumber){
30   // find destination processors and send
31   int destination, tmp, k;
32   int num_pes, *dest_pes;
33   PipeBcastInfo *info = (PipeBcastInfo*)(env+CmiReservedHeaderSize);
34   //int srcPeNumber = isFragmented ? info->srcPe : env->getSrcPe();
35   //int totalSendingSize = isFragmented ? info->chunkSize+CmiReservedHeaderSize+sizeof(PipeBcastInfo) : env->getTotalsize();
36
37   switch (topology) {
38   case USE_LINEAR:
39     if (srcPeNumber == (CmiMyPe()+1)%CmiNumPes()) break;
40     destination = (CmiMyPe()+1) % CmiNumPes();
41     ComlibPrintf("[%d] Pipebroadcast sending to %d\n",CmiMyPe(), destination);
42     CmiSyncSend(destination, totalSendingSize, env);
43     break;
44   case USE_HYPERCUBE:
45     tmp = srcPeNumber ^ CmiMyPe();
46     k = int(log((double)CmiNumPes()) * log_of_2_inv + 2);
47     if (tmp) {
48       do {--k;} while (!(tmp>>k));
49     }
50     ComlibPrintf("[%d] tmp=%d, k=%d\n",CmiMyPe(),tmp,k);
51     // now 'k' is the last dimension in the hypercube used for exchange
52     if (isFragmented) info->srcPe = CmiMyPe();
53     else setPeNumber(env,CmiMyPe());  // where the message is coming from
54     dest_pes = (int *)malloc(k*sizeof(int));
55     --k;  // next dimension in the cube to be used
56     num_pes = HypercubeGetBcastDestinations(k, dest_pes);
57
58     /*
59     for ( ; k>=0; --k) {
60       // add the processor destination at level k if it exist
61       dest_pes[num_pes] = CmiMyPe() ^ (1<<k);
62       if (dest_pes[num_pes] >= CmiNumPes()) {
63         dest_pes[num_pes] &= (-1)<<k;
64         if (CmiNumPes()>dest_pes[num_pes]) dest_pes[num_pes] += (CmiMyPe() - (CmiMyPe() & ((-1)<<k))) % (CmiNumPes() - dest_pes[num_pes]);
65       }
66       if (dest_pes[num_pes] < CmiNumPes()) {
67         ComlibPrintf("[%d] PipeBroadcast sending to %d\n",CmiMyPe(), dest_pes[num_pes]);
68         ++num_pes;
69       }
70     }
71     */
72
73     //CmiSyncListSend(num_pes, dest_pes, env->getTotalsize(), (char *)env);
74     for (k=0; k<num_pes; ++k) CmiSyncSend(dest_pes[k], totalSendingSize, env);
75     free(dest_pes);
76     break;
77
78     // for other strategies
79
80   default:
81     // should NEVER reach here!
82     CmiPrintf("Error, topology %d not known\n",topology);
83     CkExit();
84   }
85
86   // deliver messages to local objects (i.e. send it to ComlibManager)
87   storing(env, isFragmented);
88   //CmiSetHandler(env, CmiGetXHandler(env));
89   //CmiSyncSendAndFree(CkMyPe(), env->getTotalsize(), (char *)env);
90
91 }
92
93 void PipeBroadcastConverse::storing(char* fragment, int isFragmented) {
94   char *complete;
95   int isFinished=0;
96   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
97
98   // check if the message is fragmented
99   if (isFragmented) {
100     // store the fragment in the hash table until completed
101     ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
102     PipeBcastInfo *info = (PipeBcastInfo*)(fragment+CmiReservedHeaderSize);
103
104     PipeBcastHashKey key (info->bcastPe, info->seqNumber);
105     PipeBcastHashObj *position = fragments.get(key);
106
107     char *incomingMsg;
108     if (position) {
109       // the message already exist, add to it
110       ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CmiMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
111       incomingMsg = position->message;
112       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
113
114       if (--position->remaining == 0) {  // message completely received
115         isFinished = 1;
116         complete = incomingMsg;
117         // delete from the hash table
118         fragments.remove(key);
119       }
120
121     } else {
122       // the message doesn't exist, create it
123       ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CmiMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
124       incomingMsg = (char*)CmiAlloc(info->messageSize);
125       memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
126       memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipeBcastInfo), info->chunkSize);
127       int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo)))-1;
128       if (remaining) {  // more than one chunk (it was not forced to be splitted)
129         PipeBcastHashObj *object = new PipeBcastHashObj(info->messageSize, remaining, incomingMsg);
130         fragments.put(key) = object;
131       } else {  // only one chunk, it was forces to be splitted
132         isFinished = 1;
133         complete = incomingMsg;
134         // nothing to delete from fragments since nothing has been added
135       }
136     }
137     CmiFree(fragment);
138
139   } else {  // message not fragmented
140     ComlibPrintf("[%d] deliverer: received message in single chunk\n",CmiMyPe());
141     isFinished = 1;
142     complete = fragment;
143   }
144
145   if (isFinished) {
146     higherLevel->deliverer(complete);
147   }
148 }
149
150 void PipeBroadcastConverse::deliverer(char *msg) {
151   // TO BE DONE
152 }
153
154 PipeBroadcastConverse::PipeBroadcastConverse(int _topology, int _pipeSize, Strategy *parent) : Strategy(), topology(_topology), pipeSize(_pipeSize) {
155   higherLevel = parent;
156   seqNumber = 0;
157   ComlibPrintf("init: %d %d\n",topology, pipeSize);
158 }
159
160 void PipeBroadcastConverse::insertMessage(MessageHolder *cmsg){
161   ComlibPrintf("[%d] Pipelined Broadcast with converse strategy %d\n",CkMyPe(),topology);
162   messageBuf->enq(cmsg);
163   doneInserting();
164 }
165
166 void PipeBroadcastConverse::doneInserting(){
167   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
168   while (!messageBuf->isEmpty()) {
169     MessageHolder *cmsg = messageBuf->deq();
170     // modify the Handler to deliver the message to the propagator
171     char *env = cmsg->getMessage();
172
173     //conversePipeBcast(env, env->getTotalsize(), false);
174   }
175 }
176
177 // routine for interfacing with converse.
178 // Require only the converse reserved header if forceSplit is true
179 void PipeBroadcastConverse::conversePipeBcast(char *env, int totalSize) {
180   // set the instance ID to be used by the receiver using the XHandler variable
181   CmiSetXHandler(env, myInstanceID);
182
183   ++seqNumber;
184   // message doesn't fit into the pipe: split it into chunks and propagate them individually
185   ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CmiMyPe(),totalSize);
186
187   char *sendingMsg;
188   char *nextChunk = env+CmiReservedHeaderSize;
189   int remaining = totalSize-CmiReservedHeaderSize;
190   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipeBcastInfo);
191   ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipeBcastInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipeBcastInfo));
192   ComlibPrintf("sending %d chunks of size %d, total=%d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining);
193   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
194     sendingMsg = (char*)CmiAlloc(pipeSize);
195     CmiSetHandler(env, propagateHandle_frag);
196     memcpy (sendingMsg, env, CmiReservedHeaderSize);
197     PipeBcastInfo *info = (PipeBcastInfo*)(sendingMsg+CmiReservedHeaderSize);
198     info->srcPe = CmiMyPe();
199     info->bcastPe = CmiMyPe();
200     info->seqNumber = seqNumber;
201     info->chunkNumber = i;
202     info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
203     info->messageSize = totalSize;
204     memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipeBcastInfo), nextChunk, reducedPipe);
205
206     remaining -= reducedPipe;
207     nextChunk += reducedPipe;
208
209     propagate(sendingMsg, true, CmiMyPe(), totalSize, NULL);
210   }
211 }
212
213 void PipeBroadcastConverse::pup(PUP::er &p){
214   Strategy::pup(p);
215   ComlibPrintf("[%d] initial of Pipeconverse pup %s\n",CmiMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
216
217   p | pipeSize;
218   p | topology;
219   p | seqNumber;
220
221   ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d, topology=%d\n",CmiMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize,topology);
222
223   if (p.isUnpacking()) {
224     log_of_2_inv = 1/log((double)2);
225     messageBuf = new CkQ<MessageHolder *>;
226     propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
227   }
228   //p|(*messageBuf);
229   //p|fragments;
230
231 }
232
233 PUPable_def(PipeBroadcastConverse);