change Ckpv to Cpv in conv-com
[charm.git] / src / conv-com / pipelinestrategy.C
1 // #ifdef filippo
2
3 // #include <math.h>
4 // #include "pipelinestrategy.h"
5
6 // inline int log_of_2 (int i) {
7 //   int m;
8 //   for (m=0; i>(1<<m); ++m);
9 //   return m;
10 // }
11
12 // //PipelineHashKey CODE
13 // int PipelineHashKey::staticCompare(const void *k1,const void *k2,size_t ){
14 //     return ((const PipelineHashKey *)k1)->
15 //                 compare(*(const PipelineHashKey *)k2);
16 // }
17
18 // CkHashCode PipelineHashKey::staticHash(const void *v,size_t){
19 //     return ((const PipelineHashKey *)v)->hash();
20 // }
21
22 // void PipelineStrategy::commonInit(){
23 //   //log_of_2_inv = 1/log((double)2);
24 //   seqNumber = 0;
25 // }
26
27 // //extern void propagate_handler(void *);
28
29 // void deliver_handler(void *message) {
30 //   int instid = CmiGetXHandler(message);
31 //   PipelineStrategy *myStrategy = (PipelineStrategy*)ConvComlibGetStrategy(instid);
32 //   ComlibPrintf("[%d] propagate_handler_frag: calling on instid %d %x\n",CkMyPe(),instid,myStrategy);
33 //   //CProxy_ComlibManager(CkpvAccess(cmgrID)).ckLocalBranch()->getStrategy(instid);
34 //   PipelineInfo *info = (PipelineInfo*)(((char*)message)+CmiReservedHeaderSize);
35 //   myStrategy->storing((char*)message);
36 // }
37
38 // void PipelineStrategy::storing(char* fragment) {
39 //   char *complete;
40 //   int isFinished=0;
41 //   int totalDimension;
42 //   //ComlibPrintf("isArray = %d\n", (getType() == ARRAY_STRATEGY));
43
44 //   // store the fragment in the hash table until completed
45 //   ComlibPrintf("[%d] deliverer: received fragmented message, storing\n",CkMyPe());
46 //   PipelineInfo *info = (PipelineInfo*)(fragment+CmiReservedHeaderSize);
47
48 //   PipelineHashKey key (info->bcastPe, info->seqNumber);
49 //   PipelineHashObj *position = fragments.get(key);
50
51 //   char *incomingMsg;
52 //   if (position) {
53 //     // the message already exist, add to it
54 //     ComlibPrintf("[%d] adding to an existing message for id %d/%d (%d remaining)\n",CkMyPe(),info->bcastPe,info->seqNumber,position->remaining-1);
55 //     incomingMsg = position->message;
56 //     memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipelineInfo), info->chunkSize);
57
58 //     if (--position->remaining == 0) {  // message completely received
59 //       isFinished = 1;
60 //       complete = incomingMsg;
61 //       totalDimension = position->dimension;
62 //       // delete from the hash table
63 //       fragments.remove(key);
64 //     }
65
66 //   } else {
67 //     // the message doesn't exist, create it
68 //     ComlibPrintf("[%d] creating new message of size %d for id %d/%d; chunk=%d chunkSize=%d\n",CkMyPe(),info->messageSize,info->bcastPe,info->seqNumber,info->chunkNumber,info->chunkSize);
69 //     incomingMsg = (char*)CmiAlloc(info->messageSize);
70 //     memcpy (incomingMsg, fragment, CmiReservedHeaderSize);
71 //     memcpy (incomingMsg+CmiReservedHeaderSize+((pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo))*info->chunkNumber), fragment+CmiReservedHeaderSize+sizeof(PipelineInfo), info->chunkSize);
72 //     int remaining = (int)ceil((double)info->messageSize/(pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo)))-1;
73 //     if (remaining) {  // more than one chunk (it was not forced to be splitted)
74 //       PipelineHashObj *object = new PipelineHashObj(info->messageSize, remaining, incomingMsg);
75 //       fragments.put(key) = object;
76 //     } else {  // only one chunk, it was forces to be splitted
77 //       isFinished = 1;
78 //       complete = incomingMsg;
79 //       // nothing to delete from fragments since nothing has been added
80 //     }
81 //   }
82 //   CmiFree(fragment);
83
84 //   if (isFinished) {
85 //     higherLevel->deliverer(complete, totalDimension);
86 //   }
87 // }
88
89 // void PipelineStrategy::deliverer(char *msg, int dimension) {
90 //   ComlibPrintf("{%d} dest = %d, %d, %x\n",CkMyPe(),destinationHandler, dimension,CmiHandlerToInfo(destinationHandler).hdlr);
91 //   if (destinationHandler) {
92 //     CmiSetHandler(msg, destinationHandler);
93 //     CmiSyncSendAndFree(CkMyPe(), dimension, msg);
94 //   } else {
95 //     CmiPrintf("[%d] Pipelined Broadcast: message not delivered since destination not set!");
96 //   }
97 // }
98
99 // PipelineStrategy::PipelineStrategy(int _pipeSize, Strategy *parent) : Strategy(), pipeSize(_pipeSize) {
100 //   if (parent) higherLevel = parent;
101 //   else higherLevel = this;
102 //   seqNumber = 0;
103 //   messageBuf = new CkQ<MessageHolder *>;
104 //   //if (!parent) propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
105 //   ComlibPrintf("init: %d (%x)\n",pipeSize,this);
106 //   //if (!parent) ComlibPrintf("[%d] registered handler fragmented to %d\n",CkMyPe(),propagateHandle_frag);
107 // }
108
109 // void PipelineStrategy::insertMessage(MessageHolder *cmsg){
110 //   ComlibPrintf("[%d] Pipelined Broadcast with converse strategy\n",CkMyPe());
111 //   messageBuf->enq(cmsg);
112 //   doneInserting();
113 // }
114
115 // void PipelineStrategy::doneInserting(){
116 //   ComlibPrintf("[%d] DoneInserting\n",CkMyPe());
117 //   while (!messageBuf->isEmpty()) {
118 //     MessageHolder *cmsg = messageBuf->deq();
119 //     // modify the Handler to deliver the message to the propagator
120 //     char *env = cmsg->getMessage();
121 //     //CmiSetHandler(env, deliverHandle);
122 //     conversePipeline(env, cmsg->getSize(), cmsg->dest_proc);
123 //     delete cmsg;
124 //     //conversePipeline(env, env->getTotalsize(), false);
125 //   }
126 // }
127
128 // // routine for interfacing with converse.
129 // // Require only the converse reserved header if forceSplit is true
130 // void PipelineStrategy::conversePipeline(char *env, int totalSize, int destination) {
131 //   // set the instance ID to be used by the receiver using the XHandler variable
132 //   CmiSetXHandler(env, myInstanceID);
133
134 //   ++seqNumber;
135 //   // message doesn't fit into the pipe: split it into chunks and propagate them individually
136 //   ComlibPrintf("[%d] Propagating message in multiple chunks (totalsize=%d)\n",CkMyPe(),totalSize);
137
138 //   char *sendingMsg;
139 //   char *nextChunk = env;//+CmiReservedHeaderSize;
140 //   int remaining = totalSize;//-CmiReservedHeaderSize;
141 //   int reducedPipe = pipeSize-CmiReservedHeaderSize-sizeof(PipelineInfo);
142 //   int sendingMsgSize;
143 //   ComlibPrintf("reducedPipe = %d, CmiReservedHeaderSize = %d, sizeof(PipelineInfo) = %d\n",reducedPipe,CmiReservedHeaderSize,sizeof(PipelineInfo));
144 //   ComlibPrintf("sending %d chunks of size %d, total=%d to handle %d\n",(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe),reducedPipe,remaining,deliverHandle);
145 //   //CmiSetHandler(env, deliverHandle);
146 //   ComlibPrintf("setting env handler to %d\n",deliverHandle);
147 //   for (int i=0; i<(int)ceil(((double)totalSize-CmiReservedHeaderSize)/reducedPipe); ++i) {
148 //     sendingMsgSize = reducedPipe<remaining? pipeSize : remaining+CmiReservedHeaderSize+sizeof(PipelineInfo);
149 //     sendingMsg = (char*)CmiAlloc(sendingMsgSize);
150 //     //memcpy (sendingMsg, env, CmiReservedHeaderSize);
151 //     CmiSetHandler(sendingMsg, deliverHandle);
152 //     PipelineInfo *info = (PipelineInfo*)(sendingMsg+CmiReservedHeaderSize);
153 //     info->srcPe = CkMyPe();
154 //     info->bcastPe = CkMyPe();
155 //     info->seqNumber = seqNumber;
156 //     info->chunkNumber = i;
157 //     info->chunkSize = reducedPipe<remaining ? reducedPipe : remaining;
158 //     info->messageSize = totalSize;
159 //     memcpy (sendingMsg+CmiReservedHeaderSize+sizeof(PipelineInfo), nextChunk, info->chunkSize);
160
161 //     remaining -= info->chunkSize;
162 //     nextChunk += info->chunkSize;
163
164 //     //propagate(sendingMsg, true, CkMyPe(), sendingMsgSize, NULL);
165 //     CmiSyncSendAndFree(destination, sendingMsgSize, sendingMsg);
166 //   }
167 //   CmiFree(env);
168 // }
169
170 // void PipelineStrategy::pup(PUP::er &p){
171 //   Strategy::pup(p);
172 //   ComlibPrintf("[%d] initial of Pipeconverse pup %s\n",CkMyPe(),(p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"));
173
174 //   p | pipeSize;
175 //   p | seqNumber;
176
177 //   ComlibPrintf("[%d] PipeBroadcast converse pupping %s, size=%d\n",CkMyPe(), (p.isPacking()==0)?(p.isUnpacking()?"UnPacking":"sizer"):("Packing"),pipeSize);
178
179 //   if (p.isUnpacking()) {
180 //     //log_of_2_inv = 1/log((double)2);
181 //     messageBuf = new CkQ<MessageHolder *>;
182 //     deliverHandle = CmiRegisterHandler((CmiHandler)deliver_handler);
183 //     //propagateHandle_frag = CmiRegisterHandler((CmiHandler)propagate_handler_frag);
184 //     //ComlibPrintf("[%d] registered handler to %d\n",CkMyPe(),deliverHandle);
185 //   }
186 //   if (p.isPacking()) {
187 //     delete messageBuf;
188 //   }
189 //   //p|(*messageBuf);
190 //   //p|fragments;
191
192 // }
193
194 // PUPable_def(PipelineStrategy);
195
196 // #endif