Fixed converse master-slave library and the test program.
authorMilind Bhandarkar <milind@cs.uiuc.edu>
Mon, 18 Dec 2000 02:31:38 +0000 (02:31 +0000)
committerMilind Bhandarkar <milind@cs.uiuc.edu>
Mon, 18 Dec 2000 02:31:38 +0000 (02:31 +0000)
src/libs/conv-libs/master-slave/cms.c
src/libs/conv-libs/master-slave/cms.h

index 788c9dec9ebffc4cef39fb4cf08ff039ed205650..6e09b02f5587a4ddac858076c2d2a9c676838e0e 100644 (file)
  *****************************************************************************/
 
 
-/* converse master-slave (or manager-worker or agenda) paradigm library */
+/*
+ * converse master-slave (or manager-worker or agenda) paradigm library 
+ */
 
 
-#include <stdlib.h>
-#include <converse.h>
-
-typedef  int (* cms_WorkerFn)(void *, void *);
-
-typedef int (* cms_consumer)(void *, int);
+#include "cms.h"
 
 typedef struct {
-  int ref;
-  char * response;
+    int ref;
+    char *response;
 } ResponseRecord;
 
 CpvDeclare(int, stopHandlerIndex);
 CpvDeclare(int, workHandler);
-CpvDeclare(int,responseHandler);
-CpvDeclare(int,infoIdx);
-CpvDeclare (ResponseRecord *, responses);
-CpvDeclare(cms_WorkerFn, clientWorker);
-CpvDeclare(cms_consumer, consumerFunction);
-CpvDeclare(int,counter);
+CpvDeclare(int, responseHandler);
+CpvDeclare(int, infoIdx);
+CpvDeclare(ResponseRecord *, responses);
+CpvDeclare(CmsWorkerFn, clientWorker);
+CpvDeclare(CmsConsumerFn, consumerFunction);
+CpvDeclare(int, counter);
 CpvDeclare(int, tableSize);
 
-void initForCms(int argc,char *argv[])
+static void initForCms(int argc, char *argv[])
 {
 }
 
-void cms_infoFn(void *msg, CldPackFn *pfn, int *len,
-            int *queueing, int *priobits, unsigned int **prioptr)
+static void
+cms_infoFn(void *msg, CldPackFn * pfn, int *len,
+          int *queueing, int *priobits, unsigned int **prioptr)
 {
-  *pfn = 0;
-  *len = *( (int*) ((char *) msg +CmiExtHeaderSizeBytes)) + CmiExtHeaderSizeBytes + 8;
-  *queueing = CQS_QUEUEING_FIFO;
-  *priobits = 0;
-  *prioptr = 0;
+    *pfn = 0;
+    *len =
+       *((int *) ((char *) msg + CmiExtHeaderSizeBytes)) +
+       CmiExtHeaderSizeBytes + 8;
+    *queueing = CQS_QUEUEING_FIFO;
+    *priobits = 0;
+    *prioptr = 0;
 }
 
-void stopHandler(char * msg)
+static void stopHandler(void *msg)
 {
-  CsdExitScheduler();
-  ConverseExit();
+    CsdExitScheduler();
+    ConverseExit();
 }
 
-void callWorker(char *msg)
+static void callWorker(void *msg)
 {
char * m, *r;
- int size, msgSize, ref;
-char * msg2;
-
ref = *((int *) (msg + sizeof(int) + CmiExtHeaderSizeBytes));
m = msg + 2*sizeof(int) + CmiExtHeaderSizeBytes;
size = CpvAccess(clientWorker)(m, &r);
-
-  msgSize = 2*sizeof(int) + CmiMsgHeaderSizeBytes + size;
-  msg2 = CmiAlloc( msgSize );
-  m = msg2 +  CmiMsgHeaderSizeBytes;
-  *((int *) m) = size;
-  m += sizeof(int);
-  *((int *) m) = ref;
-  m += sizeof(int);
-  memcpy( m, r, size);
-  CmiSetHandler(msg2, CpvAccess(responseHandler));
-  CmiSyncSendAndFree( 0, msgSize, msg2);
   char *m, *r;
   int size, msgSize, ref;
+    char *msg2;
+
   ref = *((int *) ((char *) msg + sizeof(int) + CmiExtHeaderSizeBytes));
   m = (char *) msg + 2 * sizeof(int) + CmiExtHeaderSizeBytes;
   size = CpvAccess(clientWorker) (m, &r);
+
+    msgSize = 2 * sizeof(int) + CmiMsgHeaderSizeBytes + size;
+    msg2 = CmiAlloc(msgSize);
+    m = msg2 + CmiMsgHeaderSizeBytes;
+    *((int *) m) = size;
+    m += sizeof(int);
+    *((int *) m) = ref;
+    m += sizeof(int);
+    memcpy(m, r, size);
+    CmiSetHandler(msg2, CpvAccess(responseHandler));
+    CmiSyncSendAndFree(0, msgSize, msg2);
 };
 
