merge first
authorYanhua Sun <yanhuas@jyc1.(none)>
Sat, 18 Feb 2012 00:10:01 +0000 (18:10 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Sat, 18 Feb 2012 00:10:01 +0000 (18:10 -0600)
src/arch/gemini_gni/machine.c
src/arch/util/machine.h

index 4cd665e8f9facec0a72f9a813d9f4e43bf22d8e7..13bc442ef5c55e19b427eb0d1e5b07fddc1dbc35 100644 (file)
@@ -166,14 +166,17 @@ onesided_md_t    omdh;
 #else
 uint8_t   onesided_hnd, omdh;
 #if REMOTE_EVENT
-#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status) status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
-      if(status == GNI_RC_SUCCESS) register_mempool += size; 
+#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size>= MAX_REG_MEM) { \
+                                                                                         status = GNI_RC_ERROR_NOMEM;} \
+        else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
+                if(status == GNI_RC_SUCCESS) register_memory_size += size;} 
 #else
 #define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
-      if(status == GNI_RC_SUCCESS) register_mempool += size;
+      if(status == GNI_RC_SUCCESS) register_memory_size += size;
 #endif
-#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  GNI_MemDeregister(nic_hndl, (mem_hndl)); \
-         register_mempool -= size;
+#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size, status)  status = GNI_MemDeregister(nic_hndl, (mem_hndl)); \
+                                                                                if( status == GNI_RC_SUCCESS) register_memory_size -= size; \
+                                                                                else GNI_RC_CHECK(status, "deregister");
 #endif
 
 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
@@ -653,7 +656,7 @@ static void SendRdmaMsg();
 static void PumpNetworkSmsg();
 static void PumpLocalRdmaTransactions();
 static int SendBufferMsg();
-static     int register_mempool = 0;
+static     int register_memory_size = 0;
 
 #if MACHINE_DEBUG_LOG
 FILE *debugLog = NULL;
@@ -675,37 +678,35 @@ printf("[n %d] sweep_mempool slot END .\n", myrank);
 }
 
 inline
-static  gni_return_t deregisterMempool(mempool_type *mptr, block_header **from)
+static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
 {
     gni_return_t status;
     block_header *current = *from;
 
-    while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
-       current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
+    while(register_memory_size>= MAX_REG_MEM)
+    {
+        while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
+            current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
 
-    *from = current;
-    if(current == NULL) return GNI_RC_ERROR_RESOURCE;
-    MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
-    SetMemHndlZero(GetMemHndlFromHeader(current));
+        if(current == NULL) return GNI_RC_ERROR_RESOURCE;
+        *from = current;
+        MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current), status)
+        SetMemHndlZero(GetMemHndlFromHeader(current));
+    }
     return GNI_RC_SUCCESS;
 }
 
 inline 
-static gni_return_t registerFromMempool(mempool_type *mptr, void *msg)
+static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int size, gni_mem_handle_t  *memhndl)
 {
     gni_return_t status = GNI_RC_SUCCESS;
-    int size = GetMempoolsize(msg);
-    void *blockaddr = GetMempoolBlockPtr(msg);
-    gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
+    //int size = GetMempoolsize(msg);
+    //void *blockaddr = GetMempoolBlockPtr(msg);
+    //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
    
     block_header *current = &(mptr->block_head);
 
-    while(register_mempool > MAX_REG_MEM)
-    {
-        status = deregisterMempool(mptr, &current);
-        if (status != GNI_RC_SUCCESS) break;
-    };
-    MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_mempool); 
+    MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
     while(1)
     {
         MEMORY_REGISTER(onesided_hnd, nic_hndl, blockaddr, size, memhndl, &omdh, status)
@@ -719,7 +720,7 @@ static gni_return_t registerFromMempool(mempool_type *mptr, void *msg)
         }
         else
         {
-            status = deregisterMempool(mptr, &current);
+            status = deregisterMemory(mptr, &current);
             if (status != GNI_RC_SUCCESS) break;
         }
     }; 
@@ -727,22 +728,23 @@ static gni_return_t registerFromMempool(mempool_type *mptr, void *msg)
 }
 
 inline 
