Fix bug in node-level persistent communication
author Gengbin Zheng <gzheng@illinois.edu>
Mon, 2 Apr 2012 02:51:27 +0000 (21:51 -0500)
committer Gengbin Zheng <gzheng@illinois.edu>
Mon, 2 Apr 2012 02:51:27 +0000 (21:51 -0500)
src/arch/gemini_gni/machine.c
src/arch/util/machine-common-core.c
src/arch/util/persist-comm.c

index 08928d3384d0b2b7d3ef9d7bcacc6e91c4267191..320037d400e70450ae51fad7da257623b62d9b95 100644 (file)
@@ -51,7 +51,7 @@
 
 #if CMK_SMP
 #define MULTI_THREAD_SEND          0
-#define COMM_THREAD_SEND           1
+#define COMM_THREAD_SEND           (!MULTI_THREAD_SEND)
 #endif
 
 #if MULTI_THREAD_SEND
@@ -64,6 +64,7 @@
 #define CMI_EXERT_SEND_CAP     0
 #define        CMI_EXERT_RECV_CAP      0
 #define CMI_EXERT_RDMA_CAP      0
+
 #if CMI_EXERT_SEND_CAP
 int SEND_large_cap = 100;
 int SEND_large_pending = 0;
@@ -159,11 +160,13 @@ static CmiInt8  MAX_REG_MEM    =  25*oneMB;
 
 #if MULTI_THREAD_SEND 
 #define     CMI_GNI_LOCK(x)       CmiLock(x);
+#define     CMI_GNI_TRYLOCK(x)       CmiTryLock(x)
 #define     CMI_GNI_UNLOCK(x)        CmiUnlock(x);
 #define     CMI_PCQUEUEPOP_LOCK(Q)   CmiLock((Q)->lock);
 #define     CMI_PCQUEUEPOP_UNLOCK(Q)    CmiUnlock((Q)->lock);
 #else
 #define     CMI_GNI_LOCK(x)
+#define     CMI_GNI_TRYLOCK(x)         (0)
 #define     CMI_GNI_UNLOCK(x)
 #define     CMI_PCQUEUEPOP_LOCK(Q)   
 #define     CMI_PCQUEUEPOP_UNLOCK(Q)
@@ -466,7 +469,7 @@ typedef struct  rmda_msg
 
 
 #if CMK_SMP
-#define SMP_LOCKS               0
+#define SMP_LOCKS                       0
 #define ONE_SEND_QUEUE                  0
 PCQueue sendRdmaBuf;
 typedef struct  msg_list_index
@@ -713,8 +716,7 @@ static void IndexPool_init(IndexPool *pool)
 static
 inline int IndexPool_getslot(IndexPool *pool, void *addr, int type)
 {
-    int i;
-    int s;
+    int s, i;
 #if MULTI_THREAD_SEND  
     CmiLock(pool->lock);
 #endif
@@ -1838,7 +1840,7 @@ void LrtsFreeListSendFn(int npes, int *pes, int len, char *msg)
       return;
   }
 #if CMK_PERSISTENT_COMM
