charj: leanmd update
[charm.git] / src / langs / streams / Communicate.C
1 #include <string.h>
2 #include "Communicate.h"
3 #include "MStream.h"
4
5 CpvStaticDeclare(CmmTable, CsmMessages);
6
7 static void CsmHandler(void *msg)
8 {
9   // get start of user message
10   int *m = (int *) ((char *)msg+CmiMsgHeaderSizeBytes);
11   // sending node  & tag act as tags
12   CmmPut(CpvAccess(CsmMessages), 2, m, msg);
13 }
14
15 Communicate::Communicate(void) 
16 {
17   CpvInitialize(CmmTable, CsmMessages);
18   CsmHandlerIndex = CmiRegisterHandler((CmiHandler) CsmHandler);
19   CpvAccess(CsmMessages) = CmmNew();
20 }
21
22
23 Communicate::~Communicate(void) 
24 {
25   // do nothing
26 }
27
28 MIStream *Communicate::newInputStream(int PE, int tag)
29 {
30   MIStream *st = new MIStream(this, PE, tag);
31   return st;
32 }
33
34 MOStream *Communicate::newOutputStream(int PE, int tag, unsigned int bufSize)
35 {
36   MOStream *st = new MOStream(this, PE, tag, bufSize);
37   return st;
38 }
39
40 void *Communicate::getMessage(int PE, int tag)
41 {
42   int itag[2], rtag[2];
43   void *msg;
44
45   itag[0] = (PE==(-1)) ? (CmmWildCard) : PE;
46   itag[1] = (tag==(-1)) ? (CmmWildCard) : tag;
47   while((msg=CmmGet(CpvAccess(CsmMessages),2,itag,rtag))==0) {
48     CmiDeliverMsgs(0);
49   }
50   return msg;
51 }
52
53 void Communicate::sendMessage(int PE, void *msg, int size)
54 {
55   CmiSetHandler(msg, CsmHandlerIndex);
56   switch(PE) {
57     case ALL:
58       CmiSyncBroadcastAll(size, (char *)msg);
59       break;
60     case ALLBUTME:
61       CmiSyncBroadcast(size, (char *)msg);
62       break;
63     default:
64       CmiSyncSend(PE, size, (char *)msg);
65       break;
66   }
67 }