cac63755e7ca86a12de04b7b474739f25f6c1a99
[charm.git] / src / libs / ck-libs / idxl / idxl.C
1 /**
2 IDXL--Index List communication library.
3
4 This is a low-level bare-bones communication library.
5 The basic primitive is an "Index List", a list of user
6 array entries to send and receive.  This basic communication
7 primitive is enough to represent (for example) FEM shared
8 nodes or an FEM ghost layer; the ghost cells in a stencil-based
9 CFD computation, etc.
10
11 Orion Sky Lawlor, olawlor@acm.org, 1/7/2003
12 */
13 #include "idxl.h"
14 #include "charm-api.h"
15
16 void IDXL_Abort(const char *callingRoutine,const char *msg,int m0,int m1,int m2)
17 {
18         char msg1[1024], msg2[1024];
19         sprintf(msg1,msg,m0,m1,m2);
20         sprintf(msg2,"Fatal error in IDXL routine %s:\n%s",callingRoutine,msg1);
21         CkAbort(msg2);
22 }
23
24 CDECL void pupIDXL_Chunk(pup_er cp) {
25         PUP::er &p=*(PUP::er *)cp;
26         IDXL_Chunk *c=(IDXL_Chunk *)TCHARM_Get_global(IDXL_globalID);
27         if (c==NULL) {
28                 c=new IDXL_Chunk((CkMigrateMessage *)0);
29                 TCHARM_Set_global(IDXL_globalID,c,pupIDXL_Chunk);
30         }
31         c->pup(p);
32         if (p.isDeleting()) {
33                 delete c;
34                 /// Zero out our global entry now that we're gone--
35                 ///   this prevents use-after-delete bugs from creeping in.
36                 TCHARM_Set_global(IDXL_globalID,0,pupIDXL_Chunk);
37         }
38 }
39 IDXL_Chunk *IDXL_Chunk::get(const char *callingRoutine) {
40         IDXL_Chunk *c=getNULL();
41         if(!c) IDXL_Abort(callingRoutine,"IDXL is not initialized");
42         return c;
43 }
44
45 CDECL void
46 IDXL_Init(int mpi_comm) {
47         if (!TCHARM_Get_global(IDXL_globalID)) {
48                 IDXL_Chunk *c=new IDXL_Chunk(mpi_comm);
49                 TCHARM_Set_global(IDXL_globalID,c,pupIDXL_Chunk);
50         }
51 }
52 FORTRAN_AS_C(IDXL_INIT,IDXL_Init,idxl_init,  (int *comm), (*comm))
53
54 IDXL_Chunk::IDXL_Chunk(int mpi_comm_) 
55         :mpi_comm(mpi_comm_), currentComm(0)
56 {
57         init();
58 }
59 IDXL_Chunk::IDXL_Chunk(CkMigrateMessage *m) :currentComm(0)
60 {
61         init();
62 }
63 void IDXL_Chunk::init(void) {
64         
65 }
66
67 void IDXL_Chunk::pup(PUP::er &p) {
68         p|mpi_comm;
69
70         p|layouts;
71         if (currentComm && !currentComm->isComplete()){
72           CkPrintf("ERROR: Cannot migrate with ongoing IDXL communication: currentComm=%p ispacking=%d isunpacking=%d\n", currentComm, (int)p.isPacking(), (int)p.isUnpacking());
73           CkAbort("Cannot migrate with ongoing IDXL communication");
74         }       
75
76         //Pack the dynamic IDXLs (static IDXLs must re-add themselves)
77         int i, nDynamic=0;
78         if (!p.isUnpacking()) //Count the number of non-NULL idxls:
79                 for (i=0;i<dynamic_idxls.size();i++) 
80                         if (dynamic_idxls[i]) nDynamic++;
81         p|nDynamic;
82         if (p.isUnpacking()) {
83                 for (int d=0;d<nDynamic;d++) //Loop over non-NULL IDXLs
84                 {
85                         p|i;
86                         dynamic_idxls[i]=new IDXL;
87                         p|*dynamic_idxls[i];
88                 }
89         } else /* packing */ {
90                 for (i=0;i<dynamic_idxls.size();i++) //Loop over non-NULL IDXLs
91                 if (dynamic_idxls[i]) {
92                         p|i;
93                         p|*dynamic_idxls[i];
94                 }
95         }
96 }
97
98 IDXL_Chunk::~IDXL_Chunk() {
99         // we do not delete static_idxls
100         for (int i=0;i<dynamic_idxls.size();i++) 
101                 if (dynamic_idxls[i]) delete dynamic_idxls[i];
102         delete currentComm;
103         currentComm = 0;
104 }
105
106 /****** IDXL List ******/
107 /// Dynamically create a new empty IDXL:
108 IDXL_t IDXL_Chunk::addDynamic(void) 
109 { //Pick the next free dynamic index:
110         IDXL *ret=new IDXL;
111         ret->allocateDual(); //FIXME: add way to allocate single, too
112         return IDXL_DYNAMIC_IDXL_T+storeToFreeIndex(dynamic_idxls,ret);
113 }
114 /// Register this statically-allocated IDXL at this existing index
115 IDXL_t IDXL_Chunk::addStatic(IDXL *idx,IDXL_t at) {
116         if (at!=-1) 
117         { //User provided a (previously returned) index-- try that first
118                 if (at<IDXL_STATIC_IDXL_T || at>=IDXL_LAST_IDXL_T)
119                         CkAbort("Provided bad fixed address to IDXL_Chunk::add!");
120                 int lat=at-IDXL_STATIC_IDXL_T;
121                 while (static_idxls.size()<=lat) static_idxls.push_back(NULL);
122                 if (static_idxls[lat]==NULL)
123                 { /* that slot is free-- re-use the fixed address */
124                         static_idxls[lat]=idx;
125                         return at;
126                 }
127                 else /* idxls[lat]!=NULL, somebody's already there-- fall through */
128                         at=-1;
129         }
130         if (at==-1) { //Pick the next free static index
131                 return IDXL_STATIC_IDXL_T+storeToFreeIndex(static_idxls,idx);
132         }
133         return -1; //<- for whining compilers
134 }
135
136 /// Check this IDXL for validity
137 void IDXL_Chunk::check(IDXL_t at,const char *callingRoutine) const {
138         if (at<IDXL_DYNAMIC_IDXL_T || at>=IDXL_LAST_IDXL_T)
139                         IDXL_Abort(callingRoutine,"Invalid IDXL_t %d",at);
140 }
141 IDXL &IDXL_Chunk::lookup(IDXL_t at,const char *callingRoutine) {
142         IDXL *ret=0;
143         if (at>=IDXL_DYNAMIC_IDXL_T && at<IDXL_DYNAMIC_IDXL_T+dynamic_idxls.size())
144                 ret=dynamic_idxls[at-IDXL_DYNAMIC_IDXL_T];
145         else if (at>=IDXL_STATIC_IDXL_T && at<IDXL_STATIC_IDXL_T+static_idxls.size())
146                 ret=static_idxls[at-IDXL_STATIC_IDXL_T];
147         if (ret==NULL) 
148                 IDXL_Abort(callingRoutine,"Trying to look up invalid IDXL_t %d",at);
149         return *ret;
150 }
151 const IDXL &IDXL_Chunk::lookup(IDXL_t at,const char *callingRoutine) const {
152         IDXL_Chunk *dthis=(IDXL_Chunk *)this; // Call non-const version
153         return dthis->lookup(at,callingRoutine);
154 }
155 void IDXL_Chunk::destroy(IDXL_t at,const char *callingRoutine) {
156         IDXL **ret=NULL;
157         if (at>=IDXL_DYNAMIC_IDXL_T && at<IDXL_DYNAMIC_IDXL_T+dynamic_idxls.size())
158                 ret=&dynamic_idxls[at-IDXL_DYNAMIC_IDXL_T];
159         else if (at>=IDXL_STATIC_IDXL_T && at<IDXL_STATIC_IDXL_T+static_idxls.size())
160                 ret=&static_idxls[at-IDXL_STATIC_IDXL_T];
161         if (ret==NULL)
162                 IDXL_Abort(callingRoutine,"Trying to destroy invalid IDXL_t %d",at);
163         if (*ret==NULL)
164                 IDXL_Abort(callingRoutine,"Trying to destroy already deleted IDXL_t %d",at);
165         if (at<IDXL_STATIC_IDXL_T)
166                 delete *ret; /* only destroy dynamically allocated IDXLs */
167         *ret=NULL;
168 }
169
170 /****** User Datatype list ******/
171
172 /// Get the currently active layout list.
173 IDXL_Layout_List &IDXL_Layout_List::get(void) 
174 {
175         return IDXL_Chunk::get("IDXL_Layouts::get")->layouts; 
176 }
177
178 /**** Messaging logic ***/
179
180 IDXL_Comm_t IDXL_Chunk::addComm(int tag,int context)
181 {
182         if (currentComm && !currentComm->isComplete()) CkAbort("Cannot start two IDXL_Comms at once");
183         if (context==0) context=mpi_comm;
184         if (currentComm==0) 
185                 currentComm=new IDXL_Comm(tag,context);
186         else 
187                 currentComm->reset(tag,context);
188         return 27; //FIXME: there's only one outstanding comm!
189 }
190 IDXL_Comm *IDXL_Chunk::lookupComm(IDXL_Comm_t uc,const char *callingRoutine)
191 {
192         if (uc!=27) CkAbort("Invalid idxl_comm id");
193         return currentComm;
194 }
195
196 IDXL_Comm::IDXL_Comm(int tag_,int context) {
197         reset(tag_,context);
198 }
199 void IDXL_Comm::reset(int tag_,int context) {
200         tag=tag_;
201         if (context==0) comm=MPI_COMM_WORLD;
202         else comm=(MPI_Comm)context; /* silly: not all MPI's use "int" for MPI_Comm */
203         sto.resize(0);
204         nMsgs=0;
205         /* don't delete the msg array, because we want to avoid
206            reallocating its message buffers whenever possible. */
207         msgReq.resize(0);
208         msgSts.resize(0);
209         
210         isPost=false;
211         isDone=false;
212 }
213 IDXL_Comm::~IDXL_Comm() {
214         for (int i=0;i<msg.size();i++)
215                 delete msg[i];
216 }
217
218
219 // prepare to write this field to the message:
220 void IDXL_Comm::send(const IDXL_Side *idx,const IDXL_Layout *dtype,const void *src)
221 {
222         if (isPost) CkAbort("Cannot call IDXL_Comm_send after IDXL_Comm_flush!");
223         sto.push_back(sto_t(idx,dtype,(void *)src,send_t)); 
224 }
225 void IDXL_Comm::recv(const IDXL_Side *idx,const IDXL_Layout *dtype,void *dest)
226
227         if (isPost) CkAbort("Cannot call IDXL_Comm_recv after IDXL_Comm_flush!");
228         sto.push_back(sto_t(idx,dtype,dest,recv_t)); 
229 }
230 void IDXL_Comm::sum(const IDXL_Side *idx,const IDXL_Layout *dtype,void *srcdest)
231
232         if (isPost) CkAbort("Cannot call IDXL_Comm_sum after IDXL_Comm_flush!");
233         sto.push_back(sto_t(idx,dtype,srcdest,sum_t)); 
234 }
235
236 void IDXL_Comm::post(void) {
237         if (isPost) CkAbort("Cannot post the same IDXL_Comm_t more than once");
238         isPost=true;
239         
240         //Post all our sends and receives:
241         nMsgs=0;
242         for (int s=0;s<sto.size();s++) {
243                 const IDXL_Side *idx=sto[s].idx;
244                 const IDXL_Layout *dtype=sto[s].dtype;
245                 for (int ll=0;ll<idx->size();ll++) {
246                         const IDXL_List &l=idx->getLocalList(ll);
247                         
248                         // Create message struct
249                         ++nMsgs;
250                         if (nMsgs>msg.size()) {
251                                 msg.resize(nMsgs);
252                                 msg[nMsgs-1]=new msg_t;
253                         }
254                         msg_t *m=msg[nMsgs-1];
255                         m->sto=&sto[s];
256                         m->ll=ll;
257                         
258                         // Allocate storage for message data
259                         int len=l.size()*dtype->compressedBytes();
260                         m->allocate(len);
261                         
262                         // Copy data and post MPI request
263                         MPI_Request req;
264                         switch (sto[s].op) {
265                         case send_t:
266                                 sto[s].dtype->gather(l.size(),l.getVec(),sto[s].data,m->getBuf());
267                                 MPI_Isend(m->getBuf(),len,MPI_BYTE,l.getDest(),tag,comm,&req);
268                                 break;
269                         case recv_t:case sum_t:
270                                 MPI_Irecv(m->getBuf(),len,MPI_BYTE,l.getDest(),tag,comm,&req);
271                                 break;
272                         };
273                         msgReq.push_back(req);
274                 }
275         }
276 }
277
278 void IDXL_Comm::wait(void) {
279         if (!isPosted()) post();
280         CkAssert(msg.size()>=nMsgs);
281         CkAssert(msgReq.size()==nMsgs);
282         if (nMsgs == 0) { isDone=true; return; }
283         msgSts.resize(nMsgs);
284         MPI_Waitall(nMsgs,&msgReq[0],&msgSts[0]);
285         //Process all received messages:
286         for (int im=0;im<nMsgs;im++) {
287                 msg_t *m=msg[im];
288                 sto_t *s=m->sto;
289                 const IDXL_List &l=s->idx->getLocalList(m->ll);
290                 switch (s->op) {
291                 case send_t: /* nothing else to do */
292                         break;
293                 case recv_t:
294                         s->dtype->scatter(l.size(),l.getVec(),m->getBuf(),s->data);
295                         break;
296                 case sum_t:
297                         s->dtype->scatteradd(l.size(),l.getVec(),m->getBuf(),s->data);
298                         break;
299                 };
300         }
301         isDone=true;
302 }
303
304 void IDXL_Chunk::waitComm(IDXL_Comm *comm)
305 {
306         comm->wait();
307 }
308