-  if (CpvAccess(phs) && len > 1024) {
+  if (CpvAccess(phs) && len > PERSIST_MIN_SIZE) {
       int i;
       for(i=0;i<npes;i++) {
         if (pes[i] == CmiMyPe())
@@ -2035,13 +2037,13 @@ void LrtsPostNonLocal(){
     PumpRemoteTransactions();
 #endif
 
+#if CMK_WORKER_SINGLE_TASK
+    if (CmiMyRank() % 6 == 4)
+#endif
 #if CMK_USE_OOB
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
     {
-#if CMK_WORKER_SINGLE_TASK
-    if (CmiMyRank() % 6 == 4)
-#endif
         SendBufferMsg(&smsg_queue);
     }
 
@@ -2665,9 +2667,11 @@ static void PumpRemoteTransactions()
     int                     inst_id, index, type, size;
 
     while(1) {
-        CMI_GNI_LOCK(global_gni_lock)
+        CMI_GNI_LOCK(rdma_tx_cq_lock)
+//        CMI_GNI_LOCK(global_gni_lock)
         status = GNI_CqGetEvent(rdma_rx_cqh, &ev);
-        CMI_GNI_UNLOCK(global_gni_lock)
+//        CMI_GNI_UNLOCK(global_gni_lock)
+        CMI_GNI_UNLOCK(rdma_tx_cq_lock)
 
         if(status != GNI_RC_SUCCESS) break;
 
@@ -2677,10 +2681,12 @@ static void PumpRemoteTransactions()
         switch (type) {
         case 0:    // ACK
             CmiAssert(index>=0 && index<ackPool.size);
+            CMI_GNI_LOCK(ackPool.lock);
             CmiAssert(GetIndexType(ackPool, index) == 1);
             msg = GetIndexAddress(ackPool, index);
+            CMI_GNI_UNLOCK(ackPool.lock);
 #if PRINT_SYH
-        printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
+            printf("[%d] PumpRemoteTransactions: ack: %p index: %d type: %d.\n", myrank, GetMempoolBlockPtr(msg), index, type);
 #endif
 #if ! USE_LRTS_MEMPOOL
            // MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(((ACK_MSG *)header)->source_mem_hndl), &omdh, ((ACK_MSG *)header)->length);
@@ -2924,7 +2930,7 @@ static void  SendRdmaMsg()
     for (i=0; i<len; i++)
     {
 #if CMI_EXERT_RDMA_CAP
-         if( RDMA_pending >= RDMA_cap) break;
+        if( RDMA_pending >= RDMA_cap) break;
 #endif
         CMI_PCQUEUEPOP_LOCK(sendRdmaBuf)
         ptr = (RDMA_REQUEST*)PCQueuePop(sendRdmaBuf);
@@ -3251,7 +3257,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         if(!PCQueueEmpty(current_queue) && current_list->pushed == 0)
         {
             current_list->pushed = 1;
-            PCQueuePush(nonEmptyQueues, current_list);
+            PCQueuePush(nonEmptyQueues, (char*)current_list);
         }
         CmiUnlock(current_list->lock); 
 #endif
index 59cfbd6fd4cfdb1edac87e9da80b1a2d045faa20..a950e5ea1d01abc5dba0e703c928ceeb66fc0fdf 100644 (file)
@@ -573,7 +573,7 @@ if (  MSG_STATISTIC)
         msg_histogram[ret_log]++;
 }
 #endif
-        return LrtsSendFunc(destPE, size, msg, P2P_ASYNC);
+        return CmiSendNetworkFunc(destPE, size, msg, P2P_ASYNC);
     }
 }
 #endif
@@ -613,8 +613,11 @@ if (  MSG_STATISTIC)
     msg_histogram[ret_log]++;
 }
 #endif
-        LrtsSendFunc(destNode, size, msg, P2P_SYNC);
+        CmiSendNetworkFunc(destNode, size, msg, P2P_SYNC);
     }
+#if CMK_PERSISTENT_COMM
+    if (CpvAccess(phs)) CpvAccess(curphs)++;
+#endif
 }
 #endif
 
@@ -632,7 +635,7 @@ if (  MSG_STATISTIC)
         msg_histogram[ret_log]++;
 }
 #endif
-        return LrtsSendFunc(destNode, size, msg, P2P_ASYNC);
+        return CmiSendNetworkFunc(destNode, size, msg, P2P_ASYNC);
     }
 }
 #endif
index 906fc4a1f91f91dc8a4ba63841435a2c872909bc..42c7a743fb9d884ddfb36a417c0dc05a278a0959 100644 (file)
@@ -183,7 +183,7 @@ PersistentHandle CmiCreateNodePersistent(int destNode, int maxBytes)
     /* randomly pick one rank on the destination node is fine for setup.
        actual message will be handled by comm thread anyway */
   int pe = CmiNodeFirst(destNode) + rand()/RAND_MAX * CmiMyNodeSize();
-  return CmiCreatePersistent(CmiNodeFirst(destNode), maxBytes);
+  return CmiCreatePersistent(pe, maxBytes);
 }
 
 static void persistentRequestHandler(void *env)