Merge branch 'charm' of charmgit:charm into charm
[charm.git] / src / langs / simplemsg / simplemsg.c
1 /*****************************************************************************
2  * $Source$
3  * $Author$
4  * $Date$
5  * $Revision$
6  *****************************************************************************/
7
8 #include <converse.h>
9 #include "simplemsg.h"
10
11 typedef struct CsmMessageStruct *CsmMessage;
12
13 struct CsmMessageStruct
14 {
15   char cmiheader[CmiMsgHeaderSizeBytes];
16   int seqno, size, ntags;
17   int tags[1];
18 };
19
20 CpvStaticDeclare(int, CsmHandlerIndex);
21 CpvStaticDeclare(int *, CsmSeqOut);
22 CpvStaticDeclare(int *, CsmSeqIn);
23 CpvStaticDeclare(CmmTable, CsmMessages);
24 CpvStaticDeclare(CmmTable, CsmSleepers);
25
26 void CsmHandler(m)
27 CsmMessage m;
28 {
29   CthThread t;
30   CmmPut(CpvAccess(CsmMessages), m->ntags, m->tags, m);
31   t = (CthThread)CmmGet(CpvAccess(CsmSleepers), m->ntags, m->tags, (int *)0);
32   if (t) CthAwaken(t);
33 }
34
35 void CsmInit(argv)
36 char **argv;
37 {
38   int *seqout, *seqin; int i;
39
40   seqout = (int *)CmiAlloc(CmiNumPes()*sizeof(int));
41   seqin  = (int *)CmiAlloc(CmiNumPes()*sizeof(int));
42   for (i=0; i<CmiNumPes(); i++) seqout[i] = 0;
43   for (i=0; i<CmiNumPes(); i++) seqin [i] = 0;
44
45   CpvInitialize(int, CsmHandlerIndex);
46   CpvInitialize(int *, CsmSeqOut);
47   CpvInitialize(int *, CsmSeqIn);
48   CpvInitialize(CmmTable, CsmMessages);
49   CpvInitialize(CmmTable, CsmSleepers);
50
51   CpvAccess(CsmHandlerIndex) = CmiRegisterHandler(CsmHandler);
52   CpvAccess(CsmSeqOut) = seqout;
53   CpvAccess(CsmSeqIn)  = seqin;
54   CpvAccess(CsmMessages) = CmmNew();
55   CpvAccess(CsmSleepers) = CmmNew();
56 }
57
58 void CsmTVSend(pe, ntags, tags, buffer, buflen)
59 int pe, ntags;
60 int *tags;
61 void *buffer;
62 int buflen;
63 {
64   int headsize, totsize, i; CsmMessage msg;
65
66   headsize = sizeof(struct CsmMessageStruct) + (ntags*sizeof(int));
67   headsize = ((headsize + 7) & (~7));
68   totsize = headsize + buflen;
69   msg = (CsmMessage)CmiAlloc(totsize);
70   CmiSetHandler(msg, CpvAccess(CsmHandlerIndex));
71   msg->seqno = (CpvAccess(CsmSeqOut)[pe])++;
72   msg->size = buflen;
73   msg->ntags = ntags;
74   for (i=0; i<ntags; i++) msg->tags[i] = tags[i];
75   memcpy((((char *)msg)+headsize), buffer, buflen);
76   CmiSyncSend(pe, totsize, msg);
77 }
78
79 void CsmTVRecv(ntags, tags, buffer, buflen, rtags)
80 int ntags;
81 int *tags;
82 void *buffer;
83 int buflen;
84 int *rtags;
85 {
86   CsmMessage msg; CthThread self;
87   int headsize;
88
89   while (1) {  
90     msg = (CsmMessage)CmmGet(CpvAccess(CsmMessages), ntags, tags, rtags);
91     if (msg) break;
92     self = CthSelf();
93     CmmPut(CpvAccess(CsmSleepers), ntags, tags, self);
94     CthSuspend();
95   }
96   
97   if (msg->size > buflen) buflen = msg->size;
98   headsize = sizeof(struct CsmMessageStruct) + ((ntags-1)*sizeof(int));
99   headsize = ((headsize + 7) & (~7));
100   memcpy(buffer, ((char *)msg)+headsize, buflen);
101   CmiFree(msg);
102   return;
103 }
104