Merge branch 'charm' of charmgit:charm into charm
authorGengbin Zheng <gzheng@illinois.edu>
Sat, 18 Feb 2012 00:02:08 +0000 (18:02 -0600)
committerGengbin Zheng <gzheng@illinois.edu>
Sat, 18 Feb 2012 00:02:08 +0000 (18:02 -0600)
src/arch/gemini_gni/machine.c

index e61a7a231060a8d66ac29113cc183c141dc04590..4cd665e8f9facec0a72f9a813d9f4e43bf22d8e7 100644 (file)
 
     CHARM_UGNI_MEMPOOL_MAX can be maximum_register_mem/number_of_processes
     CHARM_UGNI_SEND_MAX can be half of CHARM_UGNI_MEMPOOL_MAX
+
+    other environment variables:
+
+    export CHARM_UGNI_NO_DEADLOCK_CHECK=yes      # disable checking deadlock
  */
 /*@{*/
 
@@ -30,8 +34,6 @@
 
 #include "converse.h"
 
-#define USE_OOB                     0
-
 #define PRINT_SYH  0
 
 // Trace communication thread
@@ -74,7 +76,8 @@ CpvStaticDeclare(double, projTraceStart);
 #define RECV_CAP 2
 #endif
 
-
+static int _checkProgress = 1;             /* check deadlock */
+static int _detected_hang = 0;
 
 #define             SMSG_ATTR_SIZE      sizeof(gni_smsg_attr_t)
 
@@ -102,13 +105,11 @@ static dynamic_smsg_mailbox_t  *mailbox_list;
 #define USE_LRTS_MEMPOOL                  1
 
 #if USE_LRTS_MEMPOOL
-#if CMK_SMP
-#define STEAL_MEMPOOL                     0
-#endif
 
 #define oneMB (1024ll*1024)
 static CmiInt8 _mempool_size = 8*oneMB;
 static CmiInt8 _expand_mem =  4*oneMB;
+
 #endif
 
 #if CMK_SMP
@@ -132,11 +133,14 @@ static CmiInt8 buffered_send_msg = 0;
 
 int         rdma_id = 0;
 
+static CmiUInt8  smsg_send_count = 0,  last_smsg_send_count = 0;
+static CmiUInt8  smsg_recv_count = 0,  last_smsg_recv_count = 0;
+
 #if PRINT_SYH
-int         lrts_smsg_success = 0;
 int         lrts_send_msg_id = 0;
-int         lrts_send_rdma_success = 0;
 int         lrts_local_done_msg = 0;
+int         lrts_smsg_success = 0;
+int         lrts_send_rdma_success = 0;
 #endif
 
 #include "machine.h"
@@ -657,6 +661,7 @@ static CmiInt8 buffered_recv_msg = 0;
 int         lrts_smsg_success = 0;
 int         lrts_received_msg = 0;
 #endif
+
 static void sweep_mempool(mempool_type *mptr)
 {
     block_header *current = &(mptr->block_head);
@@ -956,6 +961,7 @@ inline static gni_return_t send_smsg_message(SMSG_QUEUE *queue, int destNode, vo
                 TRACE_COMM_CREATION(CpvAccess(projTraceStart), real_data);
             }
 #endif
+            smsg_send_count ++;
             return status;
         }else
             status = GNI_RC_ERROR_RESOURCE;
@@ -1099,7 +1105,7 @@ CmiCommHandle LrtsSendFunc(int destNode, int size, char *msg, int mode)
     int                 oob = ( mode & OUT_OF_BAND);
     SMSG_QUEUE          *queue;
 
-#if USE_OOB
+#if CMK_USE_OOB
     queue = oob? &smsg_oob_queue : &smsg_queue;
 #else
     queue = &smsg_queue;
@@ -1157,6 +1163,52 @@ static void registerUserTraceEvents() {
 #endif
 }
 
+static void ProcessDeadlock()
+{
+    static CmiUInt8 *ptr = NULL;
+    static CmiUInt8  last = 0, mysum, sum;
+    static int count = 0;
+    gni_return_t status;
+    int i;
+
+//printf("[%d] comm thread detected hang %d %d %d\n", CmiMyPe(), smsg_send_count, smsg_recv_count, count);
+    if (ptr == NULL) ptr = (CmiUInt8*)malloc(mysize * sizeof(CmiUInt8));
+    mysum = smsg_send_count + smsg_recv_count;
+    status = PMI_Allgather(&mysum,ptr,sizeof(CmiUInt8));
+    GNI_RC_CHECK("PMI_Allgather", status);
+    sum = 0;
+    for (i=0; i<mysize; i++)  sum+= ptr[i];
+    if (last == 0 || sum == last) 
+        count++;
+    else
+        count = 0;
+    last = sum;
+    if (count == 2) { 
+        /* detected twice, it is a real deadlock */
+        if (myrank == 0)  {
+            CmiPrintf("Charm++> network progress engine stalled, program may hang. Try set environment variables CHARM_UGNI_MEMPOOL_MAX and CHARM_UGNI_SEND_MAX to limit the registered memory usage.\n");
+            CmiAbort("Fatal> Deadlock detected.");
+        }
+    }
+    _detected_hang = 0;
+}
+
+static void CheckProgress()
+{
+    if (smsg_send_count == last_smsg_send_count &&
+        smsg_recv_count == last_smsg_recv_count ) 
+    {
+        _detected_hang = 1;
+#if !CMK_SMP
+        if (_detected_hang) ProcessDeadlock();
+#endif
+    }
+    else {
+        last_smsg_send_count = smsg_send_count;
+        last_smsg_recv_count = smsg_recv_count;
+    }
+}
+
 void LrtsPostCommonInit(int everReturn)
 {
 #if CMI_MPI_TRACE_USEREVENTS && CMK_TRACE_ENABLED && !CMK_TRACE_IN_CHARM
@@ -1176,6 +1228,12 @@ void LrtsPostCommonInit(int everReturn)
     if (useDynamicSMSG)
     CcdCallOnConditionKeep(CcdPERIODIC_10ms, (CcdVoidFn) PumpDatagramConnection, NULL);
 #endif
+
+    if (_checkProgress)
+#if CMK_SMP
+    if (CmiMyRank() == 0)
+#endif
+    CcdCallOnConditionKeep(CcdPERIODIC_2minute, (CcdVoidFn) CheckProgress, NULL);
 }
 
 /* this is called by worker thread */
