fixed potential bugs with mem register
authorYanhua Sun <yanhuas@jyc1.(none)>
Sat, 18 Feb 2012 08:30:20 +0000 (02:30 -0600)
committerYanhua Sun <yanhuas@jyc1.(none)>
Sat, 18 Feb 2012 08:30:20 +0000 (02:30 -0600)
src/arch/gemini_gni/machine.c

index 13bc442ef5c55e19b427eb0d1e5b07fddc1dbc35..552b064b21cbf056e28eaac92e3d46e454a852ae 100644 (file)
@@ -171,12 +171,13 @@ uint8_t   onesided_hnd, omdh;
         else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, smsg_rx_cqh,  GNI_MEM_READWRITE, -1, mem_hndl); \
                 if(status == GNI_RC_SUCCESS) register_memory_size += size;} 
 #else
-#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status ) status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
-      if(status == GNI_RC_SUCCESS) register_memory_size += size;
+#define  MEMORY_REGISTER(handler, nic_hndl, msg, size, mem_hndl, myomdh, status)    if(register_memory_size>= MAX_REG_MEM) { \
+                                                                                         status = GNI_RC_ERROR_NOMEM;} \
+        else {status = GNI_MemRegister(nic_hndl, (uint64_t)msg,  (uint64_t)size, NULL,  GNI_MEM_READWRITE, -1, mem_hndl); \
+                if(status == GNI_RC_SUCCESS) register_memory_size += size;} 
 #endif
-#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size, status)  status = GNI_MemDeregister(nic_hndl, (mem_hndl)); \
-                                                                                if( status == GNI_RC_SUCCESS) register_memory_size -= size; \
-                                                                                else GNI_RC_CHECK(status, "deregister");
+#define  MEMORY_DEREGISTER(handler, nic_hndl, mem_hndl, myomdh, size)  if( GNI_MemDeregister(nic_hndl, (mem_hndl) ) == GNI_RC_SUCCESS) register_memory_size -= size; \
+                                                                                                                           else CmiAbort("MEM_DEregister");;
 #endif
 
 #define   IncreaseMsgInRecv(x) (((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv)++
@@ -194,8 +195,8 @@ uint8_t   onesided_hnd, omdh;
 #define   NoMsgInSend(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send == 0
 #define   NoMsgInRecv(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv == 0
 #define   NoMsgInFlight(x)  ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_send + ((block_header*)(((mempool_header*)((char*)x-ALIGNBUF))->mempool_ptr))->msgs_in_recv  == 0
-#define   IsMemHndlZero(x)  (x.qword1 == 0 && x.qword2 == 0)
-#define   SetMemHndlZero(x)  x.qword1 = 0; x.qword2 = 0
+#define   IsMemHndlZero(x)  ((x).qword1 == 0 && (x).qword2 == 0)
+#define   SetMemHndlZero(x)  (x).qword1 = 0; (x).qword2 = 0 ;
 #define   NotRegistered(x)  IsMemHndlZero(((block_header*)x)->mem_hndl)
 
 #define CmiGetMsgSize(m)  ((CmiMsgHeaderExt*)m)->size
@@ -683,16 +684,16 @@ static  gni_return_t deregisterMemory(mempool_type *mptr, block_header **from)
     gni_return_t status;
     block_header *current = *from;
 
-    while(register_memory_size>= MAX_REG_MEM)
-    {
+    //while(register_memory_size>= MAX_REG_MEM)
+    //{
         while( current!= NULL && ((current->msgs_in_send+current->msgs_in_recv)>0 || IsMemHndlZero(current->mem_hndl) ))
             current = current->block_next?(block_header *)((char*)mptr+current->block_next):NULL;
 
         if(current == NULL) return GNI_RC_ERROR_RESOURCE;
         *from = current;
-        MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current), status)
+        MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &(GetMemHndlFromHeader(current)) , &omdh, GetSizeFromHeader(current));
         SetMemHndlZero(GetMemHndlFromHeader(current));
-    }
+    //}
     return GNI_RC_SUCCESS;
 }
 
