enabled async io for gm version. (setitimer does not work with gm though)
authorGengbin Zheng <gzheng@illinois.edu>
Mon, 27 Sep 2004 18:34:52 +0000 (18:34 +0000)
committerGengbin Zheng <gzheng@illinois.edu>
Mon, 27 Sep 2004 18:34:52 +0000 (18:34 +0000)
CommunicationServer now was modified to take another parameter to indicate where it is called (they can be from smp thread, interrupt or worker thread).
This eliminated the wierd CommunicationServerThread() version of CommunicationServer.

src/arch/net/machine-dgram.c
src/arch/net/machine-eth.c
src/arch/net/machine-gm.c
src/arch/net/machine-tcp.c
src/arch/net/machine.c

index e64ea3b6616b3b5fb065748e13eaca9571570956..f16781ebe28e3a57b850ca4a2865fd2a9e566ce5 100644 (file)
@@ -551,7 +551,7 @@ static void CommunicationsClockCaller(void *ignored)
 
 static void CommunicationPeriodic(void) 
 { /*Poll on the communications server*/
-  CommunicationServerThread(0);
+  CommunicationServer(0, 0);
 }
 
 static void CommunicationPeriodicCaller(void *ignored)
index de9ada677f080af2ea7b544e0d822bbcbd6bf428..519275401ede6c5f97fb6431ab97043d677fff29 100644 (file)
@@ -5,7 +5,6 @@
   * CmiNotifyIdle()
   * DeliverViaNetwork()
   * CommunicationServer()
-  * CommunicationServerThread()
 
   moved from machine.c by 
   Gengbin Zheng, gzheng@uiuc.edu  4/22/2001
@@ -43,7 +42,7 @@ static void CmiNotifyStillIdle(CmiIdleState *s)
 #if CMK_SHARED_VARS_UNAVAILABLE
   /*No comm. thread-- listen on sockets for incoming messages*/
   MACHSTATE(3,"idle commserver {")
-  CommunicationServerThread(10);
+  CommunicationServer(10, 0);
   MACHSTATE(3,"} idle commserver")
 #else
   int nSpins=20; /*Number of times to spin before sleeping*/
@@ -696,7 +695,12 @@ void ReceiveDatagram()
  ***********************************************************************/
 void CmiHandleImmediate();
 
-static void CommunicationServer(int sleepTime)
+/*
+0: from smp thread
+1: from interrupt
+2: from worker thread
+*/
+static void CommunicationServer(int sleepTime, int where)
 {
   unsigned int nTimes=0; /* Loop counter */
   LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
@@ -735,21 +739,18 @@ static void CommunicationServer(int sleepTime)
     }
   }
   CmiCommUnlock();
-  MACHSTATE(1,"} CommunicationServer") 
-}
 
-/* similar to CommunicationServer, but it is called by communication thread
-   or in interrupt */
-static void CommunicationServerThread(int sleepTime)
-{
-  MACHSTATE(2,"CommunicationServerThread");
-  CommunicationServer(sleepTime);
+  /* when called by communication thread or in interrupt */
+  if (where == 0 || where == 1) {
 #if CMK_IMMEDIATE_MSG
   CmiHandleImmediate();
 #endif
 #if CMK_PERSISTENT_COMM
   PumpPersistent();
 #endif
+  }
+
+  MACHSTATE(1,"} CommunicationServer") 
 }
 
 void CmiMachineInit(char **argv)
index 2de2e2b7a0e56e819ea8311e9448b1410a872dc7..8892dd8d2855e6d28f6f87d82c3cb2f4a7d6bbc3 100644 (file)
@@ -5,7 +5,6 @@
   * CmiNotifyIdle()
   * DeliverViaNetwork()
   * CommunicationServer()
-  * CommunicationServerThread()
 
   written by 
   Gengbin Zheng, gzheng@uiuc.edu  4/22/2001
@@ -239,16 +238,24 @@ int CheckSocketsReady(int withDelayMs)
  *
  ***********************************************************************/
 