-static gni_return_t registerMempool(void *msg)
+static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
 {
     static int rank = -1;
     int i;
     gni_return_t status;
-    mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
+    mempool_type *mptr1 = CpvAccess(mempool);//mempool_type*)GetMempoolPtr(msg);
+    //mempool_type *mptr1 = (mempool_type*)GetMempoolPtr(msg);
     mempool_type *mptr;
 
-    status = registerFromMempool(mptr1, msg);
+    status = registerFromMempool(mptr1, msg, size, t);
     if (status == GNI_RC_SUCCESS) return status;
 #if CMK_SMP
     for (i=0; i<CmiMyNodeSize()+1; i++) {
-      rank = (rank+1)%(CmiMyNodeSize()+1);
-      mptr = CpvAccessOther(mempool, rank);
+      //rank = (rank+1)%(CmiMyNodeSize()+1);
+      mptr = CpvAccessOther(mempool, i);
       if (mptr == mptr1) continue;
-      status = registerFromMempool(mptr, msg);
+      status = registerFromMempool(mptr, msg, size, t);
       if (status == GNI_RC_SUCCESS) return status;
     }
 #endif
@@ -1008,23 +1010,25 @@ static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL
     int                 offset = 0;
     uint64_t            source_addr;
     int                 register_size; 
+    void                *msg;
 
     size    =   control_msg_tmp->total_length;
     source_addr = control_msg_tmp->source_addr;
     register_size = control_msg_tmp->length;
 
 #if     USE_LRTS_MEMPOOL
+    if(buffered_send_msg >= MAX_BUFF_SEND)
+    {
+        if(!inbuff)
+            buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+        return GNI_RC_ERROR_NOMEM;
+    }
     if( control_msg_tmp->seq_id == 0 ){
-        if(buffered_send_msg >= MAX_BUFF_SEND)
-        {
-            if(!inbuff)
-                buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
-            return GNI_RC_ERROR_NOMEM;
-        }
         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
         {
             //register the corresponding mempool
-            status = registerMempool((void*)source_addr);
+            msg = (void*)source_addr;
+            status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
             if(status == GNI_RC_SUCCESS)
             {
                 control_msg_tmp->source_mem_hndl = GetMemHndl(source_addr);
@@ -1043,8 +1047,9 @@ static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL
         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
         source_addr += offset;
         size = control_msg_tmp->length;
-        MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
-        register_size = 0;  
+        status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl)); 
+        //MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
+        register_size = ALIGN64(size);  
     }
 
     if(status == GNI_RC_SUCCESS)
@@ -1058,7 +1063,7 @@ static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL
                 IncreaseMsgInSend(source_addr);
             }
             FreeControlMsg(control_msg_tmp);
-            MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_mempool, LMSG_INIT_TAG); 
+            MACHSTATE5(8, "GO SMSG LARGE to %d (%d,%d,%d) tag=%d\n", destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, LMSG_INIT_TAG); 
         }else
             status = GNI_RC_ERROR_RESOURCE;
 
@@ -1105,6 +1110,7 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
     int                 oob = ( mode & OUT_OF_BAND);
     SMSG_QUEUE          *queue;
 
+    MACHSTATE5(8, "GO LrtsSendFn %d(%d) (%d,%d, %d) \n", destNode, size, buffered_send_msg, buffered_recv_msg, register_memory_size); 
 #if CMK_USE_OOB
     queue = oob? &smsg_oob_queue : &smsg_queue;
 #else
@@ -1415,7 +1421,7 @@ static void PumpNetworkSmsg()
 #endif
                 if(NoMsgInSend(msg))
                     buffered_send_msg -= GetMempoolsize(msg);
-                MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
+                MACHSTATE5(8, "GO send done to %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
                 CmiFree((void*)((CONTROL_MSG *) header)->source_addr);
                 break;
             }
@@ -1426,6 +1432,7 @@ static void PumpNetworkSmsg()
                 int cur_seq = CmiGetMsgSeq(msg);
                 int offset = ONE_SEG*(cur_seq+1);
                 MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(header_tmp->source_mem_hndl), &omdh, header_tmp->length);
+                buffered_send_msg -= header_tmp->length;
                 int remain_size = CmiGetMsgSize(msg) - header_tmp->length;
                 if (remain_size < 0) remain_size = 0;
                 CmiSetMsgSize(msg, remain_size);
@@ -1515,7 +1522,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     // initial a get to transfer data from the sender side */
     request_msg = (CONTROL_MSG *) header;
     size = request_msg->total_length;