-void response(char * msg)
+static void response(void *msg)
 {
-  char * r, *m;
-  int size, ref;
-   m = msg + CmiMsgHeaderSizeBytes;
-  size = *( (int *) m);
-  m += sizeof(int);
-  ref = *( (int *) m);
-  m += sizeof(int);
-  r = (char *) malloc(size);
-  memcpy(r, m, size);
-  if (CpvAccess(consumerFunction) == NULL) {
-    CpvAccess(responses)[ref].ref = ref;
-    CpvAccess(responses)[ref].response = r;}
-  else {
-    CpvAccess(consumerFunction)(r, ref);
-  }
-    
-  if (--CpvAccess(counter) == 0) {
-   CsdExitScheduler();
-  }
+    char *r, *m;
+    int size, ref;
+    m = (char *) msg + CmiMsgHeaderSizeBytes;
+    size = *((int *) m);
+    m += sizeof(int);
+    ref = *((int *) m);
+    m += sizeof(int);
+    r = (char *) malloc(size);
+    memcpy(r, m, size);
+    if (CpvAccess(consumerFunction) == 0) {
+       CpvAccess(responses)[ref].ref = ref;
+       CpvAccess(responses)[ref].response = r;
+    } else {
+       CpvAccess(consumerFunction) (r, ref);
+    }
+
+    if (--CpvAccess(counter) == 0) {
+       CsdExitScheduler();
+    }
 }
 
-char *CmsGetResponse(int ref)
+
+void *CmsGetResponse(int ref)
 {
-  return (CpvAccess(responses)[ref].response);
+    return (CpvAccess(responses)[ref].response);
 }
 
 
-void cms_registerHandlers(cms_WorkerFn f)
+static void cms_registerHandlers(CmsWorkerFn f)
 {
-  CpvInitialize(int,stopHandlerIndex);
-  CpvInitialize(int,workHandler);
-  CpvInitialize(int,responseHandler);
-  CpvInitialize(int,infoIdx);
-  CpvInitialize (ResponseRecord *, responses);
-  CpvInitialize(cms_WorkerFn, clientWorker);
-  CpvInitialize(cms_consumer, consumerFunction);
-  CpvInitialize(int,counter);
-  CpvInitialize(int,tableSize);
-
-
-  CpvAccess(stopHandlerIndex) = CmiRegisterHandler((CmiHandler) stopHandler);
-  CpvAccess(workHandler) = CmiRegisterHandler((CmiHandler) callWorker);
-  CpvAccess(responseHandler) = CmiRegisterHandler((CmiHandler) response);
-  CpvAccess(infoIdx) = CldRegisterInfoFn(cms_infoFn);
-  CpvAccess(clientWorker) = f;
-  CpvAccess(consumerFunction) = (cms_consumer) NULL;
+    CpvInitialize(int, stopHandlerIndex);
+    CpvInitialize(int, workHandler);
+    CpvInitialize(int, responseHandler);
+    CpvInitialize(int, infoIdx);
+    CpvInitialize(ResponseRecord *, responses);
+    CpvInitialize(CmsWorkerFn, clientWorker);
+    CpvInitialize(CmsConsumerFn, consumerFunction);
+    CpvInitialize(int, counter);
+    CpvInitialize(int, tableSize);
+
+
+    CpvAccess(stopHandlerIndex) =
+       CmiRegisterHandler((CmiHandler) stopHandler);
+    CpvAccess(workHandler) = CmiRegisterHandler((CmiHandler) callWorker);
+    CpvAccess(responseHandler) = CmiRegisterHandler((CmiHandler) response);
+    CpvAccess(infoIdx) = CldRegisterInfoFn(cms_infoFn);
+    CpvAccess(clientWorker) = f;
+    CpvAccess(consumerFunction) = (CmsConsumerFn) 0;
 }
 
 
 