+static void ServiceCharmrun()
+{
+  int again = 1;
+  CmiCommLock();
+  while (again)
+  {
+  again = 0;
+  CheckSocketsReady(0);
+  if (ctrlskt_ready_read) { ctrl_getone(); again=1; }
+  if (CmiStdoutNeedsService()) { CmiStdoutService(); }
+  }
+  CmiCommUnlock();
+}
 
 static void CommunicationServer_nolock(int withDelayMs) {
   gm_recv_event_t *e;
-  int size, len;
-  char *msg, *buf;
-  while (1) {
-    CheckSocketsReady(0);
-    if (ctrlskt_ready_read) { ctrl_getone(); }
-    if (CmiStdoutNeedsService()) { CmiStdoutService(); }
 
+  while (1) {
     MACHSTATE(3,"Non-blocking receive {")
     e = gm_receive(gmport);
     MACHSTATE(3,"} Non-blocking receive")
@@ -256,30 +263,32 @@ static void CommunicationServer_nolock(int withDelayMs) {
   }
 }
 
-static void CommunicationServer(int withDelayMs)
+/*
+0: from smp thread
+1: from interrupt
+2: from worker thread
+*/
+static void CommunicationServer(int withDelayMs, int where)
 {
-  MACHSTATE1(2,"CommunicationServer(%d)",withDelayMs)
-  LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
-
   /* standalone mode */
   if (Cmi_charmrun_pid == 0 && gmport == NULL) return;
 
-  CmiCommLock();
+  ServiceCharmrun();
+  if (where == 1) return;
 
-  CommunicationServer_nolock(withDelayMs);
+  MACHSTATE1(2,"CommunicationServer(%d)",withDelayMs)
+  LOG(GetClock(), Cmi_nodestart, 'I', 0, 0);
 
+  CmiCommLock();
+  CommunicationServer_nolock(withDelayMs);
   CmiCommUnlock();
-  MACHSTATE(2,"} CommunicationServer")
-}
 
-/* similar to CommunicationServer, but it is called by communication thread
-   or in interrupt */
-static void CommunicationServerThread(int sleepTime)
-{
-  CommunicationServer(sleepTime);
 #if CMK_IMMEDIATE_MSG
+  if (where == 0)
   CmiHandleImmediate();
 #endif
+
+  MACHSTATE(2,"} CommunicationServer")
 }
 
 static void processMessage(char *msg, int len)
index e4c3d1242fd408724796ff9dd3c73bee4d6f8a58..da08e04a63b6a4c4dd15874acd8adbbe4042ab3c 100644 (file)
@@ -6,7 +6,6 @@
   * CmiNotifyIdle()
   * DeliverViaNetwork()
   * CommunicationServer()
-  * CommunicationServerThread()
 
   written by 
   Gengbin Zheng, 12/21/2001
@@ -63,7 +62,7 @@ static void CmiNotifyBeginIdle(CmiIdleState *s)
 static void CmiNotifyStillIdle(CmiIdleState *s)
 {
 #if CMK_SHARED_VARS_UNAVAILABLE
-  CommunicationServerThread(1);
+  CommunicationServer(1, 0);
 #else
   int nSpins=20; /*Number of times to spin before sleeping*/
   s->nIdles++;
@@ -242,8 +241,12 @@ here-- WSAEINVAL, WSAENOTSOCK-- yet everything is actually OK.
  *
  ***********************************************************************/
 
-
-static void CommunicationServer(int sleepTime)
+/*
+0: from smp thread
+1: from interrupt
+2: from worker thread
+*/
+static void CommunicationServer(int sleepTime, int where)
 {
   unsigned int nTimes=0; /* Loop counter */
   CmiCommLockOrElse({
@@ -281,19 +284,19 @@ static void CommunicationServer(int sleepTime)
     }
   }
   CmiCommUnlock();
-  MACHSTATE(2,"} CommunicationServer") 
-}
 
-/* similar to CommunicationServer, but it is called by communication thread
-   or in interrupt */
-static void CommunicationServerThread(int sleepTime)
-{
-  CommunicationServer(sleepTime);
+  /* when called by communication thread or in interrupt */
+  if (where == 0 || where == 1)
+  {
 #if CMK_IMMEDIATE_MSG
   CmiHandleImmediate();
 #endif
+  }
+
+  MACHSTATE(2,"} CommunicationServer") 
 }
 
+
 #if FRAGMENTATION
 /* keep one buffer of PACKET_MAX size to ensure copy free operation 
    1. for short message that is less than PACKET_MAX, 
index 220254e56a793d7b13a5388224633958df435bb1..ae0c8e608bb46004a9132b4fc9fcaa5bec927584 100644 (file)
@@ -252,8 +252,13 @@ int  portFinish = 0;
 
 #define PRINTBUFSIZE 16384
 
-static void CommunicationServer(int withDelayMs);
-static void CommunicationServerThread(int withDelayMs);
+/*
+0: from smp thread
+1: from interrupt
+2: from worker thread
+*/
+static void CommunicationServer(int withDelayMs, int where);
+
 void CmiHandleImmediate();
 extern int CmemInsideMem();
 extern void CmemCallWhenMemAvail();
@@ -578,6 +583,7 @@ static int        Cmi_charmrun_pid;
 static int        Cmi_charmrun_fd=-1;
 
 static int    Cmi_netpoll;
+static int    Cmi_asyncio;
 static int    Cmi_idlepoll;
 static int    Cmi_syncprint;
 static int Cmi_print_stats = 0;
@@ -781,7 +787,7 @@ static void CommunicationInterrupt(int ignored)
     /*Make sure any malloc's we do in here are NOT migratable:*/
     CmiIsomallocBlockList *oldList=CmiIsomallocBlockListActivate(NULL);
 /*    _Cmi_myrank=1; */
-    CommunicationServerThread(0);
+    CommunicationServer(0, 1);     /* from interrupt */
 /*    _Cmi_myrank=0; */
     CmiIsomallocBlockListActivate(oldList);
   }
@@ -805,7 +811,8 @@ static void CmiStartThreads(char **argv)
   _Cmi_myrank=0;
   
 #if !CMK_ASYNC_NOT_NEEDED
-  if (!Cmi_netpoll) {
+  if (Cmi_asyncio)
+  {
     CmiSignal(SIGIO, 0, 0, CommunicationInterrupt);
     if (dataskt!=-1) CmiEnableAsyncIO(dataskt);
     if (Cmi_charmrun_fd!=-1) CmiEnableAsyncIO(Cmi_charmrun_fd);
@@ -1205,8 +1212,9 @@ static void CmiStdoutInit(void) {
 #if 0 /*Keep writes from blocking.  This just drops excess output, which is bad.*/
                CmiEnableNonblockingIO(srcFd);
 #endif
-#if CMK_SHARED_VARS_UNAVAILABLE && !CMK_USE_GM
-                if (!Cmi_netpoll) {
+#if CMK_SHARED_VARS_UNAVAILABLE
+                if (Cmi_asyncio)
+               {
   /*No communication thread-- get a SIGIO on each write(), which keeps the buffer clean*/
                        CmiEnableAsyncIO(readStdout[i]);
                }
@@ -1618,7 +1626,7 @@ CmiCommHandle CmiGeneralNodeSend(int node, int size, int freemode, char *data)
   DeliverOutgoingNodeMessage(ogm);
   CmiCommUnlock();
   /* Check if any packets have arrived recently (preserves kernel network buffers). */
-  CommunicationServer(0);
+  CommunicationServer(0, 2);
   return (CmiCommHandle)ogm;
 }
 
@@ -1686,7 +1694,7 @@ CmiCommHandle CmiGeneralSend(int pe, int size, int freemode, char *data)
   DeliverOutgoingMessage(ogm);
   CmiCommUnlock();
   /* Check if any packets have arrived recently (preserves kernel network buffers). */
-  CommunicationServer(0);
+  CommunicationServer(0, 2);
   return (CmiCommHandle)ogm;
 }
 
@@ -1858,16 +1866,16 @@ void CmiFreeListSendFn(int npes, int *pes, int len, char *msg)
 #if CMK_IMMEDIATE_MSG
 void CmiProbeImmediateMsg()
 {
-  CommunicationServerThread(0);
+  CommunicationServer(0, 0);
 }
 #endif
 */
 
 /* Network progress function is used to poll the network when for
-   messages. This flushes receive buffers on some  implementations*/ 
+   messages. This flushes receive buffers on some implementations*/ 
 void CmiMachineProgressImpl()
 {
-    CommunicationServerThread(0);
+    CommunicationServer(0, 0);
 }
 
 /******************************************************************************
@@ -1954,7 +1962,7 @@ static void ConverseRunPE(int everReturn)
 
 #if ! CMK_USE_GM && ! CMK_USE_TCP
     /*Occasionally check for retransmissions, outgoing acks, etc.*/
-    /*no need in GM case */
+    /*no need for GM case */
     CcdCallFnAfter((CcdVoidFn)CommunicationsClockCaller,NULL,Cmi_comm_clock_delay);
 #endif
 #endif
@@ -1974,7 +1982,7 @@ static void ConverseRunPE(int everReturn)
   if (CmiMyRank() == CmiMyNodeSize()) {
     Cmi_startfn(CmiGetArgc(CmiMyArgv), CmiMyArgv);
     if (Cmi_charmrun_fd!=-1)
-          while (1) CommunicationServerThread(5);
+          while (1) CommunicationServer(5, 0);
   }
   else
   if (!everReturn) {
@@ -2001,7 +2009,7 @@ void ConverseExit(void)
   if (Cmi_charmrun_fd!=-1) {
        ctrl_sendone_locking("ending",NULL,0,NULL,0); /* this causes charmrun to go away */
 #if CMK_SHARED_VARS_UNAVAILABLE
-       while (1) CommunicationServer(500);
+       while (1) CommunicationServer(500, 2);
 #endif
   }
 /*Comm. thread will kill us.*/
@@ -2072,6 +2080,13 @@ void ConverseInit(int argc, char **argv, CmiStartFn fn, int usc, int everReturn)
   if (CmiGetArgFlagDesc(argv,"+idlesleep","Make sleep calls when idle")) Cmi_idlepoll = 0;
   Cmi_syncprint = CmiGetArgFlagDesc(argv,"+syncprint", "Flush each CmiPrintf to the terminal");
 
+  Cmi_asyncio= 1;
+  if (Cmi_netpoll) Cmi_asyncio = 0;     /* netpoll turn off async io */
+#if CMK_USE_GM
+  Cmi_asyncio = 1;                     /* gm use async io */
+#endif
+  if (CmiGetArgFlagDesc(argv,"+asyncio","Use async IO")) Cmi_asyncio = 1;
+
   MACHSTATE2(5,"Init: (netpoll=%d), (idlepoll=%d)",Cmi_netpoll,Cmi_idlepoll);
 
   skt_init();