Moved CcsReply to middle-ccs so it works both in bigsim and in normal charm
authorFilippo Gioachin <gioachin@uiuc.edu>
Wed, 21 Apr 2010 23:51:00 +0000 (18:51 -0500)
committerFilippo Gioachin <gioachin@uiuc.edu>
Wed, 21 Apr 2010 23:51:00 +0000 (18:51 -0500)
src/conv-ccs/conv-ccs.c
src/conv-ccs/conv-ccs.h
src/conv-ccs/middle-ccs.C

index 342e5fb6c5ff2d6e010a8f20f65a3b96b72a35a1..dafaa0634932cd8deb604bdab9ee65ef79556ae0 100644 (file)
  *
  *****************************************************************************/
  
-#include "ckhashtable.h"
-
-/* Includes all information stored about a single CCS handler. */
-typedef struct CcsHandlerRec {
-       const char *name; /*Name passed over socket*/
-       CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
-       CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
-       void *userPtr;
-       CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
-       int nCalls; /* Number of times handler has been executed*/
-       CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
-} CcsHandlerRec;
-
 static void initHandlerRec(CcsHandlerRec *c,const char *name) {
   if (strlen(name)>=CCS_MAXHANDLER) 
        CmiAbort("CCS handler names cannot exceed 32 characters");
@@ -65,8 +52,7 @@ static void callHandlerRec(CcsHandlerRec *c,int reqLen,const void *reqData) {
 }
 
 /*Table maps handler name to CcsHandler object*/
-typedef CkHashtable_c CcsHandlerTable;
-CpvStaticDeclare(CcsHandlerTable, ccsTab);
+CpvDeclare(CcsHandlerTable, ccsTab);
 
 CpvStaticDeclare(CcsImplHeader*,ccsReq);/*Identifies CCS requestor (client)*/
 
@@ -83,6 +69,9 @@ void CcsRegisterHandlerFn(const char *name, CcsHandlerFn fn, void *ptr) {
   cp.userPtr=ptr;
   *(CcsHandlerRec *)CkHashtablePut(CpvAccess(ccsTab),(void *)&cp.name)=cp;
 }
+CcsHandlerRec *CcsGetHandler(const char *name) {
+  return CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
+}
 void CcsSetMergeFn(const char *name, CmiReduceMergeFn newMerge) {
   CcsHandlerRec *rec=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&name);
   if (rec==NULL) {
@@ -171,39 +160,7 @@ void CcsCallerId(skt_ip_t *pip, unsigned int *pport)
   *pport = ChMessageInt(CpvAccess(ccsReq)->attr.port);
 }
 
-static int rep_fw_handler_idx;
-
-/**
- * Decide if the reply is ready to be forwarded to the waiting client,
- * or if combination is required (for broadcast/multicast CCS requests.
- */
-int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
-  int repPE = (int)ChMessageInt(rep->pe);
-  if (repPE <= -1) {
-    /* Reduce the message to get the final reply */
-    CcsHandlerRec *fn;
-    int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
-    char *msg=CmiAlloc(len);
-    char *r=msg+CmiReservedHeaderSize;
-    char *handlerStr;
-    rep->len = ChMessageInt_new(repLen);
-    *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
-    memcpy(r,repData,repLen);
-    CmiSetHandler(msg,rep_fw_handler_idx);
-    handlerStr=rep->handler;
-    fn=(CcsHandlerRec *)CkHashtableGet(CpvAccess(ccsTab),(void *)&handlerStr);
-    if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
-    if (repPE == -1) {
-      /* CCS Broadcast */
-      CkReduce(msg, len, fn->mergeFn);
-    } else {
-      /* CCS Multicast */
-      CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
-    }
-  } else {
-    CcsImpl_reply(rep, repLen, repData);
-  }
-}
+int rep_fw_handler_idx;
 
 CcsDelayedReply CcsDelayReply(void)
 {
@@ -286,14 +243,13 @@ void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData)
     CcsSendReply(0,NULL);/*Send an empty reply if not*/
 }
 
-#if ! NODE_0_IS_CONVHOST
+#if ! NODE_0_IS_CONVHOST || CMK_BLUEGENE_CHARM
 /* The followings are necessary to prevent CCS requests to be processed before
  * CCS has been initialized. Really it matters only when NODE_0_IS_CONVHOST=0, but
  * it doesn't hurt having it in the other case as well */
 static char **bufferedMessages = NULL;
 static int CcsNumBufferedMsgs = 0;
 #define CCS_MAX_NUM_BUFFERED_MSGS  100
