new converse strategy for message pipelining (point-to-point communication)
[charm.git] / src / conv-com / pipelinestrategy.C
1 #include <math.h>
2 #include "pipelinestrategy.h"
3
// Returns the smallest m such that 2^m >= i, i.e. ceil(log2(i)).
// Any i <= 1 (including zero and negative values) yields 0.
inline int log_of_2 (int i) {
  if (i <= 1) return 0;
  // Compare in unsigned arithmetic: for a positive int the loop stops no later
  // than m == (bits in int - 1), so the shift never overflows.  The previous
  // signed "1<<m" was undefined (and in practice never terminated) for i > 2^30.
  const unsigned int target = static_cast<unsigned int>(i);
  int m = 0;
  while (target > (1u << m)) ++m;
  return m;
}
9
10 //PipelineHashKey CODE
11 int PipelineHashKey::staticCompare(const void *k1,const void *k2,size_t ){
12     return ((const PipelineHashKey *)k1)->
13                 compare(*(const PipelineHashKey *)k2);
14 }
15
16 CkHashCode PipelineHashKey::staticHash(const void *v,size_t){
17     return ((const PipelineHashKey *)v)->hash();
18 }
19
20 void PipelineStrategy::commonInit(){
21   //log_of_2_inv = 1/log((double)2);
22   seqNumber = 0;
23 }
24
25 //extern void propagate_handler(void *);
26
27 void deliver_handler(void *message) {
28   int instid = CmiGetXHandler(message);
29   PipelineStrategy *myStrategy = (PipelineStrategy*)ConvComlibGetStrategy(instid);
30   ComlibPrintf("[%d] propagate_handler_frag: calling on instid %d %x\n",CkMyPe(),instid,myStrategy);
31   //CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
32   PipelineInfo *info = (PipelineInfo*)(((char*)message)+CmiReservedHeaderSize);
33   myStrategy->storing((char*)message);
34 }
35
36 void PipelineStrategy::storing(char* fragment) {
37   char *complete;
38   int isFinished=0;
39   int totalDimension;
40   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
41
42   // store the fragment in the hash table until completed
43   ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
44   PipelineInfo *info = (PipelineInfo*)(fragment+CmiReservedHeaderSize);
45
46   PipelineHashKey key (info->bcastPe, info->seqNumber);
47   PipelineHashObj *position = fragments.get(key);
48
49   char *incomingMsg;
50   if (position) {
51     // the message already exist, add to it
52     ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
53     incomingMsg = position->message;
54     memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipelineInfo), info->chunkSize);
55
56     if (--position->remaining == 0) {  // message completely received
57       isFinished = 1;
58       complete = incomingMsg;
59       totalDimension = position->dimension;
60       // delete from the hash table
61       fragments.remove(key);
62     }
63
64   } else {
65     // the message doesn't exist, create it
66     ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
67     incomingMsg = (char*)CmiAlloc(info->messageSize);
68     memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
69     memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipelineInfo), info->chunkSize);
70     int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo)))-1;
71     if (remaining) {  // more than one chunk (it was not forced to be splitted)
72       PipelineHashObj *object = new PipelineHashObj(info->messageSize, remaining, incomingMsg);
73       fragments.put(key) = object;
74     } else {  // only one chunk, it was forces to be splitted
75       isFinished = 1;
76       complete = incomingMsg;
77       // nothing to delete from fragments since nothing has been added
78     }
79   }
80   CmiFree(fragment);
81
82   if (isFinished) {
83     higherLevel->deliverer(complete, totalDimension);
84   }
85 }
86
87 void PipelineStrategy::deliverer(char *msg, int dimension) {
88   ComlibPrintf("{%d} dest = %d, %d, %x\n",CkMyPe(),destinationHandler, dimension,CmiHandlerToInfo(destinationHandler).hdlr);
89   if (destinationHandler) {
90     CmiSetHandler(msg, destinationHandler);
91     CmiSyncSendAndFree(CkMyPe(), dimension, msg);
92   } else {
93     CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!");
94   }
95 }
96
97 PipelineStrategy::PipelineStrategy(int _pipeSize, Strategy *parent) : Strategy(), pipeSize(_pipeSize) {
98   if (parent) higherLevel = parent;
99   else higherLevel = this;
100   seqNumber = 0;
101   messageBuf = new CkQ<MessageHolder *>;
102   //if (!parent) propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
103   ComlibPrintf("init: %d (%x)\n",pipeSize,this);
104   //if (!parent) ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),propagateHandle_frag);
105 }
106
107 void PipelineStrategy::insertMessage(MessageHolder *cmsg){
108   ComlibPrintf("[%d] Pipelined Broadcast with converse strategy\n",CkMyPe());
109   messageBuf->enq(cmsg);
110   doneInserting();
111 }
112
113 void PipelineStrategy::doneInserting(){
114   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
115   while (!messageBuf->isEmpty()) {
116     MessageHolder *cmsg = messageBuf->deq();
117     // modify the Handler to deliver the message to the propagator
118     char *env = cmsg->getMessage();
119     //CmiSetHandler(env, deliverHandle);
120     conversePipeline(env, cmsg->getSize(), cmsg->dest_proc);
121     delete cmsg;
122     //conversePipeline(env, env->getTotalsize(), false);
123   }
124 }
125
126 // routine for interfacing with converse.
127 // Require only the converse reserved header if forceSplit is true
128 void PipelineStrategy::conversePipeline(char *env, int totalSize, int destination) {
129   // set the instance ID to be used by the receiver using the XHandler variable
130   CmiSetXHandler(env, myInstanceID);
131
132   ++seqNumber;
133   // message doesn't fit into the pipe: split it into chunks and propagate them individually
134   ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),totalSize);
135
136   char *sendingMsg;
137   char *nextChunk = env;//+CmiReservedHeaderSize;
138   int remaining = totalSize;//-CmiReservedHeaderSize;
139   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo);
140   int sendingMsgSize;
141   ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipelineInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipelineInfo));
142   ComlibPrintf("sending %d chunks of size %d, total=%d to handle %d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining,deliverHandle);
143   //CmiSetHandler(env, deliverHandle);
144   ComlibPrintf("setting env handler to %d\n",deliverHandle);
145   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
146     sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipelineInfo);
147     sendingMsg = (char*)CmiAlloc(sendingMsgSize);
148     //memcpy (sendingMsg, env, CmiReservedHeaderSize);
149     CmiSetHandler(sendingMsg, deliverHandle);
150     PipelineInfo *info = (PipelineInfo*)(sendingMsg+CmiReservedHeaderSize);
151     info->srcPe = CkMyPe();
152     info->bcastPe = CkMyPe();
153     info->seqNumber = seqNumber;
154     info->chunkNumber = i;
155     info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
156     info->messageSize = totalSize;
157     memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipelineInfo), nextChunk, info->chunkSize);
158
159     remaining -= info->chunkSize;
160     nextChunk += info->chunkSize;
161
162     //propagate(sendingMsg, true, CkMyPe(), sendingMsgSize, NULL);
163     CmiSyncSendAndFree(destination, sendingMsgSize, sendingMsg);
164   }
165   CmiFree(env);
166 }
167
168 void PipelineStrategy::pup(PUP::er &p){
169   Strategy::pup(p);
170   ComlibPrintf("[%d] initial of Pipeconverse pup %s\n",CkMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
171
172   p | pipeSize;
173   p | seqNumber;
174
175   ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize);
176
177   if (p.isUnpacking()) {
178     //log_of_2_inv = 1/log((double)2);
179     messageBuf = new CkQ<MessageHolder *>;
180     deliverHandle = CmiRegisterHandler((CmiHandler)deliver_handler);
181     //propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
182     //ComlibPrintf("[%d] registered handler to %d\n",CkMyPe(),deliverHandle);
183   }
184   if (p.isPacking()) {
185     delete messageBuf;
186   }
187   //p|(*messageBuf);
188   //p|fragments;
189
190 }
191
// Expands the PUPable_def macro for PipelineStrategy.
// NOTE(review): presumably this emits the PUP registration glue that lets the
// class be packed/unpacked polymorphically - confirm against the PUP framework docs.
PUPable_def(PipelineStrategy);