@@ -705,6 +706,12 @@ static gni_return_t registerFromMempool(mempool_type *mptr, void *blockaddr, int
     //gni_mem_handle_t  *memhndl =   &(GetMemHndl(msg));
    
     block_header *current = &(mptr->block_head);
+    while(register_memory_size>= MAX_REG_MEM)
+    {
+        status = deregisterMemory(mptr, &current);
+        if (status != GNI_RC_SUCCESS) break;
+    }
+    if(register_memory_size>= MAX_REG_MEM) return status;
 
     MACHSTATE3(8, "mempool (%lld,%lld,%d) \n", buffered_send_msg, buffered_recv_msg, register_memory_size); 
     while(1)
@@ -741,8 +748,8 @@ static gni_return_t registerMemory(void *msg, int size, gni_mem_handle_t *t)
     if (status == GNI_RC_SUCCESS) return status;
 #if CMK_SMP
     for (i=0; i<CmiMyNodeSize()+1; i++) {
-      //rank = (rank+1)%(CmiMyNodeSize()+1);
-      mptr = CpvAccessOther(mempool, i);
+      rank = (rank+1)%(CmiMyNodeSize()+1);
+      mptr = CpvAccessOther(mempool, rank);
       if (mptr == mptr1) continue;
       status = registerFromMempool(mptr, msg, size, t);
       if (status == GNI_RC_SUCCESS) return status;
@@ -1017,15 +1024,15 @@ static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL
     register_size = control_msg_tmp->length;
 
 #if     USE_LRTS_MEMPOOL
-    if(buffered_send_msg >= MAX_BUFF_SEND)
-    {
-        if(!inbuff)
-            buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
-        return GNI_RC_ERROR_NOMEM;
-    }
     if( control_msg_tmp->seq_id == 0 ){
         if(IsMemHndlZero(GetMemHndl(source_addr))) //it is in mempool, it is possible to be de-registered by others
         {
+            if(buffered_send_msg >= MAX_BUFF_SEND)
+            {
+                if(!inbuff)
+                    buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+                return GNI_RC_ERROR_NOMEM;
+            }
             //register the corresponding mempool
             msg = (void*)source_addr;
             status = registerMemory(GetMempoolBlockPtr(msg), GetMempoolsize(msg), &(GetMemHndl(msg)));
@@ -1047,9 +1054,23 @@ static gni_return_t send_large_messages(SMSG_QUEUE *queue, int destNode, CONTROL
         int offset = ONE_SEG*(control_msg_tmp->seq_id-1);
         source_addr += offset;
         size = control_msg_tmp->length;
-        status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl)); 
-        //MEMORY_REGISTER(onesided_hnd, nic_hndl, source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl), &omdh, status)
-        register_size = ALIGN64(size);  
+        
+        if(IsMemHndlZero(control_msg_tmp->source_mem_hndl))
+        {
+            if(buffered_send_msg >= MAX_BUFF_SEND)
+            {
+                if(!inbuff)
+                    buffer_small_msgs(queue, control_msg_tmp, sizeof(CONTROL_MSG), destNode, LMSG_INIT_TAG);
+                return GNI_RC_ERROR_NOMEM;
+            }
+            status = registerMemory((void*)source_addr, ALIGN64(size), &(control_msg_tmp->source_mem_hndl));
+            if(status == GNI_RC_SUCCESS) buffered_send_msg += ALIGN64(size);
+        }
+        else
+        {
+            status = GNI_RC_SUCCESS;
+        }
+        register_size = 0;  
     }
 
     if(status == GNI_RC_SUCCESS)
@@ -1180,6 +1201,7 @@ static void ProcessDeadlock()
 //printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
     if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
     mysum = smsg_send_count + smsg_recv_count;
+    MACHSTATE5(9,"Before allgather Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
     status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
     GNI_RC_CHECK("PMI_Allgather", status);
     sum = 0;
@@ -1189,6 +1211,7 @@ static void ProcessDeadlock()
     else
         count = 0;
     last = sum;
+    MACHSTATE5(9,"Progress Deadlock (%d,%d)  (%d,%d)(%d)\n", buffered_send_msg, register_memory_size, last, sum, count); 
     if (count == 2) { 
         /* detected twice, it is a real deadlock */
         if (myrank == 0)  {
@@ -1208,10 +1231,13 @@ static void CheckProgress()
 #if !CMK_SMP
         if (_detected_hang) ProcessDeadlock();
 #endif
+
     }
     else {
+        //MACHSTATE5(9,"--Check Progress %d(%d, %d) (%d,%d)\n", mycount, buffered_send_msg, register_memory_size, smsg_send_count, smsg_recv_count); 
         last_smsg_send_count = smsg_send_count;
         last_smsg_recv_count = smsg_recv_count;
+        _detected_hang = 0;
     }
 }
 
@@ -1342,7 +1368,7 @@ static void PumpNetworkSmsg()
     uint64_t            inst_id;
     int                 ret;
     gni_cq_entry_t      event_data;
-    gni_return_t        status;
+    gni_return_t        status, status2;
     void                *header;
     uint8_t             msg_tag;
     int                 msg_nbytes;
@@ -1447,6 +1473,7 @@ static void PumpNetworkSmsg()
                     control_msg_tmp->dest_addr    = header_tmp->dest_addr;
                     control_msg_tmp->total_length   = header_tmp->total_length; 
                     control_msg_tmp->length         = header_tmp->total_length - offset;
+                    SetMemHndlZero(control_msg_tmp->source_mem_hndl); 
                     if (control_msg_tmp->length >= ONE_SEG) control_msg_tmp->length = ONE_SEG;
                     control_msg_tmp->seq_id         = cur_seq+1+1;
                     //send next seg
@@ -1559,7 +1586,6 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
     else{
         transaction_size = ALIGN64(request_msg->length);
         status = registerMemory(msg_data, transaction_size, &(pd->local_mem_hndl)); 
-        //MEMORY_REGISTER(onesided_hnd, nic_hndl, msg_data, transaction_size, &(pd->local_mem_hndl), &omdh, status)
         if (status == GNI_RC_INVALID_PARAM || status == GNI_RC_PERMISSION_ERROR) 
         {
             GNI_RC_CHECK("Invalid/permission Mem Register in post", status);
@@ -1613,14 +1639,14 @@ static void getLargeMsgRequest(void* header, uint64_t inst_id )
         }
     }else
     {
-        SetMemHndlZero(pd->local_mem_hndl);
+        SetMemHndlZero((pd->local_mem_hndl));
     }
     if(status == GNI_RC_ERROR_RESOURCE|| status == GNI_RC_ERROR_NOMEM )
     {
         bufferRdmaMsg(inst_id, pd); 
     }else {
-        /* printf("source: %d pd:%p\n", source, pd); */
-        GNI_RC_CHECK("AFter posting", status);
+         //printf("source: %d pd:(%p,%p)(%p,%p)\n", source, (pd->local_mem_hndl).qword1, (pd->local_mem_hndl).qword2, (pd->remote_mem_hndl).qword1, (pd->remote_mem_hndl).qword2);
+        GNI_RC_CHECK("GetLargeAFter posting", status);
     }
 #else
     CONTROL_MSG         *request_msg;
@@ -1778,6 +1804,7 @@ static void PumpLocalRdmaTransactions()
                 {
                     msg_tag = BIG_MSG_TAG; 
                     MEMORY_DEREGISTER(onesided_hnd, nic_hndl, &tmp_pd->local_mem_hndl, &omdh, tmp_pd->length)
+                    
                     ack_msg_tmp->dest_addr = tmp_pd->local_addr - ONE_SEG*(ack_msg_tmp->seq_id-1);
                     ack_msg_tmp->source_addr -= ONE_SEG*(ack_msg_tmp->seq_id-1);
                 } 
@@ -1902,7 +1929,6 @@ static void  SendRdmaMsg()
         }else if( IsMemHndlZero(pd->local_mem_hndl)) //big msg, can not fit into memory pool
         {
             status = registerMemory((void*)(pd->local_addr), pd->length, &(pd->local_mem_hndl)); 
-            //MEMORY_REGISTER(onesided_hnd, nic_hndl, pd->local_addr, pd->length, &(pd->local_mem_hndl), &omdh, status)
         }
         if(status == GNI_RC_SUCCESS)        //mem register good
         {
@@ -2000,7 +2026,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         while(ptr != 0)
         {
 #endif
-            MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool, ptr->tag); 
+            MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_memory_size, ptr->tag); 
             status = GNI_RC_ERROR_RESOURCE;
             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
                 /* connection not exists yet */
@@ -2104,11 +2130,9 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
 }
 
 static void ProcessDeadlock();
-
 void LrtsAdvanceCommunication(int whileidle)
 {
     /*  Receive Msg first */
-    //MACHSTATE(8, "calling advance comm \n") ; 
 #if CMK_SMP_TRACE_COMMTHREAD
     double startT, endT;
 #endif
@@ -2281,6 +2305,7 @@ static void _init_static_smsg()
     register_memory_size += smsg_memlen*(mysize);
     GNI_RC_CHECK("GNI_GNI_MemRegister mem buffer", status);
 
+    if (myrank == 0)  printf("Charm++> SMSG memory: %d\n", smsg_memlen*(mysize));
     if (myrank == 0 && register_memory_size>=MAX_REG_MEM ) printf("Charm++> FATAL ERROR your program has risk of hanging \n please set CHARM_UGNI_MEMPOOL_MAX  to a larger value or use Dynamic smsg\n");
 
     base_infor.addr =  (uint64_t)smsg_mailbox_base;
@@ -2739,7 +2764,7 @@ void* LrtsAlloc(int n_bytes, int header)
         CmiAssert(header+sizeof(mempool_header) <= ALIGNBUF);
 #if     USE_LRTS_MEMPOOL
         n_bytes = ALIGN64(n_bytes);
-        if(n_bytes <= BIG_MSG)
+        if(n_bytes < BIG_MSG)
         {
             char *res = mempool_malloc(CpvAccess(mempool), ALIGNBUF+n_bytes-sizeof(mempool_header), 1);
             ptr = res - sizeof(mempool_header) + ALIGNBUF - header;
@@ -2763,7 +2788,7 @@ void  LrtsFree(void *msg)
     int size = SIZEFIELD((char*)msg+sizeof(CmiChunkHeader));
     if (size <= SMSG_MAX_MSG)
       free(msg);
-    else if(size>BIG_MSG)
+    else if(size>=BIG_MSG)
     {
         free((char*)msg + sizeof(CmiChunkHeader) - ALIGNBUF);