Adding converse level functions for sending messages to remote replicas
authorNikhil Jain <nikhil@illinois.edu>
Mon, 29 Oct 2012 09:37:36 +0000 (04:37 -0500)
committerNikhil Jain <nikhil@illinois.edu>
Mon, 29 Oct 2012 09:37:36 +0000 (04:37 -0500)
src/arch/util/machine-common-core.c
src/conv-core/converse.h

index d8434d31e00efc2e21c83d8b8d714de05ba643a7..98a959229529e6411391b7339676ca99fa755af4 100644 (file)
@@ -487,6 +487,11 @@ void CmiSyncSendFn(int destPE, int size, char *msg) {
     char *dupmsg = CopyMsg(msg, size);
     CmiFreeSendFn(destPE, size, dupmsg);
 }
+//remote replica send
+void CmiRemoteSyncSendFn(int destPE, int partition, int size, char *msg) {
+    char *dupmsg = CopyMsg(msg, size);
+    CmiRemoteFreeSendFn(destPE, partition, size, dupmsg);
+}
 
 #if CMK_USE_PXSHM
 #include "machine-pxshm.c"
@@ -501,30 +506,34 @@ static int refcount = 0;
 CpvExtern(int, _urgentSend);
 #endif
 
-/* a wrapper of LrtsSendFunc */
-#if CMK_C_INLINE
-inline 
-#endif
-CmiCommHandle CmiSendNetworkFunc(int destPE, int size, char *msg, int mode)
+//declaration so that it can be used
+CmiCommHandle CmiRemoteSendNetworkFunc(int destPE, int partition, int size, char *msg, int mode);
+//I am changing this function to offload task to a generic function - the one
+//that handles sending to any replica
+INLINE_KEYWORD CmiCommHandle CmiSendNetworkFunc(int destPE, int size, char *msg, int mode) {
+  return CmiRemoteSendNetworkFunc(destPE, CmiMyPartition(), size, msg, mode);
+}
+//the generic function that replaces the older one
+CmiCommHandle CmiRemoteSendNetworkFunc(int destPE, int partition, int size, char *msg, int mode)
 {
         int rank;
         int destLocalNode = CmiNodeOf(destPE); 
-        int destNode = CmiGetNodeGlobal(CmiNodeOf(destPE),partitionInfo.myPartition); 
-#if CMK_USE_PXSHM
+        int destNode = CmiGetNodeGlobal(destLocalNode,partition); 
+#if CMK_USE_PXSHM       //not handled yet correctly
         if (CmiValidPxshm(destLocalNode, size)) {
           CmiSendMessagePxshm(msg, size, destLocalNode, &refcount);
           //for (int i=0; i<refcount; i++) CmiReference(msg);
           return 0;
         }
 #endif
-#if CMK_USE_XPMEM
+#if CMK_USE_XPMEM       //not handled yet correctly
         if (CmiValidXpmem(destLocalNode, size)) {
           CmiSendMessageXpmem(msg, size, destLocalNode, &refcount);
           //for (int i=0; i<refcount; i++) CmiReference(msg);
           return 0;
         }
 #endif
-#if CMK_PERSISTENT_COMM
+#if CMK_PERSISTENT_COMM //not handled yet correctly
         if (CpvAccess(phs)) {
           if (size > PERSIST_MIN_SIZE) {
             CmiAssert(CpvAccess(curphs) < CpvAccess(phsSize));
@@ -548,10 +557,17 @@ if (MSG_STATISTIC)
     return LrtsSendFunc(destNode, size, msg, mode);
 }
 
-void CmiFreeSendFn(int destPE, int size, char *msg) {
+//I am changing this function to offload task to a generic function - the one
+//that handles sending to any replica
+INLINE_KEYWORD void CmiFreeSendFn(int destPE, int size, char *msg) {
+    CmiRemoteFreeSendFn(destPE, CmiMyPartition(), size, msg);
+}
+//and the generic implementation - I may be in danger of making the frequent
+//case slower - two extra comparisons may happen
+void CmiRemoteFreeSendFn(int destPE, int partition, int size, char *msg) {
     CMI_SET_BROADCAST_ROOT(msg, 0);
     CQdCreate(CpvAccess(cQdState), 1);
-    if (CmiMyPe()==destPE) {
+    if (CmiMyPe()==destPE && partition == CmiMyPartition()) {
         CmiSendSelf(msg);
 #if CMK_PERSISTENT_COMM
         if (CpvAccess(phs)) CpvAccess(curphs)++;
@@ -561,7 +577,7 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
         int destNode = CmiNodeOf(destPE);
         int destRank = CmiRankOf(destPE);
 #if CMK_SMP
-        if (CmiMyNode()==destNode) {
+        if (CmiMyNode()==destNode && partition == CmiMyPartition()) {
             CmiPushPE(destRank, msg);
 #if CMK_PERSISTENT_COMM
             if (CpvAccess(phs)) CpvAccess(curphs)++;
@@ -570,7 +586,8 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
         }
 #endif
         CMI_DEST_RANK(msg) = destRank;
-        CmiSendNetworkFunc(destPE, size, msg, P2P_SYNC);
+        CmiRemoteSendNetworkFunc(destPE, partition, size, msg, P2P_SYNC);
+
 #if CMK_PERSISTENT_COMM
         if (CpvAccess(phs)) CpvAccess(curphs)++;
 #endif
@@ -579,6 +596,7 @@ void CmiFreeSendFn(int destPE, int size, char *msg) {
 #endif
 
 #if USE_COMMON_ASYNC_P2P
+//not implementing it for replica
 CmiCommHandle CmiAsyncSendFn(int destPE, int size, char *msg) {
     int destNode = CmiNodeOf(destPE);
     if (destNode == CmiMyNode()) {
@@ -612,17 +630,28 @@ static void CmiSendNodeSelf(char *msg) {
     CmiUnlock(CsvAccess(NodeState).CmiNodeRecvLock);
 }
 
-#if USE_COMMON_ASYNC_P2P
-void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
+//I think this #if is incorrect - should be SYNC_P2P
+#if USE_COMMON_SYNC_P2P
+INLINE_KEYWORD void CmiSyncNodeSendFn(int destNode, int size, char *msg) {
     char *dupmsg = CopyMsg(msg, size);
     CmiFreeNodeSendFn(destNode, size, dupmsg);
 }
+//send to remote replica
+void CmiRemoteSyncNodeSendFn(int destNode, int partition, int size, char *msg) {
+    char *dupmsg = CopyMsg(msg, size);
+    CmiRemoteFreeNodeSendFn(destNode, partition, size, dupmsg);
+}
 
-void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
+//again, offloading the task to a generic function
+INLINE_KEYWORD void CmiFreeNodeSendFn(int destNode, int size, char *msg) {
+  CmiRemoteFreeNodeSendFn(destNode, CmiMyPartition(), size, msg);
+}
+//and the remote replica function
+void CmiRemoteFreeNodeSendFn(int destNode, int partition, int size, char *msg) {
     CMI_DEST_RANK(msg) = DGRAM_NODEMESSAGE;
     CQdCreate(CpvAccess(cQdState), 1);
     CMI_SET_BROADCAST_ROOT(msg, 0);
-    if (destNode == CmiMyNode()) {
+    if (destNode == CmiMyNode() && CmiMyPartition() == partition) {
         CmiSendNodeSelf(msg);
     } else {
 #if CMK_WITH_STATS
@@ -633,7 +662,7 @@ if (  MSG_STATISTIC)
     msg_histogram[ret_log]++;
 }
 #endif
-        CmiSendNetworkFunc(CmiNodeFirst(destNode), size, msg, P2P_SYNC);
+        CmiRemoteSendNetworkFunc(CmiNodeFirst(destNode), partition, size, msg, P2P_SYNC);
     }
 #if CMK_PERSISTENT_COMM
     if (CpvAccess(phs)) CpvAccess(curphs)++;
@@ -642,6 +671,7 @@ if (  MSG_STATISTIC)
 #endif
 
 #if USE_COMMON_ASYNC_P2P
+//not implementing it for replica
 CmiCommHandle CmiAsyncNodeSendFn(int destNode, int size, char *msg) {
     if (destNode == CmiMyNode()) {
         CmiSyncNodeSendFn(destNode, size, msg);
@@ -1117,5 +1147,3 @@ static char *CopyMsg(char *msg, int len) {
     return copy;
 }
 
-
-
index 98e744624c64cf2bec198d21951397c3235c30d5..fc9ec1935c65ac6ebe43afa5d19f12b71c521486 100644 (file)
@@ -1091,6 +1091,10 @@ void          CmiSyncMulticastFn(CmiGroup, int, char*);
 CmiCommHandle CmiAsyncMulticastFn(CmiGroup, int, char*);
 void          CmiFreeMulticastFn(CmiGroup, int, char*);
 
+//remote replica send counterparts
+void          CmiRemoteSyncSendFn(int, int, int, char *);
+void          CmiRemoteFreeSendFn(int, int, int, char *);
+
 typedef void * (*CmiReduceMergeFn)(int*,void*,void**,int);
 typedef void (*CmiReducePupFn)(void*,void*);
 typedef void (*CmiReduceDeleteFn)(void*);
@@ -1178,6 +1182,13 @@ void          CmiReleaseCommHandle(CmiCommHandle);
 #define CmiAsyncMulticast(g,s,m)        (CmiAsyncMulticastFn((g),(s),(char*)(m)))
 #define CmiSyncMulticastAndFree(g,s,m)  (CmiFreeMulticastFn((g),(s),(char*)(m)))
 
+
+//adding functions for sending to remote replicas - only the sync ones because
+//we do not use the async ones
+#define CmiRemoteSyncSend(pe,p,s,m)              (CmiRemoteSyncSendFn((pe),(p),(s),(char *)(m)))
+#define CmiRemoteSyncSendAndFree(pe,p,s,m)       (CmiRemoteFreeSendFn((pe),(p),(s),(char *)(m)))
+//support for rest may come later if required
+
 #if CMK_NODE_QUEUE_AVAILABLE
 void          CmiSyncNodeSendFn(int, int, char *);
 CmiCommHandle CmiAsyncNodeSendFn(int, int, char *);
@@ -1190,6 +1201,10 @@ void          CmiFreeNodeBroadcastFn(int, char *);
 void          CmiSyncNodeBroadcastAllFn(int, char *);
 CmiCommHandle CmiAsyncNodeBroadcastAllFn(int, char *);
 void          CmiFreeNodeBroadcastAllFn(int, char *);
+
+//if node queue is available, adding remote replica counterparts
+void          CmiRemoteSyncNodeSendFn(int, int, int, char *);
+void          CmiRemoteFreeNodeSendFn(int, int, int, char *);
 #endif
 
 #if CMK_NODE_QUEUE_AVAILABLE
@@ -1202,6 +1217,11 @@ void          CmiFreeNodeBroadcastAllFn(int, char *);
 #define CmiSyncNodeBroadcastAll(s,m)        (CmiSyncNodeBroadcastAllFn((s),(char *)(m)))
 #define CmiAsyncNodeBroadcastAll(s,m)       (CmiAsyncNodeBroadcastAllFn((s),(char *)(m)))
 #define CmiSyncNodeBroadcastAllAndFree(s,m) (CmiFreeNodeBroadcastAllFn((s),(char *)(m)))
+
+//counterparts of remote replica
+#define CmiRemoteSyncNodeSend(pe,p,s,m)         (CmiRemoteSyncNodeSendFn((pe),(p),(s),(char *)(m)))
+#define CmiRemoteSyncNodeSendAndFree(pe,p,s,m)  (CmiRemoteFreeNodeSendFn((pe),(p),(s),(char *)(m)))
+
 #else
 
 #define CmiSyncNodeSend(n,s,m)        CmiSyncSend(CmiNodeFirst(n),s,m)
@@ -1237,6 +1257,9 @@ void          CmiFreeNodeBroadcastAllFn(int, char *);
 #define CmiAsyncNodeBroadcastAll(s,m)       CmiAsyncBroadcastAll(s,m)
 #define CmiSyncNodeBroadcastAllAndFree(s,m) CmiSyncBroadcastAllAndFree(s,m)
 #endif
+//and the remote replica counterparts
+#define CmiRemoteSyncNodeSend(n,p,s,m)          CmiRemoteSyncSend(CmiNodeFirst(n),p,s,m)
+#define CmiRemoteSyncNodeSendAndFree(n,p,s,m)   CmiRemoteSyncSendAndFree(CmiNodeFirst(n),p,s,m)
 #endif
 
 /******** CMI MESSAGE RECEPTION ********/