Charj: Construct Range when a range is given.
[charm.git] / src / langs / pvmc / pvmc_comm.c
1 #include <stddef.h>
2 #include "converse.h"
3 #include "pvmc.h"
4
5 CpvStaticDeclare(CmmTable,seq_table);
6
7 CpvStaticDeclare(int,pvmc_control_handler);
8 CpvStaticDeclare(int,pvmc_msg_handler);
9 CpvStaticDeclare(int*,send_seq_num);
10 CpvStaticDeclare(int*,recv_seq_num);
11
12 CpvExtern(int,pvmc_barrier_num);
13 CpvExtern(int,pvmc_at_barrier_num);
14
15 typedef struct msg_hdr_struct {
16   char handler[CmiMsgHeaderSizeBytes];
17   int sender;
18   unsigned int seq_num;
19 } msg_hdr;
20
21 typedef struct control_msg_struct {
22   char handler[CmiMsgHeaderSizeBytes];
23   int type;
24 } control_msg;
25
26 static void pvmc_control_handler_func();
27 static void pvmc_msg_handler_func();
28
29 void pvmc_init_comm(void)
30 {
31   int i;
32
33 #ifdef PVM_DEBUG
34   PRINTF("Pe(%d) tid=%d:pvm_init_comm()\n",MYPE(),pvm_mytid());
35 #endif
36
37   CpvInitialize(CmmTable,seq_table);
38   CpvAccess(seq_table) = CmmNew();
39
40   CpvInitialize(int,pvmc_control_handler);
41   CpvAccess(pvmc_control_handler)=
42     CmiRegisterHandler(pvmc_control_handler_func);
43   
44   CpvInitialize(int,pvmc_msg_handler);
45   CpvAccess(pvmc_msg_handler)=CmiRegisterHandler(pvmc_msg_handler_func);
46
47   CpvInitialize(int*,recv_seq_num);
48   CpvAccess(recv_seq_num)=MALLOC(CmiNumPes()*sizeof(int));
49
50   if (CpvAccess(recv_seq_num)==NULL) {
51     PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_comm() can't allocate seq buffer\n",
52            MYPE(),pvm_mytid(),__FILE__,__LINE__);
53     exit(1);
54   }
55   for(i=0; i<CmiNumPes(); i++)
56     CpvAccess(recv_seq_num)=0;
57
58   CpvInitialize(int*,send_seq_num);
59   CpvAccess(send_seq_num)=MALLOC(CmiNumPes()*sizeof(int));
60
61   if (CpvAccess(send_seq_num)==NULL) {
62     PRINTF("Pe(%d) tid=%d:%s:%d pvmc_init_comm() can't allocate seq buffer\n",
63            MYPE(),pvm_mytid(),__FILE__,__LINE__);
64     exit(1);
65   }
66   for(i=0; i<CmiNumPes(); i++)
67     CpvAccess(send_seq_num)[i]=0;
68
69 }
70
71 void pvmc_send_control_msg(int type, int pe)
72 {
73   control_msg *msg;
74
75   msg=CmiAlloc(sizeof(control_msg));
76   msg->type=type;
77   CmiSetHandler(msg,CpvAccess(pvmc_control_handler));
78   CmiSyncSendAndFree(pe,sizeof(control_msg),msg);
79 }
80
81 static void pvmc_control_handler_func(control_msg *msg)
82 {
83   switch (msg->type)  {
84   case PVMC_CTRL_AT_BARRIER:
85     CpvAccess(pvmc_at_barrier_num)++;
86     break;
87   case PVMC_CTRL_THROUGH_BARRIER:
88     CpvAccess(pvmc_barrier_num)++;
89     break;
90   case PVMC_CTRL_KILL:
91     ConverseExit();
92     exit(0);
93     break;
94   default:
95     PRINTF("WARNING: %s:%d, Illegal control message\n",__FILE__,__LINE__);
96   }
97 }
98
99 static void pvmc_msg_handler_func(void *msg)
100 {
101   int seq_num;
102   int sender;
103   int pvm_tag;
104   int tags[2];
105   int rtags[2];
106
107   sender=((msg_hdr *)msg)->sender;
108   seq_num=((msg_hdr *)msg)->seq_num;
109   
110   tags[0]=sender;
111   tags[1]=pvmc_gettag((char *)msg+sizeof(msg_hdr));
112   CmmPut(CpvAccess(seq_table),2,tags,msg);
113 }
114
115 int pvm_kill(int tid)
116 {
117   control_msg *exit_msg;
118   
119 #ifdef PVM_DEBUG
120   PRINTF("Pe(%d) tid=%d:pvm_kill(%d)\n",
121         MYPE(),pvm_mytid(),tid);
122 #endif
123   pvmc_send_control_msg(PVMC_CTRL_KILL,TID2PE(tid));
124 }
125
126 int pvm_send(int pvm_tid, int tag)
127 {
128   void *msg;
129   int msg_sz, conv_tid, conv_tag;
130   
131 #ifdef PVM_DEBUG
132   PRINTF("Pe(%d) tid=%d:pvm_send(%d,%d)\n",
133         MYPE(),pvm_mytid(),pvm_tid,tag);
134 #endif
135   pvmc_settidtag(pvm_mytid(),tag);
136
137   if ((pvm_tid<1) || ( pvm_tid > CmiNumPes())) {
138     PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() illegal tid %d\n",
139            MYPE(),pvm_mytid(),__FILE__,__LINE__,pvm_tid);
140     return -1;
141   } else conv_tid = pvm_tid-1;
142
143   if (tag<0) {
144     PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() illegal tag\n",
145            MYPE(),pvm_mytid(),__FILE__,__LINE__);
146     return -1;
147   } else conv_tag = tag;
148
149   msg_sz = sizeof(msg_hdr)+pvmc_sendmsgsz();
150   msg = CmiAlloc(msg_sz);
151
152   if (msg==NULL) {
153     PRINTF("Pe(%d) tid=%d:%s:%d pvm_send() can't alloc msg buffer\n",
154            MYPE(),pvm_mytid(),__FILE__,__LINE__);
155     return -1;
156   }
157
158   CmiSetHandler(msg,CpvAccess(pvmc_msg_handler));
159   ((msg_hdr *)msg)->sender=MYPE();
160   ((msg_hdr *)msg)->seq_num=CpvAccess(send_seq_num)[conv_tid];
161   CpvAccess(send_seq_num)[conv_tid]++;
162   
163   pvmc_packmsg((char *)msg + (int)sizeof(msg_hdr));
164   CmiSyncSendAndFree(conv_tid,msg_sz,msg);
165   return 0;
166 }
167
168 int pvm_mcast(int *tids, int ntask, int msgtag)
169 {
170   int i;
171
172 #ifdef PVM_DEBUG
173   PRINTF("Pe(%d) tid=%d:pvm_mcast(%x,%d,%d)\n",
174         MYPE(),pvm_mytid(),tids,ntask,msgtag);
175 #endif
176   for(i=0;i<ntask;i++)
177     pvm_send(tids[i],msgtag);
178 }
179
180 int pvm_nrecv(int tid, int tag)
181 {
182   int conv_tid, conv_tag;
183   void *msg;
184   int tags[2];
185   int rtags[2];
186   int sender, seq_num;
187   int rbuf;
188
189 #ifdef PVM_DEBUG
190   PRINTF("Pe(%d) tid=%d:pvm_nrecv(%d,%d)\n",
191         MYPE(),pvm_mytid(),tid,tag);
192 #endif
193
194   if (tid==-1)
195     conv_tid=CmmWildCard;
196   else conv_tid=tid-1;
197
198   if (tag==-1)
199     conv_tag=CmmWildCard;
200   else conv_tag=tag;
201
202   /*
203    * Empty messages from machine layer.
204    */
205
206   while(CmiDeliverMsgs(1)==0)
207     ;
208
209   /*
210    *  See if the message is already in the tag table and extract it.
211    */
212   
213   tags[0]=conv_tid;
214   tags[1]=conv_tag;
215   msg=CmmGet(CpvAccess(seq_table),2,tags,rtags);
216   if (msg!=NULL) {
217     sender = rtags[0];
218     /*
219     seq_num = CpvAccess(recv_seq_num)[sender];
220
221     if ((((msg_hdr *)msg)->seq_num) != seq_num)
222       PRINTF("tid=%d:%s:%d pvm_recv() seq number mismatch, I'm confused\n",
223              tid,__FILE__,__LINE__);
224     else CpvAccess(recv_seq_num)[sender]++;
225     */
226
227
228     rbuf=pvm_setrbuf(pvm_mkbuf(PvmDataRaw));
229     if (rbuf > 0)
230       {
231 #ifdef PVM_DEBUG
232       PRINTF("Pe(%d) tid=%d:%s:%d pvm_nrecv() says pvm_setrbuf=%d\n",
233         MYPE(),tid,__FILE__,__LINE__,rbuf);
234 #endif
235       pvm_freebuf(rbuf);
236       }
237     pvmc_unpackmsg(msg,(char *)msg+sizeof(msg_hdr));
238
239 #ifdef PVM_DEBUG
240     PRINTF("Pe(%d) tid=%d:%s:%d pvm_nrecv() returning pvm_getrbuf()=%d\n",
241         MYPE(),tid,__FILE__,__LINE__,pvm_getrbuf());
242 #endif
243     return pvm_getrbuf();
244   }
245   else return 0;  /* Non blocking receive returns immediately. */
246 }
247
248 int pvm_recv(int tid, int tag)
249 {
250   int bufid=0;
251
252 #ifdef PVM_DEBUG
253   PRINTF("Pe(%d) tid=%d:pvm_recv(%d,%d)\n",
254         MYPE(),pvm_mytid(),tid,tag);
255 #endif
256   while (bufid==0)
257     bufid=pvm_nrecv(tid,tag);
258
259 #ifdef PVM_DEBUG
260   PRINTF("Pe(%d) tid=%d:pvm_recv(%d,%d) returning %d\n",
261         MYPE(),pvm_mytid(),tid,tag,bufid);
262 #endif
263
264   return bufid;
265 }
266
267 int pvm_probe(int tid, int tag)
268 {
269   int conv_tid, conv_tag;
270   void *msg;
271   int tags[2];
272   int rtags[2];
273   int sender, seq_num;
274
275 #ifdef PVM_DEBUG
276   PRINTF("Pe(%d) tid=%d:pvm_probe(%d,%d)\n",
277         MYPE(),pvm_mytid(),tid,tag);
278 #endif
279   if (tid==-1)
280     conv_tid=CmmWildCard;
281   else conv_tid=tid;
282
283   if (tag==-1)
284     conv_tag=CmmWildCard;
285   else conv_tag=tag;
286
287   /*
288    * Empty messages from machine layer.
289    */
290
291   while(CmiDeliverMsgs(1)==0)
292     ;
293
294   /*
295    *  See if the message is already in the tag table
296    */
297   
298   tags[0]=conv_tid;
299   tags[1]=conv_tag;
300   msg=CmmProbe(CpvAccess(seq_table),2,tags,rtags);
301   if (msg!=NULL) {
302     /*
303     sender = rtag[0];
304     seq_num = CpvAccess(recv_seq_num)[sender];
305
306     if ((((msg_hdr *)msg)->seq_num) != seq_num)
307       PRINTF("Pe(%d) tid=%d:%s:%d pvm_recv() seq num mismatch, I'm confused\n",
308              MYPE(),pvm_mytid(),__FILE__,__LINE__);
309     else CpvAccess(recv_seq_num)[sender]++;
310     */
311
312   /*
313    * We will just unpack the message, so bufinfo works, but this
314    * should really just set up what bufinfo needs and unpack the
315    * rest later
316    */
317     pvmc_unpackmsg(msg,(char *)msg+sizeof(msg_hdr));
318
319
320 #ifdef PVM_DEBUG
321     PRINTF("Pe(%d) tid=%d:%s:%d pvm_probe() returning pvm_getrbuf()=%d\n",
322         MYPE(),tid,__FILE__,__LINE__,pvm_getrbuf());
323 #endif
324     return pvm_getrbuf();
325   }
326   else return 0;  /* Probe returns immediately. */
327 }