Added Message Streams to Common/langs
authorMilind Bhandarkar <milind@cs.uiuc.edu>
Thu, 16 Oct 1997 17:34:51 +0000 (17:34 +0000)
committerMilind Bhandarkar <milind@cs.uiuc.edu>
Thu, 16 Oct 1997 17:34:51 +0000 (17:34 +0000)
src/langs/streams/Communicate.C [new file with mode: 0644]
src/langs/streams/Communicate.h [new file with mode: 0644]
src/langs/streams/MStream.C [new file with mode: 0644]
src/langs/streams/MStream.h [new file with mode: 0644]

diff --git a/src/langs/streams/Communicate.C b/src/langs/streams/Communicate.C
new file mode 100644 (file)
index 0000000..f8bd3a4
--- /dev/null
@@ -0,0 +1,74 @@
+/***************************************************************************/
+/*         (C) Copyright 1995,1996,1997 The Board of Trustees of the       */
+/*                          University of Illinois                         */
+/*                           All Rights Reserved                           */
+/***************************************************************************/
+
+#include <string.h>
+#include "Communicate.h"
+#include "MStream.h"
+
+CpvStaticDeclare(CmmTable, CsmMessages);
+
+static void CsmHandler(void *msg)
+{
+  CmiGrabBuffer((char **)&msg);
+  // get start of user message
+  int *m = (int *) ((char *)msg+CmiMsgHeaderSizeBytes);
+  // sending node  & tag act as tags
+  CmmPut(CpvAccess(CsmMessages), 2, m, msg);
+}
+
+Communicate::Communicate(void) 
+{
+  CpvInitialize(CmmTable, CsmMessages);
+  CsmHandlerIndex = CmiRegisterHandler((CmiHandler) CsmHandler);
+  CpvAccess(CsmMessages) = CmmNew();
+}
+
+
+Communicate::~Communicate(void) 
+{
+  // do nothing
+}
+
+MIStream *Communicate::newInputStream(int PE, int tag)
+{
+  MIStream *st = new MIStream(this, PE, tag);
+  return st;
+}
+
+MOStream *Communicate::newOutputStream(int PE, int tag, unsigned int bufSize)
+{
+  MOStream *st = new MOStream(this, PE, tag, bufSize);
+  return st;
+}
+
+void *Communicate::getMessage(int PE, int tag)
+{
+  int itag[2], rtag[2];
+  void *msg;
+
+  itag[0] = (PE==(-1)) ? (CmmWildCard) : PE;
+  itag[1] = (tag==(-1)) ? (CmmWildCard) : tag;
+  while((msg=CmmGet(CpvAccess(CsmMessages),2,itag,rtag))==0) {
+    CmiDeliverMsgs(0);
+  }
+  return msg;
+}
+
+void Communicate::sendMessage(int PE, void *msg, int size)
+{
+  CmiSetHandler(msg, CsmHandlerIndex);
+  switch(PE) {
+    case ALL:
+      CmiSyncBroadcastAll(size, (char *)msg);
+      break;
+    case ALLBUTME:
+      CmiSyncBroadcast(size, (char *)msg);
+      break;
+    default:
+      CmiSyncSend(PE, size, (char *)msg);
+      break;
+  }
+}
diff --git a/src/langs/streams/Communicate.h b/src/langs/streams/Communicate.h
new file mode 100644 (file)
index 0000000..4b4bf5b
--- /dev/null
@@ -0,0 +1,40 @@
+//-*-c++-*-
+/***************************************************************************/
+/*       (C) Copyright 1995,1996,1997 The Board of Trustees of the         */
+/*                          University of Illinois                         */
+/*                           All Rights Reserved                           */
+/***************************************************************************/
+
+#ifndef COMMUNICATE_H
+#define COMMUNICATE_H
+
+extern "C" {
+#include "converse.h"
+extern void CmiGrabBuffer(char **msg);
+}
+
+class MIStream;
+class MOStream;
+
+#define ALL      -1
+#define ALLBUTME -2
+#define BUFSIZE  4096
+#define ANY      -1
+
+class Communicate {
+
+private:
+  int CsmHandlerIndex;
+
+public:
+  Communicate(void);
+  ~Communicate();
+  MIStream *newInputStream(int pe, int tag);
+  MOStream *newOutputStream(int pe, int tag, unsigned int bufsize);
+  void *getMessage(int PE, int tag);
+  void sendMessage(int PE, void *msg, int size);
+};
+
+#include "MStream.h"
+
+#endif
diff --git a/src/langs/streams/MStream.C b/src/langs/streams/MStream.C
new file mode 100644 (file)
index 0000000..7e90e97
--- /dev/null
@@ -0,0 +1,92 @@
+#include "Communicate.h"
+#include "MStream.h"
+#include <string.h>
+
+MIStream::MIStream(Communicate *c, int p, int t)
+{
+  cobj = c;
+  PE = p;
+  tag = t;
+  msg = (StreamMessage *) 0;
+}
+
+MIStream::~MIStream()
+{
+  if(msg!=0)
+    CmiFree(msg);
+}
+
+MOStream::MOStream(Communicate *c, int p, int t, unsigned int size)
+{
+  cobj = c;
+  PE = p;
+  tag = t;
+  bufLen = size;
+  msgBuf = (StreamMessage *)CmiAlloc(sizeof(StreamMessage)+size);
+  msgBuf->PE = CmiMyPe();
+  msgBuf->tag = tag;
+  msgBuf->len = 0;
+  msgBuf->isLast = 0;
+}
+
+MOStream::~MOStream()
+{
+  if(msgBuf != 0)
+    end();
+}
+
+MIStream *MIStream::Get(char *buf, int len)
+{
+  while(len) {
+    if(msg==0) {
+      msg = (StreamMessage *) cobj->getMessage(PE, tag);
+      currentPos = 0;
+    }
+    if(currentPos+len <= msg->len) {
+      memcpy(buf, &(msg->data[currentPos]), len);
+      currentPos += len;
+      len = 0;
+    } else {
+      int b = msg->len-currentPos;
+      memcpy(buf, &(msg->data[currentPos]), b);
+      len -= b;
+      buf += b;
+      currentPos += b;
+    }
+    if(currentPos == msg->len) {
+      CmiFree(msg);
+      msg = 0;
+    }
+  }
+  return this;
+}
+
+MOStream *MOStream::Put(char *buf, int len)
+{
+  while(len) {
+    if(msgBuf->len + len <= bufLen) {
+      memcpy(&(msgBuf->data[msgBuf->len]), buf, len);
+      msgBuf->len += len;
+      len = 0;
+    } else {
+      int b = bufLen - msgBuf->len;
+      memcpy(&(msgBuf->data[msgBuf->len]), buf, b);
+      msgBuf->len = bufLen;
+      cobj->sendMessage(PE, (void *)msgBuf, bufLen+sizeof(StreamMessage));
+      msgBuf->len = 0;
+      msgBuf->isLast = 0;
+      len -= b;
+      buf += b;
+    }
+  }
+  return this;
+}
+
+void MOStream::end(void)
+{
+  msgBuf->isLast = 1;
+  cobj->sendMessage(PE,(void*)msgBuf,msgBuf->len+sizeof(StreamMessage));
+  msgBuf->len = 0;
+  msgBuf->isLast = 0;
+}
+
diff --git a/src/langs/streams/MStream.h b/src/langs/streams/MStream.h
new file mode 100644 (file)
index 0000000..97fbd63
--- /dev/null
@@ -0,0 +1,160 @@
+#ifndef MSTREAM_H
+#define MSTREAM_H
+
+struct StreamMessage {
+  char header[CmiMsgHeaderSizeBytes];
+  int PE;
+  int tag;
+  unsigned short len; // sizeof the data 
+  unsigned short isLast; // 1 if its last packet
+  char data[1];
+};
+
+class Communicate;
+
+class MIStream {
+  private:
+    int PE, tag;
+    StreamMessage *msg;
+    int currentPos;
+    Communicate *cobj;
+    MIStream *Get(char *buf, int len);  // get len bytes from message to buf
+  public:
+    MIStream(Communicate *c, int pe, int tag);
+    ~MIStream();
+    MIStream *get(char &data) { 
+      return Get(&data,sizeof(char)); 
+    }
+    MIStream *get(unsigned char &data) { 
+      return Get((char *)&data,sizeof(unsigned char)); 
+    }
+    MIStream *get(short &data) { 
+      return Get((char *)&data, sizeof(short)); 
+    }
+    MIStream *get(unsigned short &data) { 
+      return Get((char *)&data, sizeof(unsigned short)); 
+    }
+    MIStream *get(int &data) { 
+      return Get((char *)&data, sizeof(int)); 
+    }
+    MIStream *get(unsigned int &data) { 
+      return Get((char *)&data, sizeof(unsigned int)); 
+    }
+    MIStream *get(long &data) { 
+      return Get((char *)&data, sizeof(long)); 
+    }
+    MIStream *get(unsigned long &data) { 
+      return Get((char *)&data, sizeof(unsigned long)); 
+    }
+    MIStream *get(float &data) { 
+      return Get((char *)&data, sizeof(float)); 
+    }
+    MIStream *get(double &data) { 
+      return Get((char *)&data, sizeof(double)); 
+    }
+    MIStream *get(int len, char *data) { 
+      return Get(data,len*sizeof(char)); 
+    }
+    MIStream *get(int len, unsigned char *data) { 
+      return Get((char *)data,len*sizeof(unsigned char)); 
+    }
+    MIStream *get(int len, short *data) { 
+      return Get((char *)data,len*sizeof(short)); 
+    }
+    MIStream *get(int len, unsigned short *data) { 
+      return Get((char *)data,len*sizeof(unsigned short)); 
+    }
+    MIStream *get(int len, int *data) { 
+      return Get((char *)data,len*sizeof(int)); 
+    }
+    MIStream *get(int len, unsigned int *data) { 
+      return Get((char *)data,len*sizeof(unsigned int)); 
+    }
+    MIStream *get(int len, long *data) { 
+      return Get((char *)data,len*sizeof(long)); 
+    }
+    MIStream *get(int len, unsigned long *data) { 
+      return Get((char *)data,len*sizeof(unsigned long)); 
+    }
+    MIStream *get(int len, float *data) { 
+      return Get((char *)data,len*sizeof(float)); 
+    }
+    MIStream *get(int len, double *data) { 
+      return Get((char *)data,len*sizeof(double)); 
+    }
+};
+
+class MOStream {
+  private:
+    int PE, tag;
+    unsigned int bufLen;
+    StreamMessage *msgBuf;
+    Communicate *cobj;
+    MOStream *Put(char *buf, int len);  // put len bytes from buf into message
+  public:
+    MOStream(Communicate *c, int pe, int tag, unsigned int bufSize);
+    ~MOStream();
+    void end(void);
+    MOStream *put(char data) { 
+      return Put(&data,sizeof(char)); 
+    }
+    MOStream *put(unsigned char data) { 
+      return Put((char *)&data,sizeof(unsigned char)); 
+    }
+    MOStream *put(short data) { 
+      return Put((char *)&data, sizeof(short)); 
+    }
+    MOStream *put(unsigned short data) { 
+      return Put((char *)&data, sizeof(unsigned short)); 
+    }
+    MOStream *put(int data) { 
+      return Put((char *)&data, sizeof(int)); 
+    }
+    MOStream *put(unsigned int data) { 
+      return Put((char *)&data, sizeof(unsigned int)); 
+    }
+    MOStream *put(long data) { 
+      return Put((char *)&data, sizeof(long)); 
+    }
+    MOStream *put(unsigned long data) { 
+      return Put((char *)&data, sizeof(unsigned long)); 
+    }
+    MOStream *put(float data) { 
+      return Put((char *)&data, sizeof(float)); 
+    }
+    MOStream *put(double data) { 
+      return Put((char *)&data, sizeof(double)); 
+    }
+    MOStream *put(int len, char *data) { 
+      return Put(data,len*sizeof(char)); 
+    }
+    MOStream *put(int len, unsigned char *data) { 
+      return Put((char *)data,len*sizeof(unsigned char)); 
+    }
+    MOStream *put(int len, short *data) { 
+      return Put((char *)data,len*sizeof(short)); 
+    }
+    MOStream *put(int len, unsigned short *data) { 
+      return Put((char *)data,len*sizeof(unsigned short)); 
+    }
+    MOStream *put(int len, int *data) { 
+      return Put((char *)data,len*sizeof(int)); 
+    }
+    MOStream *put(int len, unsigned int *data) { 
+      return Put((char *)data,len*sizeof(unsigned int)); 
+    }
+    MOStream *put(int len, long *data) { 
+      return Put((char *)data,len*sizeof(long)); 
+    }
+    MOStream *put(int len, unsigned long *data) { 
+      return Put((char *)data,len*sizeof(unsigned long)); 
+    }
+    MOStream *put(int len, float *data) { 
+      return Put((char *)data,len*sizeof(float)); 
+    }
+    MOStream *put(int len, double *data) { 
+      return Put((char *)data,len*sizeof(double)); 
+    }
+};
+
+#endif