-#endif
 
 void CcsBufferMessage(char *msg) {
   //CmiPrintf("Buffering CCS message\n");
@@ -303,6 +259,7 @@ void CcsBufferMessage(char *msg) {
   bufferedMessages[CcsNumBufferedMsgs] = msg;
   CcsNumBufferedMsgs ++;
 }
+#endif
   
 /*Unpacks request message to call above routine*/
 int _ccsHandlerIdx = 0;/*Converse handler index of routine req_fw_handler*/
index 7f87b368519bade3cf90e00c8c049955cfd80eb7..e0e64868ca8ebd036d49f99da3d1ccc905af5c4f 100644 (file)
@@ -17,6 +17,8 @@ but it can use the CcsSendReply function.
 #include "sockRoutines.h"
 #include "ccs-server.h" /*for CcsSecAttr*/
 
+#include "ckhashtable.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -29,10 +31,26 @@ extern int _ccsHandlerIdx;
 
 #if CMK_CCS_AVAILABLE
 
+typedef CkHashtable_c CcsHandlerTable;
+CpvExtern(CcsHandlerTable, ccsTab);
+
 typedef struct CcsDelayedReply_struct {
   CcsImplHeader *hdr;
 } CcsDelayedReply;
 
+typedef void (*CcsHandlerFn)(void *userPtr,int reqLen,const void *reqData);
+
+/* Includes all information stored about a single CCS handler. */
+typedef struct CcsHandlerRec {
+    const char *name; /*Name passed over socket*/
+    CmiHandler fnOld; /*Old converse-style handler, or NULL if new-style*/
+    CcsHandlerFn fn; /*New-style handler function, or NULL if old-style*/
+    void *userPtr;
+    CmiReduceMergeFn mergeFn; /*Merge function used for bcast requests*/
+    int nCalls; /* Number of times handler has been executed*/
+    CmiUInt2 redID; /*Reduction ID to be used with CmiListReduce*/
+} CcsHandlerRec;
+
 /**
  * Backward compatability routine: register a regular converse-style handler
  * to receive CCS requests.  The requests will arrive as a Converse message,
@@ -40,11 +58,12 @@ typedef struct CcsDelayedReply_struct {
  */
 void CcsRegisterHandler(const char *ccs_handlername, CmiHandler fn);
 
+CcsHandlerRec *CcsGetHandler(const char *name);
+
 /**
  * Register a real Ccs handler function to receive these CCS requests. 
  * The requests will arrive as a flat, readonly buffer.
  */
-typedef void (*CcsHandlerFn)(void *userPtr,int reqLen,const void *reqData);
 void CcsRegisterHandlerFn(const char *ccs_handlername, CcsHandlerFn fn, void *userPtr);
 
 /**
index 262aef92bc63b228601fe7436a450df824ca22b0..f13a248cb89ad4ab9927372bed246458295ea5dc 100644 (file)
@@ -1,9 +1,10 @@
-#include "converse.h"
+#include "middle.h"
 
 #if CMK_BLUEGENE_CHARM
 #include "bgconverse.h"
 #endif
 #include "ccs-server.h"
+#include "conv-ccs.h"
 
 extern "C" void CcsHandleRequest(CcsImplHeader *hdr,const char *reqData);
 
@@ -42,6 +43,39 @@ extern "C" void req_fw_handler(char *msg)
 }
 
 extern "C" void CcsSendReply(int replyLen, const void *replyData);
+extern int rep_fw_handler_idx;
+/**
+ * Decide if the reply is ready to be forwarded to the waiting client,
+ * or if combination is required (for broadcast/multicast CCS requests.
+ */
+extern "C" int CcsReply(CcsImplHeader *rep,int repLen,const void *repData) {
+  int repPE = (int)ChMessageInt(rep->pe);
+  if (repPE <= -1) {
+    /* Reduce the message to get the final reply */
+    CcsHandlerRec *fn;
+    int len=CmiReservedHeaderSize+sizeof(CcsImplHeader)+repLen;
+    char *msg=(char*)CmiAlloc(len);
+    char *r=msg+CmiReservedHeaderSize;
+    char *handlerStr;
+    rep->len = ChMessageInt_new(repLen);
+    *(CcsImplHeader *)r=*rep; r+=sizeof(CcsImplHeader);
+    memcpy(r,repData,repLen);
+    CmiSetHandler(msg,rep_fw_handler_idx);
+    handlerStr=rep->handler;
+    fn=(CcsHandlerRec *)CcsGetHandler(handlerStr);
+    if (fn->mergeFn == NULL) CmiAbort("Called CCS broadcast with NULL merge function!\n");
+    if (repPE == -1) {
+      /* CCS Broadcast */
+      CkReduce(msg, len, fn->mergeFn);
+    } else {
+      /* CCS Multicast */
+      CmiListReduce(-repPE, (int*)(rep+1), msg, len, fn->mergeFn, fn->redID);
+    }
+  } else {
+    CcsImpl_reply(rep, repLen, repData);
+  }
+}
+
 /**********************************************
   "ccs_getinfo"-- takes no data
     Return the number of parallel nodes, and