-    MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
+    MACHSTATE4(8, "GO Get request from %d (%d,%d, %d) \n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
     if(request_msg->seq_id < 2)   {
         msg_data = CmiAlloc(size);
         CmiSetMsgSeq(msg_data, 0);
@@ -1534,7 +1541,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
         transaction_size = ALIGN64(size);
         if(IsMemHndlZero(pd->local_mem_hndl))
         {   
-            status = registerMempool((void*)(msg_data));
+            status = registerMemory( GetMempoolBlockPtr(msg_data), GetMempoolsize(msg_data), &(GetMemHndl(msg_data)));
             if(status == GNI_RC_SUCCESS)
             {
                 pd->local_mem_hndl = GetMemHndl(msg_data);
@@ -1551,7 +1558,8 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     }
     else{
         transaction_size = ALIGN64(request_msg->length);
-        MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh, status)
+        status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
+        //MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh, status)
         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
         {
             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
@@ -1597,7 +1605,7 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
             {
 #if MACHINE_DEBUG_LOG
                 buffered_recv_msg += register_size;
-                MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool); 
+                MACHSTATE4(8, "GO request from %d (%d,%d, %d)\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size); 
 #endif
                 IncreaseMsgInRecv(msg_data);
 
@@ -1818,7 +1826,7 @@ static void PumpLocalRdmaTransactions()
 #if MACHINE_DEBUG_LOG
                     if(NoMsgInRecv((void*)(tmp_pd->local_addr)))
                         buffered_recv_msg -= GetMempoolsize((void*)(tmp_pd->local_addr));
-                    MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_mempool, msg_tag); 
+                    MACHSTATE5(8, "GO Recv done ack send from %d (%d,%d, %d) tag=%d\n", inst_id, buffered_send_msg, buffered_recv_msg, register_memory_size, msg_tag); 
 #endif
 #if CMK_SMP_TRACE_COMMTHREAD
                     TRACE_COMM_CREATION(CpvAccess(projTraceStart), (void*)tmp_pd->local_addr);
@@ -1857,7 +1865,7 @@ static void  SendRdmaMsg()
     RDMA_REQUEST *ptr = 0, *tmp_ptr;
     RDMA_REQUEST *pre = 0;
     int i, register_size = 0;
-
+    void                    *msg;
 #if CMK_SMP
     int len = PCQueueLength(sendRdmaBuf);
     for (i=0; i<len; i++)
@@ -1869,7 +1877,7 @@ static void  SendRdmaMsg()
     while (ptr!=0)
     {
 #endif 
-        MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool); 
+        MACHSTATE4(8, "noempty-rdma  %d (%lld,%lld,%d) \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size); 
         gni_post_descriptor_t *pd = ptr->pd;
         status = GNI_RC_SUCCESS;
         
@@ -1877,7 +1885,8 @@ static void  SendRdmaMsg()
         {
             if(IsMemHndlZero((GetMemHndl(pd->local_addr))))
             {
-                status = registerMempool((void*)(pd->local_addr));
+                msg = (void*)(pd->local_addr);
+                status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
                 if(status == GNI_RC_SUCCESS)
                 {
                     pd->local_mem_hndl = GetMemHndl((void*)(pd->local_addr));
@@ -1892,7 +1901,8 @@ static void  SendRdmaMsg()
                 register_size = 0;
         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
         {
-            MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh, status)
+            status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
+            //MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh, status)
         }
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
@@ -1981,7 +1991,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         for (i=0; i<len; i++) 
         {
             ptr = (MSG_LIST*)PCQueuePop(queue->smsg_msglist_index[index].sendSmsgBuf);
-            if (ptr == NULL) break;
+            if (ptr == 0) break;
 #else
     while(index != -1)
     {
@@ -2098,7 +2108,7 @@ static void ProcessDeadlock();
 void LrtsAdvanceCommunication(int whileidle)
 {
     /*  Receive Msg first */
-    MACHSTATE(8, "calling advance comm \n") ; 
+    //MACHSTATE(8, "calling advance comm \n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     double startT, endT;
 #endif
@@ -2118,7 +2128,7 @@ void LrtsAdvanceCommunication(int whileidle)
     startT = CmiWallTimer();
 #endif
     PumpNetworkSmsg();
-    MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
+    //MACHSTATE(8, "after PumpNetworkSmsg \n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(20, startT, endT);
@@ -2128,7 +2138,7 @@ void LrtsAdvanceCommunication(int whileidle)
     startT = CmiWallTimer();
 #endif
     PumpLocalRdmaTransactions();
-    MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
+    //MACHSTATE(8, "after PumpLocalRdmaTransactions\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(30, startT, endT);
@@ -2141,7 +2151,7 @@ void LrtsAdvanceCommunication(int whileidle)
     if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
     SendBufferMsg(&smsg_queue);
-    MACHSTATE(8, "after SendBufferMsg\n") ; 
+    //MACHSTATE(8, "after SendBufferMsg\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(40, startT, endT);
@@ -2151,7 +2161,7 @@ void LrtsAdvanceCommunication(int whileidle)
     startT = CmiWallTimer();
 #endif
     SendRdmaMsg();
-    MACHSTATE(8, "after SendRdmaMsg\n") ; 
+    //MACHSTATE(8, "after SendRdmaMsg\n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
@@ -2268,10 +2278,10 @@ static void _init_static_smsg()
             GNI_MEM_READWRITE,   
             vmdh_index,
             &my_smsg_mdh_mailbox);
-    register_mempool += smsg_memlen*(mysize);
+    register_memory_size += smsg_memlen*(mysize);
     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
 
-    if (myrank == 0 && register_mempool>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
+    if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
 
     base_infor.addr =  (uint64_t)smsg_mailbox_base;
     base_infor.mdh =  my_smsg_mdh_mailbox;
index ce3e6c27fd1f508f30997c65a7e7067346c2e4c7..9ac520b37bf796ed642abd501321bdd18b6f0b62 100644 (file)
@@ -50,7 +50,7 @@ extremely verbose, 2 shows most procedure entrance/exits,
 3 shows most communication, and 5 only shows rare or unexpected items.
 Displaying lower priority messages doesn't stop higher priority ones.
 */
-#define MACHINE_DEBUG_PRIO 3
+#define MACHINE_DEBUG_PRIO 8
 #define MACHINE_DEBUG_LOG 1 /**Controls whether output goes to log file*/
 
 extern FILE *debugLog;