-void CmsInit(cms_WorkerFn f, int maxResponses)
+void CmsInit(CmsWorkerFn f, int maxResponses)
 {
-  char* argv[100];
-  int argc = 0;
-  argv[0] = 0;
-  ConverseInit(argc, argv,(CmiStartFn)initForCms,1,1);
-  cms_registerHandlers(f);
-  CpvAccess(tableSize) = maxResponses;
-  if (CmiMyPe() == 0) { /* I am the manager */
-    CpvAccess(responses) = malloc(maxResponses*sizeof(ResponseRecord));
-    CpvAccess(counter) = 0;
-  }
-  else { /* I am a worker */
-  CsdScheduler(-1);
-  ConverseExit();
-  }    
+    char *argv[1];
+    int argc = 0;
+    argv[0] = 0;
+    ConverseInit(argc, argv, (CmiStartFn) initForCms, 1, 1);
+    cms_registerHandlers(f);
+    CpvAccess(tableSize) = maxResponses;
+    if (CmiMyPe() == 0) {      /*
+                                * I am the manager 
+                                */
+       CpvAccess(responses) =
+           (ResponseRecord *) CmiAlloc(maxResponses * sizeof(ResponseRecord));
+       CpvAccess(counter) = 0;
+    } else {                   /*
+                                * I am a worker 
+                                */
+       CsdScheduler(-1);
+       ConverseExit();
+    }
 }
 
-void CmsFireTask(int ref, char * t, int size)
+void CmsFireTask(int ref, void *t, int size)
 {
-  char *m;
-  char * msg;
+    char *m;
+    char *msg;
 
-  msg = CmiAlloc( 2*sizeof(int) + CmiExtHeaderSizeBytes + size);
+    msg = CmiAlloc(2 * sizeof(int) + CmiExtHeaderSizeBytes + size);
 
-  CmiSetHandler(msg, CpvAccess(workHandler));
-  m = msg +  CmiExtHeaderSizeBytes;
-  *((int *) m) = size;
-  m += sizeof(int);
-  *((int *) m) = ref;
-  m += sizeof(int);
-  memcpy( m, t, size);
+    CmiSetHandler(msg, CpvAccess(workHandler));
+    m = msg + CmiExtHeaderSizeBytes;
+    *((int *) m) = size;
+    m += sizeof(int);
+    *((int *) m) = ref;
+    m += sizeof(int);
+    memcpy(m, t, size);
 
-  CldEnqueue(CLD_ANYWHERE, msg, CpvAccess(infoIdx)); 
+    CldEnqueue(CLD_ANYWHERE, msg, CpvAccess(infoIdx));
 
-  CpvAccess(counter)++;
+    CpvAccess(counter)++;
 }
 
+/*
+ * allows the system to use processor 0 as a worker.
+ */
 void CmsAwaitResponses(void)
-/* allows the system to use processor 0 as a worker.*/
 {
 
-  CsdScheduler(-1); /* be a worker for a while, and also process responses */
-  /* when back from the sceduler, return. Because all response have been recvd */
-} 
+    CsdScheduler(-1);          /*
+                                * be a worker for a while, and also process
+                                * responses 
+                                */
+    /*
+     * when back from the sceduler, return. Because all response have been
+     * recvd 
+     */
+}
 
-void CmsProcessResponses(cms_consumer f)
+void CmsProcessResponses(CmsConsumerFn f)
 {
-  CpvAccess(consumerFunction) = f;
-  CsdScheduler(-1); 
+    CpvAccess(consumerFunction) = f;
+    CsdScheduler(-1);
 }
 
 void CmsExit(void)
 {
-  char * msg;
+    void *msg;
 
-  msg = (char *) CmiAlloc(CmiMsgHeaderSizeBytes + 8);
-  CmiSetHandler(msg, CpvAccess(stopHandlerIndex));
-  CmiSyncBroadcastAndFree( CmiMsgHeaderSizeBytes + 8, msg);
+    msg = CmiAlloc(CmiMsgHeaderSizeBytes + 8);
+    CmiSetHandler(msg, CpvAccess(stopHandlerIndex));
+    CmiSyncBroadcastAndFree(CmiMsgHeaderSizeBytes + 8, msg);
 
-  ConverseExit();
+    ConverseExit();
 }
index de31ad558398d7d68ddb4159161206122eeaaf92..b58887472f899848536abce1b5d303fcc9f0e8a1 100644 (file)
@@ -7,4 +7,12 @@
 
 #include <converse.h>
 
-char * CmsGetResponse(int);
+typedef int (*CmsWorkerFn) (void *, void *);
+typedef int (*CmsConsumerFn) (void *, int);
+
+void CmsInit(CmsWorkerFn f, int maxResponses);
+void CmsFireTask(int ref, void *t, int size);
+void CmsAwaitResponses(void);
+void CmsProcessResponses(CmsConsumerFn f);
+void *CmsGetResponse (int ref);
+void CmsExit(void);