added CmiDirect in Gemini
authorYanhua Sun <sun51@hopper05.(none)>
Tue, 7 Feb 2012 21:20:22 +0000 (13:20 -0800)
committerYanhua Sun <sun51@hopper05.(none)>
Tue, 7 Feb 2012 21:20:22 +0000 (13:20 -0800)
src/arch/gemini_gni/machine.c
src/conv-core/cmidirect.h
tests/charm++/pingpong/pingpong.C

index d316ae80db8ea3aaccb7ca4377225b800742829e..5231f3d8bbe00a2d9370bc84d2b1631965eaa149 100644 (file)
@@ -196,7 +196,8 @@ static int LOCAL_QUEUE_ENTRIES=20480;
 #endif
 
 #define BIG_MSG_TAG  0x26
-#define PUT_DONE_TAG      0x29
+#define PUT_DONE_TAG      0x28
+#define DIRECT_PUT_DONE_TAG      0x29
 #define ACK_TAG           0x30
 /* SMSG is data message */
 #define SMALL_DATA_TAG          0x31
@@ -290,6 +291,13 @@ typedef struct control_msg
     struct control_msg *next;
 }CONTROL_MSG;
 
+#ifdef CMK_DIRECT
+typedef struct{
+    void *recverBuf;
+    void (*callbackFnPtr)(void *);
+    void *callbackData;
+}CMK_DIRECT_HEADER;
+#endif
 typedef struct  rmda_msg
 {
     int                   destNode;
@@ -1171,7 +1179,6 @@ static void PumpNetworkSmsg()
     int                 init_flag;
     CONTROL_MSG         *control_msg_tmp, *header_tmp;
     uint64_t            source_addr;
-
 #if CMK_SMP && !COMM_THREAD_SEND
     while(1)
     {
@@ -1287,13 +1294,16 @@ static void PumpNetworkSmsg()
 
 #if CMK_PERSISTENT_COMM
             case PUT_DONE_TAG: //persistent message
-            {
-                void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
-                int size = ((CONTROL_MSG *) header)->length;
-                CmiReference(msg);
-                handleOneRecvedMsg(size, msg); 
-                break;
-            }
+            void *msg = (void *)((CONTROL_MSG *) header)->source_addr;
+            int size = ((CONTROL_MSG *) header)->length;
+            CmiReference(msg);
+            handleOneRecvedMsg(size, msg); 
+            break;
+#endif
+#ifdef CMK_DIRECT
+            case DIRECT_PUT_DONE_TAG:  //cmi direct 
+            (*(((CMK_DIRECT_HEADER*) header)->callbackFnPtr))(((CMK_DIRECT_HEADER*) header)->callbackData);
+           break;
 #endif
             default: {
                 printf("weird tag problem\n");
@@ -1319,6 +1329,8 @@ static void PumpNetworkSmsg()
 static void printDesc(gni_post_descriptor_t *pd)
 {
     printf(" addr=%p, ", pd->local_addr); 
+    printf(" remote addr=%p, ", pd->remote_addr);
+    printf(" local %lld %lld, remote %lld, %lld\n", pd->local_mem_hndl.qword1, pd->local_mem_hndl.qword2, pd->remote_mem_hndl.qword1, pd->remote_mem_hndl.qword2); 
 }
 
 // for BIG_MSG called on receiver side for receiving control message
@@ -1519,7 +1531,9 @@ static void PumpLocalRdmaTransactions()
     MSG_LIST                *ptr;
     CONTROL_MSG             *ack_msg_tmp;
     uint8_t             msg_tag;
-
+#ifdef CMK_DIRECT
+    CMK_DIRECT_HEADER       *cmk_direct_done_msg;
+#endif
 #if CMK_SMP && !COMM_THREAD_SEND
     while(1) {
         CmiLock(tx_cq_lock);
@@ -1546,8 +1560,24 @@ static void PumpLocalRdmaTransactions()
 #if CMK_SMP && !COMM_THREAD_SEND
             CmiUnlock(tx_cq_lock);
 #endif
+#ifdef CMK_DIRECT
+            if(tmp_pd->amo_cmd == 1)
+            {
+                cmk_direct_done_msg = (CMK_DIRECT_HEADER*) malloc(sizeof(CMK_DIRECT_HEADER));
+                cmk_direct_done_msg->callbackFnPtr = (void*)( tmp_pd->first_operand);
+                cmk_direct_done_msg->recverBuf = (void*)(tmp_pd->remote_addr);
+                cmk_direct_done_msg->callbackData = (void*)(tmp_pd->second_operand); 
+            }
+            else{
+                MallocControlMsg(ack_msg_tmp);
+                ack_msg_tmp->source_addr = tmp_pd->remote_addr;
+                ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
+            }
+#else
             MallocControlMsg(ack_msg_tmp);
             ack_msg_tmp->source_addr = tmp_pd->remote_addr;
+            ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
+#endif
             ////Message is sent, free message , put is not used now
             switch (tmp_pd->type) {
 #if CMK_PERSISTENT_COMM
@@ -1559,6 +1589,13 @@ static void PumpLocalRdmaTransactions()
                 CmiFree((void *)tmp_pd->local_addr);
                 msg_tag = PUT_DONE_TAG;
                 break;
+#endif
+#ifdef CMK_DIRECT
+            case GNI_POST_RDMA_PUT:
+            case GNI_POST_FMA_PUT:
+                //sender ACK to receiver to trigger it is done
+                msg_tag = DIRECT_PUT_DONE_TAG;
+                break;
 #endif
             case GNI_POST_RDMA_GET:
             case GNI_POST_FMA_GET:
@@ -1587,11 +1624,20 @@ static void PumpLocalRdmaTransactions()
                 CmiPrintf("type=%d\n", tmp_pd->type);
                 CmiAbort("PumpLocalRdmaTransactions: unknown type!");
             }
-            ack_msg_tmp->source_mem_hndl    = tmp_pd->remote_mem_hndl;
-            status = send_smsg_message(inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
+#if CMK_DIRECT
+            if(tmp_pd->amo_cmd == 1)
+                status = send_smsg_message(inst_id, cmk_direct_done_msg, sizeof(CMK_DIRECT_HEADER), msg_tag, 0); 
+            else
+#endif
+                status = send_smsg_message(inst_id, ack_msg_tmp, sizeof(CONTROL_MSG), msg_tag, 0);  
             if(status == GNI_RC_SUCCESS)
             {
-                FreeControlMsg(ack_msg_tmp);
+#if CMK_DIRECT
+                if(tmp_pd->amo_cmd == 1)
+                    free(cmk_direct_done_msg); 
+                else
+#endif
+                    FreeControlMsg(ack_msg_tmp);
             }
 #if CMK_PERSISTENT_COMM
             if (tmp_pd->type == GNI_POST_RDMA_GET || tmp_pd->type == GNI_POST_FMA_GET)
@@ -1817,6 +1863,16 @@ static int SendBufferMsg()
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
                 }
                 break;
+#ifdef CMK_DIRECT
+            case DIRECT_PUT_DONE_TAG:
+                status = send_smsg_message( ptr->destNode, ptr->msg, sizeof(CMK_DIRECT_HEADER), ptr->tag, 1);  
+                if(status == GNI_RC_SUCCESS)
+                {
+                    free((CMK_DIRECT_HEADER*)ptr->msg);
+                }
+                break;
+
+#endif
             default:
                 printf("Weird tag\n");
                 CmiAbort("should not happen\n");
@@ -2671,8 +2727,9 @@ int CmiBarrier()
     return status;
 
 }
-
-
+#if CMK_DIRECT
+#include "machine-cmidirect.c"
+#endif
 #if CMK_PERSISTENT_COMM
 #include "machine-persistent.c"
 #endif
index 968513e970a290ddfcc86252cd6693ee78edc0e8..5e7467dbe66d731985c3b082acf6e7523bb51336 100644 (file)
@@ -19,8 +19,10 @@ typedef struct {
 */
 #ifdef CMK_BLUEGENEP
 #include "dcmf.h"
+#elif CMK_CONVERSE_GEMINI_UGNI
+#include "gni_pub.h"
 #endif
-struct infiDirectUserHandle{
+typedef struct infiDirectUserHandle{
     int handle;
     int senderNode;
     int recverNode;
@@ -36,11 +38,17 @@ struct infiDirectUserHandle{
     DCMF_Memregion_t DCMF_recverMemregion;
     DCMF_Memregion_t DCMF_senderMemregion;
     DCMF_Callback_t DCMF_notify_cb;
+#elif  CMK_CONVERSE_GEMINI_UGNI
+    void *senderBuf;
+    void (*callbackFnPtr)(void *);
+    void *callbackData;
+    gni_mem_handle_t    senderMdh;
+    gni_mem_handle_t    recverMdh;
 #else
        char recverKey[64];
 #endif
        double initialValue;
-};
+} CmiDirectUserHandle;
 
 
 /* functions */
@@ -51,7 +59,7 @@ extern "C" {
 /**
  To be called on the receiver to create a handle and return its number
 **/
-struct infiDirectUserHandle CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue);
+CmiDirectUserHandle* CmiDirect_createHandle(int senderNode,void *recvBuf, int recvBufSize, void (*callbackFnPtr)(void *), void *callbackData,double initialValue);
 
 /****
  To be called on the sender to attach the sender's buffer to this handle
index 1da9269831b43800dff4ccccee274ca7e07fd591..b3bde07053e277d1f29ae43894aa34c2afa1206a 100644 (file)
@@ -108,7 +108,7 @@ public:
     arrF[CkArrayIndexFancy("first")].insert(P1);
     arrF[CkArrayIndexFancy("second")].insert(P2);
     arrF.doneInserting();
-    phase=0;
+    phase=6;
     mainProxy.maindone();
     delete m;
   };
@@ -140,7 +140,7 @@ public:
         break;
 #else
       case 6:
-         ngid[0].startRDMA();
+        //ngid[0].startRDMA();
          break;
 #endif
       default:
@@ -195,7 +195,7 @@ class PingN : public CBase_PingN
   int niter;
   int me, nbr;
 #ifdef USE_RDMA 
-  struct infiDirectUserHandle shandle,rhandle;
+  CmiDirectUserHandle *shandle,*rhandle;
   char *rbuff;
   char *sbuff;
 #endif
@@ -212,13 +212,14 @@ public:
 
     niter = 0;
 #ifdef USE_RDMA 
-    rbuff=(char *) malloc(payload*sizeof(char));
-    sbuff=(char *) malloc(payload*sizeof(char));
+    rbuff=(char *) CmiAlloc(payload*sizeof(char));
+    sbuff=(char *) CmiAlloc(payload*sizeof(char));
     bzero(sbuff,payload);
+    bzero(rbuff,payload);
     // setup persistent comm sender and receiver side
     double OOB=9999999999.0;
     rhandle=CmiDirect_createHandle(nbr,rbuff,payload*sizeof(char),PingN::Wrapper_To_CallBack,(void *) this,OOB);
-    thisProxy[nbr].recvHandle((char*) &rhandle,sizeof(struct infiDirectUserHandle));
+    thisProxy[nbr].recvHandle((char*) rhandle,sizeof(CmiDirectUserHandle));
 #endif
   }
   PingN(CkMigrateMessage *m) {}
@@ -226,9 +227,11 @@ public:
   {
 
 #ifdef USE_RDMA 
-    struct infiDirectUserHandle *_shandle=(struct infiDirectUserHandle *) ptr;
-    shandle=*_shandle;
-    CmiDirect_assocLocalBuffer(&shandle,sbuff,payload);
+    CmiDirectUserHandle *_shandle=(CmiDirectUserHandle *) ptr;
+    shandle=_shandle;
+    CmiDirect_assocLocalBuffer(shandle,sbuff,payload);
+    if(CkMyNode() == 0)
+        startRDMA();
 #endif
   }
   void start(void)
@@ -241,7 +244,7 @@ public:
     niter=0;
     start_time = CkWallTimer();
 #ifdef USE_RDMA 
-    CmiDirect_put(&shandle);
+    CmiDirect_put(shandle);
 #else
     CkAbort("do not call startRDMA if you don't actually have RDMA");
 #endif
@@ -280,8 +283,9 @@ public:
   void recvRDMA()
   {
 #ifdef USE_RDMA 
-    CmiDirect_ready(&rhandle);
+    CmiDirect_ready(rhandle);
 #endif
+    CkPrintf("Received on [%d]", CmiMyNode());
     if(me==0) {
       niter++;
       if(niter==iterations) {
@@ -292,14 +296,14 @@ public:
         mainProxy.maindone();
       } else {
 #ifdef USE_RDMA 
-       CmiDirect_put(&shandle);
+       CmiDirect_put(shandle);
 #else
        CkAbort("do not call startRDMA if you don't actually have RDMA");
 #endif
       }
     } else {
 #ifdef USE_RDMA 
-      CmiDirect_put(&shandle);
+      CmiDirect_put(shandle);
 #else
       CkAbort("do not call startRDMA if you don't actually have RDMA");
 #endif