@@ -1418,7 +1476,7 @@ static void PumpNetworkSmsg()
                 printf("weird tag problem\n");
                 CmiAbort("Unknown tag\n");
                      }
-            }
+            }               // end switch
 #if CMK_SMP && !COMM_THREAD_SEND
             CmiLock(tx_cq_lock);
 #endif
@@ -1426,6 +1484,7 @@ static void PumpNetworkSmsg()
 #if CMK_SMP && !COMM_THREAD_SEND
             CmiUnlock(tx_cq_lock);
 #endif
+            smsg_recv_count ++;
             msg_tag = GNI_SMSG_ANY_TAG;
         } //endwhile getNext
     }   //end while GetEvent
@@ -1932,12 +1991,11 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
         {
 #endif
             MACHSTATE5(8, "noempty-smsg  %d (%d,%d,%d) tag=%d \n", ptr->destNode, buffered_send_msg, buffered_recv_msg, register_mempool, ptr->tag); 
+            status = GNI_RC_ERROR_RESOURCE;
             if (useDynamicSMSG && smsg_connected_flag[index] != 2) {   
                 /* connection not exists yet */
-              done = 0;
-              break;
             }
-            status = GNI_RC_ERROR_RESOURCE;
+            else
             switch(ptr->tag)
             {
             case SMALL_DATA_TAG:
@@ -1950,8 +2008,6 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
             case LMSG_INIT_TAG:
                 control_msg_tmp = (CONTROL_MSG*)ptr->msg;
                 status = send_large_messages(queue, ptr->destNode, control_msg_tmp, 1);
-                if(status != GNI_RC_SUCCESS)
-                    done = 0;
                 break;
             case   ACK_TAG:
             case   BIG_MSG_TAG:
@@ -1959,8 +2015,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
                 if(status == GNI_RC_SUCCESS)
                 {
                     FreeControlMsg((CONTROL_MSG*)ptr->msg);
-                }else
-                    done = 0;
+                }
                 break;
 #ifdef CMK_DIRECT
             case DIRECT_PUT_DONE_TAG:
@@ -1975,7 +2030,7 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
             default:
                 printf("Weird tag\n");
                 CmiAbort("should not happen\n");
-            }
+            }       // end switch
             if(status == GNI_RC_SUCCESS)
             {
 #if !CMK_SMP
@@ -2038,6 +2093,8 @@ static int SendBufferMsg(SMSG_QUEUE *queue)
     return done;
 }
 
+static void ProcessDeadlock();
+
 void LrtsAdvanceCommunication(int whileidle)
 {
     /*  Receive Msg first */
@@ -2080,8 +2137,8 @@ void LrtsAdvanceCommunication(int whileidle)
 #if CMK_SMP_TRACE_COMMTHREAD
     startT = CmiWallTimer();
 #endif
-#if USE_OOB
-    SendBufferMsg(&smsg_oob_queue);
+#if CMK_USE_OOB
+    if (SendBufferMsg(&smsg_oob_queue) == 1)
 #endif
     SendBufferMsg(&smsg_queue);
     MACHSTATE(8, "after SendBufferMsg\n") ; 
@@ -2099,6 +2156,10 @@ void LrtsAdvanceCommunication(int whileidle)
     endT = CmiWallTimer();
     if (endT-startT>=TRACE_THRESHOLD) traceUserBracketEvent(50, startT, endT);
 #endif
+
+#if CMK_SMP
+    if (_detected_hang)  ProcessDeadlock();
+#endif
 }
 
 /* useDynamicSMSG */
@@ -2284,7 +2345,7 @@ static void _init_smsg()
     }
 
     _init_send_queue(&smsg_queue);
-#if USE_OOB
+#if CMK_USE_OOB
     _init_send_queue(&smsg_oob_queue);
 #endif
 }
@@ -2631,6 +2692,10 @@ void LrtsInit(int *argc, char ***argv, int *numNodes, int *myNodeID)
     if (MAX_BUFF_SEND > MAX_REG_MEM)  MAX_BUFF_SEND = MAX_REG_MEM;
 #endif
 
+    env = getenv("CHARM_UGNI_NO_DEADLOCK_CHECK");
+    if (env) _checkProgress = 0;
+    if (mysize == 1) _checkProgress = 0;
+
     /* init DMA buffer for medium message */
 
     //_init_DMA_buffer();
@@ -2720,7 +2785,7 @@ void LrtsDrainResources()
 {
     if(mysize == 1) return;
     while (
-#if USE_OOB
+#if CMK_USE_OOB
            !SendBufferMsg(&smsg_oob_queue) ||
 #endif
            !SendBufferMsg(&smsg_queue))
@@ -2817,7 +2882,7 @@ double CmiCpuTimer(void) {
 
 int CmiBarrier()
 {
-    int status;
+    gni_return_t status;
 
 #if CMK_SMP
     /* make sure all ranks reach here, otherwise comm threads may reach barrier ignoring other ranks  */