439b2c0b3e6e49386fad7cb34a5a96212b5a8834
[charm.git] / src / langs / simplemsg / simplemsg.c
1 #include <converse.h>
2 #include "simplemsg.h"
3
4 typedef struct CsmMessageStruct *CsmMessage;
5
6 struct CsmMessageStruct
7 {
8   char cmiheader[CmiMsgHeaderSizeBytes];
9   int seqno, size, ntags;
10   int tags[1];
11 };
12
13 CpvStaticDeclare(int, CsmHandlerIndex);
14 CpvStaticDeclare(int *, CsmSeqOut);
15 CpvStaticDeclare(int *, CsmSeqIn);
16 CpvStaticDeclare(CmmTable, CsmMessages);
17 CpvStaticDeclare(CmmTable, CsmSleepers);
18
19 void CsmHandler(m)
20 CsmMessage m;
21 {
22   CthThread t;
23   CmiGrabBuffer(&m);
24   CmmPut(CpvAccess(CsmMessages), m->ntags, m->tags, m);
25   t = (CthThread)CmmGet(CpvAccess(CsmSleepers), m->ntags, m->tags, (int *)0);
26   if (t) CthAwaken(t);
27 }
28
29 void CsmInit(argv)
30 char **argv;
31 {
32   int *seqout, *seqin; int i;
33
34   seqout = (int *)CmiAlloc(CmiNumPes()*sizeof(int));
35   seqin  = (int *)CmiAlloc(CmiNumPes()*sizeof(int));
36   for (i=0; i<CmiNumPes(); i++) seqout[i] = 0;
37   for (i=0; i<CmiNumPes(); i++) seqin [i] = 0;
38
39   CpvInitialize(int, CsmHandlerIndex);
40   CpvInitialize(int *, CsmSeqOut);
41   CpvInitialize(int *, CsmSeqIn);
42   CpvInitialize(CmmTable, CsmMessages);
43   CpvInitialize(CmmTable, CsmSleepers);
44
45   CpvAccess(CsmHandlerIndex) = CmiRegisterHandler(CsmHandler);
46   CpvAccess(CsmSeqOut) = seqout;
47   CpvAccess(CsmSeqIn)  = seqin;
48   CpvAccess(CsmMessages) = CmmNew();
49   CpvAccess(CsmSleepers) = CmmNew();
50 }
51
52 void CsmTVSend(pe, ntags, tags, buffer, buflen)
53 int pe, ntags;
54 int *tags;
55 void *buffer;
56 int buflen;
57 {
58   int headsize, totsize, i; CsmMessage msg;
59
60   headsize = sizeof(struct CsmMessageStruct) + (ntags*sizeof(int));
61   headsize = ((headsize + 7) & (~7));
62   totsize = headsize + buflen;
63   msg = (CsmMessage)CmiAlloc(totsize);
64   CmiSetHandler(msg, CpvAccess(CsmHandlerIndex));
65   msg->seqno = (CpvAccess(CsmSeqOut)[pe])++;
66   msg->size = buflen;
67   msg->ntags = ntags;
68   for (i=0; i<ntags; i++) msg->tags[i] = tags[i];
69   memcpy((((char *)msg)+headsize), buffer, buflen);
70   CmiSyncSend(pe, totsize, msg);
71 }
72
73 void CsmTVRecv(ntags, tags, buffer, buflen, rtags)
74 int ntags;
75 int *tags;
76 void *buffer;
77 int buflen;
78 int *rtags;
79 {
80   CsmMessage msg; CthThread self;
81   int headsize;
82
83   while (1) {  
84     msg = (CsmMessage)CmmGet(CpvAccess(CsmMessages), ntags, tags, rtags);
85     if (msg) break;
86     self = CthSelf();
87     CmmPut(CpvAccess(CsmSleepers), ntags, tags, self);
88     CthSuspend();
89   }
90   
91   if (msg->size > buflen) buflen = msg->size;
92   headsize = sizeof(struct CsmMessageStruct) + ((ntags-1)*sizeof(int));
93   headsize = ((headsize + 7) & (~7));
94   memcpy(buffer, ((char *)msg)+headsize, buflen);
95   CmiFree(msg);
96   return;
97 }
98