Hiya,
        As promised here's the final patch (tested for both poll and select)
against base 1.2beta22 code that makes the comm_incoming stuff work better.
See code for comments.
        It splits HTTP and ICP incoming apart and targets an average of
1 operation per set of FDs.  This means that two-thirds of the time we will
have activity on one of these sockets.  This has been tested and it works.
System calls have dropped by 33% over similarly loaded caches running the
beta22 base code.  Dropped by 40% over beta21 code.
        This patch supersedes and cancels my previous patch.
        Stew.
--- structs.h	1998/06/05 01:46:58	1.1
+++ structs.h	1998/06/05 01:47:55
@@ -1252,7 +1252,8 @@
     int select_loops;
     double cputime;
     struct timeval timestamp;
-    StatHist comm_incoming;
+    StatHist comm_icp_incoming;
+    StatHist comm_http_incoming;
 };
 
 struct _tlv {
--- comm.c	1998/06/04 06:06:49	1.1
+++ comm.c	1998/06/05 04:40:43
@@ -113,7 +113,7 @@
 #endif
 
 #if USE_ASYNC_IO
-#define MAX_POLL_TIME 50
+#define MAX_POLL_TIME 10
 #else
 #define MAX_POLL_TIME 1000
 #endif
@@ -133,7 +133,6 @@
 } ConnectStateData;
 
 /* STATIC */
-static int incame = 0;
 static int commBind(int s, struct in_addr, u_short port);
 #if !HAVE_POLL
 static int examine_select(fd_set *, fd_set *);
@@ -141,7 +140,6 @@
 static void checkTimeouts(void);
 static void commSetReuseAddr(int);
 static void commSetNoLinger(int);
-static void comm_incoming(void);
 static void CommWriteStateCallbackAndFree(int fd, int code);
 #ifdef TCP_NODELAY
 static void commSetTcpNoDelay(int);
@@ -169,47 +167,54 @@
  * of incoming ICP, then we need to check these sockets more than
  * if we just have HTTP.
  *
- * The variable 'incoming_interval' determines how many normal I/O
- * events to process before checking incoming sockets again.  
- * Note we store the incoming_interval multipled by a factor
- * of 16 (e.g. <<4) to have some pseudo-floating point precision.
+ * The variables 'incoming_icp_interval' and 'incoming_http_interval' 
+ * determine how many normal I/O events to process before checking
+ * incoming sockets again.  Note we store the incoming_interval
+ * multiplied by a factor of (2^INCOMING_FACTOR) to have some
+ * pseudo-floating point precision.
  *
- * The variable 'io_events' counts how many normal I/O events have
- * been processed.  When io_events > incoming_interval, its time
- * to check incoming sockets.
+ * The variables 'icp_io_events' and 'http_io_events' count how many normal
+ * I/O events have been processed since the last check on the incoming
+ * sockets.  When io_events > incoming_interval, it's time to check incoming
+ * sockets.
  *
  * Every time we check incoming sockets, we count how many new messages
  * or connections were processed.  This is used to adjust the
  * incoming_interval for the next iteration.  The new incoming_interval
- * is calculated as the average of the current incoming_interval and
- * 32 divided by the number of incoming events just processed. e.g.
+ * is calculated as the current incoming_interval plus what we would
+ * like to see as an average number of events minus the number of
+ * events just processed.
  *
- *                      1                      1       32 
- *  incoming_interval = - incoming_interval  + - -----------------
- *                      2                      2  incoming_events
+ *  incoming_interval = incoming_interval + 1 - number_of_events_processed
+ *
+ * There are separate incoming_interval counters for both HTTP and ICP events
  * 
- * You can see the current value of incoming_interval, as well as
+ * You can see the current values of the incoming_interval's, as well as
  * a histogram of 'incoming_events' by asking the cache manager
  * for 'comm_incoming', e.g.:
  *
  *      % ./client mgr:comm_incoming
  *
- * Bugs:
- *
- *      - We have 32 as a magic upper limit on incoming_interval.
- *      - INCOMING_TOTAL_MAX = INCOMING_ICP_MAX + INCOMING_HTTP_MAX,
- *        but this assumes only one ICP socket and one HTTP socket.
- *        If there are multiple incoming HTTP sockets, the we could
- *        conceivably process more than INCOMING_TOTAL_MAX events
- *        in comm_incoming().
+ * Caveats:
  *
- * The 'invert32[]' array is a pre-calculated array of division for 32/i
+ *      - We have MAX_INCOMING_INTEGER as a magic upper limit on
+ *	  incoming_interval for both types of sockets.  At the
+ *	  largest value the cache will effectively be idling.
  *
+ *	- The higher the INCOMING_FACTOR, the slower the algorithm will
+ *	  respond to load spikes/increases/decreases in demand. A value
+ *	  between 3 and 8 is recommended.
  */
-static int io_events = 0;
-static int incoming_interval = 16 << 4;
-static int invert32[INCOMING_TOTAL_MAX];
-#define commCheckIncoming (++io_events > (incoming_interval>>4))
+
+#define MAX_INCOMING_INTEGER 256
+#define INCOMING_FACTOR 5
+#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
+static int icp_io_events = 0;
+static int http_io_events = 0;
+static int incoming_icp_interval = 16 << INCOMING_FACTOR;
+static int incoming_http_interval = 16 << INCOMING_FACTOR;
+#define commCheckICPIncoming (++icp_io_events > (incoming_icp_interval>> INCOMING_FACTOR))
+#define commCheckHTTPIncoming (++http_io_events > (incoming_http_interval>> INCOMING_FACTOR))
 
 static void
 CommWriteStateCallbackAndFree(int fd, int code)
@@ -763,35 +768,21 @@
     return F->defer_check(fd, F->defer_data);
 }
 
-static void
-comm_incoming(void)
-{
-    int j;
-    incame = 0;
-    io_events = 0;
-    if (theInIcpConnection > 0) {
-	icpHandleUdp(theInIcpConnection, &incame);
-	if (theInIcpConnection != theOutIcpConnection)
-	    icpHandleUdp(theOutIcpConnection, &incame);
-    }
-    for (j = 0; j < NHttpSockets; j++) {
-	if (HttpSockets[j] < 0)
-	    continue;
-	httpAccept(HttpSockets[j], &incame);
-    }
-    statHistCount(&Counter.comm_incoming, incame);
-    if (incame < INCOMING_TOTAL_MAX)
-	incoming_interval = (incoming_interval >> 1) + (invert32[incame] << 3);
-}
 
 static int
-fdIsHttpOrIcp(int fd)
+fdIsIcp(int fd)
 {
-    int j;
     if (fd == theInIcpConnection)
         return 1;
     if (fd == theOutIcpConnection)
         return 1;
+    return 0;
+}
+
+static int
+fdIsHttp(int fd)
+{
+    int j;
     for (j = 0; j < NHttpSockets; j++) {
         if (fd == HttpSockets[j])
             return 1;
@@ -800,6 +791,115 @@
 }
 
 #if HAVE_POLL
+
+int
+comm_check_incoming_poll_handlers(int nfds, int *fds)
+{
+    int i;
+    int fd;
+    int incame = 0;
+    PF *hdl = NULL;
+    int npfds;
+    struct pollfd pfds[3 + MAXHTTPPORTS];
+
+    for (i = npfds = 0; i < nfds; i++) {
+	int events;
+	fd = fds[i];
+	events = 0;
+	if (fd_table[fd].read_handler)
+	    events |= POLLRDNORM;
+	if (fd_table[fd].write_handler)
+	    events |= POLLWRNORM;
+	if (events) {
+	    pfds[npfds].fd = fd;
+	    pfds[npfds].events = events;
+	    pfds[npfds].revents = 0;
+	    npfds++;
+	}
+    }
+    if (!nfds)
+	return incame;
+#if !ALARM_UPDATES_TIME
+    getCurrentTime();
+#endif
+    if(poll(pfds, npfds, 0) < 1)
+	return incame;
+    for (i = 0; i < npfds; i++) {
+	int revents;
+	if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
+	    continue;
+	if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
+	    if (hdl = fd_table[fd].read_handler) {
+		fd_table[fd].read_handler = NULL;
+		hdl(fd, &incame);
+	    } else
+		debug(5, 1) ("comm_poll_incoming: NULL read handler\n");
+	}
+	if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
+	    if (hdl = fd_table[fd].write_handler) {
+		fd_table[fd].write_handler = NULL;
+		hdl(fd, &incame);
+	    } else
+		debug(5, 1) ("comm_poll_incoming: NULL write handler\n");
+	}
+    }
+    return incame;
+}
+
+static void
+comm_poll_icp_incoming(void)
+{
+    int nfds = 0;
+    int fds[2];
+    int nevents;
+
+    icp_io_events = 0;
+    if (theInIcpConnection >= 0)
+	fds[nfds++] = theInIcpConnection;
+    if (theInIcpConnection != theOutIcpConnection)
+	if (theOutIcpConnection >= 0)
+	    fds[nfds++] = theOutIcpConnection;
+    if(nfds == 0)
+	return;
+    nevents = comm_check_incoming_poll_handlers(nfds, fds);
+    incoming_icp_interval = incoming_icp_interval + 1 - nevents;
+    if (incoming_icp_interval < 0)
+	incoming_icp_interval = 0;
+    if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
+	incoming_icp_interval = MAX_INCOMING_INTERVAL;
+    if(nevents > INCOMING_ICP_MAX)
+	nevents = INCOMING_ICP_MAX;
+    statHistCount(&Counter.comm_icp_incoming, nevents);
+}
+
+static void
+comm_poll_http_incoming(void)
+{
+    int nfds = 0;
+    int fds[MAXHTTPPORTS];
+    int j;
+    int nevents;
+
+    http_io_events = 0;
+    for (j = 0; j < NHttpSockets; j++) {
+	if (HttpSockets[j] < 0)
+	    continue;
+	if (commDeferRead(HttpSockets[j]))
+	    continue;
+	fds[nfds++] = HttpSockets[j];
+    }
+    nevents = comm_check_incoming_poll_handlers(nfds, fds);
+    incoming_http_interval = incoming_http_interval + 1 - nevents;
+    if (incoming_http_interval < 0)
+	incoming_http_interval = 0;
+    if (incoming_http_interval > MAX_INCOMING_INTERVAL)
+	incoming_http_interval = MAX_INCOMING_INTERVAL;
+    if(nevents > INCOMING_HTTP_MAX)
+	nevents = INCOMING_HTTP_MAX;
+    statHistCount(&Counter.comm_http_incoming, nevents);
+}
+
+
 /* poll all sockets; call handlers for those that are ready. */
 int
 comm_poll(int msec)
@@ -811,8 +911,8 @@
     int maxfd;
     unsigned long nfds;
     int num;
+    int callicp = 0, callhttp = 0;
     static time_t last_timeout = 0;
-    static int lastinc = 0;
     double timeout = current_dtime + (msec / 1000.0);
     do {
 #if !ALARM_UPDATES_TIME
@@ -833,7 +933,11 @@
 #if USE_ASYNC_IO
         aioCheckCallbacks();
 #endif
-	comm_incoming();
+	if (commCheckICPIncoming)
+	    comm_poll_icp_incoming();
+	if (commCheckHTTPIncoming)
+	    comm_poll_http_incoming();
+	callicp = callhttp = 0;
         nfds = 0;
         maxfd = Biggest_FD + 1;
         for (i = 0; i < maxfd; i++) {
@@ -884,16 +988,24 @@
             int revents;
             if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
                 continue;
-	    if (fdIsHttpOrIcp(fd))
+	    if (fdIsIcp(fd)) {
+		callicp = 1;
+		continue;
+	    }
+	    if (fdIsHttp(fd)) {
+		callhttp = 1;
                 continue;
+	    }
             if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
                 debug(5, 6) ("comm_poll: FD %d ready for reading\n", fd);
                 if ((hdl = fd_table[fd].read_handler)) {
                     fd_table[fd].read_handler = NULL;
                     hdl(fd, fd_table[fd].read_data);
                 }
-		if (commCheckIncoming)
-		    comm_incoming();
+		if (commCheckICPIncoming)
+		    comm_poll_icp_incoming();
+		if (commCheckHTTPIncoming)
+		    comm_poll_http_incoming();
             }
             if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
                 debug(5, 5) ("comm_poll: FD %d ready for writing\n", fd);
@@ -901,8 +1013,10 @@
                     fd_table[fd].write_handler = NULL;
                     hdl(fd, fd_table[fd].write_data);
                 }
-		if (commCheckIncoming)
-		    comm_incoming();
+		if (commCheckICPIncoming)
+		    comm_poll_icp_incoming();
+		if (commCheckHTTPIncoming)
+		    comm_poll_http_incoming();
             }
             if (revents & POLLNVAL) {
                 close_handler *ch;
@@ -929,8 +1043,11 @@
                 if (F->open != 0)
                     fd_close(fd);
             }
-	    lastinc = incame;
         }
+	if(callicp)
+	    comm_poll_icp_incoming();
+	if(callhttp)
+	    comm_poll_http_incoming();
         return COMM_OK;
     } while (timeout > current_dtime);
     debug(5, 8) ("comm_poll: time out: %d.\n", squid_curtime);
@@ -939,6 +1056,114 @@
 
 #else
 
+int
+comm_check_incoming_select_handlers(int nfds, int *fds)
+{
+    int i;
+    int fd;
+    int incame = 0;
+    int maxfd = 0;
+    PF *hdl = NULL;
+    fd_set read_mask;
+    fd_set write_mask;
+
+    FD_ZERO(&read_mask);
+    FD_ZERO(&write_mask);
+    for (i = 0; i < nfds; i++) {
+	fd = fds[i];
+	if (fd_table[fd].read_handler) {
+	    FD_SET(fd, &read_mask);
+	    if (fd > maxfd)
+		maxfd = fd;
+	}
+	if (fd_table[fd].write_handler) {
+	    FD_SET(fd, &write_mask);
+	    if (fd > maxfd)
+		maxfd = fd;
+	}
+    }
+    if (maxfd++ == 0)
+	return incame;
+#if !ALARM_UPDATES_TIME
+    getCurrentTime();
+#endif
+    if (select(maxfd, &read_mask, &write_mask, NULL, &zero_tv) < 1)
+	return incame;
+    for (i = 0; i < nfds; i++) {
+	fd = fds[i];
+	if (FD_ISSET(fd, &read_mask)) {
+	    if ((hdl = fd_table[fd].read_handler) != NULL) {
+		fd_table[fd].read_handler = NULL;
+		hdl(fd, &incame);
+	    } else {
+		debug(5, 1) ("comm_select_incoming: NULL read handler\n");
+	    }
+	}
+	if (FD_ISSET(fd, &write_mask)) {
+	    if ((hdl = fd_table[fd].write_handler) != NULL) {
+		fd_table[fd].write_handler = NULL;
+		hdl(fd, &incame);
+	    } else {
+		debug(5, 1) ("comm_select_incoming: NULL write handler\n");
+	    }
+	}
+    }
+    return incame;
+}
+
+static void
+comm_select_icp_incoming(void)
+{
+    int nfds = 0;
+    int fds[2];
+    int nevents;
+
+    icp_io_events = 0;
+    if (theInIcpConnection >= 0)
+	fds[nfds++] = theInIcpConnection;
+    if (theInIcpConnection != theOutIcpConnection)
+	if (theOutIcpConnection >= 0)
+	    fds[nfds++] = theOutIcpConnection;
+    if(nfds == 0)
+	return;
+    nevents = comm_check_incoming_select_handlers(nfds, fds);
+    incoming_icp_interval = incoming_icp_interval + 1 - nevents;
+    if (incoming_icp_interval < 0)
+	incoming_icp_interval = 0;
+    if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
+	incoming_icp_interval = MAX_INCOMING_INTERVAL;
+    if(nevents > INCOMING_ICP_MAX)
+	nevents = INCOMING_ICP_MAX;
+    statHistCount(&Counter.comm_icp_incoming, nevents);
+}
+
+static void
+comm_select_http_incoming(void)
+{
+    int nfds = 0;
+    int fds[MAXHTTPPORTS];
+    int j;
+    int nevents;
+
+    http_io_events = 0;
+    for (j = 0; j < NHttpSockets; j++) {
+	if (HttpSockets[j] < 0)
+	    continue;
+	if (commDeferRead(HttpSockets[j]))
+	    continue;
+	fds[nfds++] = HttpSockets[j];
+    }
+    nevents = comm_check_incoming_select_handlers(nfds, fds);
+    incoming_http_interval = incoming_http_interval + 1 - nevents;
+    if (incoming_http_interval < 0)
+	incoming_http_interval = 0;
+    if (incoming_http_interval > MAX_INCOMING_INTERVAL)
+	incoming_http_interval = MAX_INCOMING_INTERVAL;
+    if(nevents > INCOMING_HTTP_MAX)
+	nevents = INCOMING_HTTP_MAX;
+    statHistCount(&Counter.comm_http_incoming, nevents);
+}
+
 /* Select on all sockets; call handlers for those that are ready. */
 int
 comm_select(int msec)
@@ -951,9 +1176,9 @@
     int maxfd;
     int nfds;
     int num;
+    int callicp = 0, callhttp = 0;
     static time_t last_timeout = 0;
     struct timeval poll_time;
-    static int lastinc;
     double timeout = current_dtime + (msec / 1000.0);
 
     do {
@@ -979,7 +1204,11 @@
             else
                 setSocketShutdownLifetimes(1);
         }
-	comm_incoming();
+	if (commCheckICPIncoming)
+	    comm_select_icp_incoming();
+	if (commCheckHTTPIncoming)
+	    comm_select_http_incoming();
+	callicp = callhttp = 0;
         nfds = 0;
         maxfd = Biggest_FD + 1;
         for (i = 0; i < maxfd; i++) {
@@ -1035,8 +1264,14 @@
         for (fd = 0; fd < maxfd; fd++) {
             if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds))
                 continue;
-	    if (fdIsHttpOrIcp(fd))
+	    if (fdIsIcp(fd)) {
+		callicp = 1;
+		continue;
+	    }
+	    if (fdIsHttp(fd)) {
+		callhttp = 1;
                 continue;
+	    }
             if (FD_ISSET(fd, &readfds)) {
                 debug(5, 6) ("comm_select: FD %d ready for reading\n", fd);
                 if (fd_table[fd].read_handler) {
@@ -1044,8 +1279,10 @@
                     fd_table[fd].read_handler = NULL;
                     hdl(fd, fd_table[fd].read_data);
                 }
-		if (commCheckIncoming)
-		    comm_incoming();
+		if (commCheckICPIncoming)
+		    comm_select_icp_incoming();
+		if (commCheckHTTPIncoming)
+		    comm_select_http_incoming();
             }
             if (FD_ISSET(fd, &writefds)) {
                 debug(5, 5) ("comm_select: FD %d ready for writing\n", fd);
@@ -1054,11 +1291,16 @@
                     fd_table[fd].write_handler = NULL;
                     hdl(fd, fd_table[fd].write_data);
                 }
-		if (commCheckIncoming)
-		    comm_incoming();
+		if (commCheckICPIncoming)
+		    comm_select_icp_incoming();
+		if (commCheckHTTPIncoming)
+		    comm_select_http_incoming();
             }
-	    lastinc = incame;
         }
+	if(callicp)
+	    comm_select_icp_incoming();
+	if(callhttp)
+	    comm_select_http_incoming();
         return COMM_OK;
     } while (timeout > current_dtime);
     debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime);
@@ -1188,6 +1430,7 @@
 }
 #endif
 
+
 void
 comm_init(void)
 {
@@ -1200,9 +1443,6 @@
     RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
     zero_tv.tv_sec = 0;
     zero_tv.tv_usec = 0;
-    invert32[0] = 32;
-    for (i = 1; i < INCOMING_TOTAL_MAX; i++)
-	invert32[i] = (int) (32.0 / (double) i + 0.5);
     cachemgrRegister("comm_incoming",
         "comm_incoming() stats",
         commIncomingStats, 0);
@@ -1419,10 +1659,22 @@
 commIncomingStats(StoreEntry * sentry)
 {
     StatCounters *f = &Counter;
-    storeAppendPrintf(sentry, "Current incoming_interval: %d\n",
-	incoming_interval >> 4);
+    storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n",
+	incoming_icp_interval >> INCOMING_FACTOR);
+    storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n",
+	incoming_http_interval >> INCOMING_FACTOR);
     storeAppendPrintf(sentry, "\n");
-    storeAppendPrintf(sentry, "Histogram of number of incoming sockets or\n");
-    storeAppendPrintf(sentry, "Messages handled per comm_incoming() call:\n");
-    statHistDump(&f->comm_incoming, sentry, statHistIntDumper);
+    storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
+#ifdef HAVE_POLL
+    storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n");
+#else
+    storeAppendPrintf(sentry, "ICP Messages handled per comm_select_icp_incoming() call:\n");
+#endif
+    statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper);
+#ifdef HAVE_POLL
+    storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n");
+#else
+    storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_http_incoming() call:\n");
+#endif
+    statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper);
 }
--- stat.c	1998/06/05 02:26:41	1.1
+++ stat.c	1998/06/05 02:29:01
@@ -883,7 +883,8 @@
      * Cache Digest Stuff
      */
     statHistEnumInit(&C->cd.on_xition_count, CacheDigestHashFuncCount);
-    statHistEnumInit(&C->comm_incoming, INCOMING_TOTAL_MAX);
+    statHistEnumInit(&C->comm_icp_incoming, INCOMING_ICP_MAX);
+    statHistEnumInit(&C->comm_http_incoming, INCOMING_HTTP_MAX);
 }
 
 /* add special cases here as they arrive */
@@ -899,7 +900,8 @@
     statHistClean(&C->icp.reply_svc_time);
     statHistClean(&C->dns.svc_time);
     statHistClean(&C->cd.on_xition_count);
-    statHistClean(&C->comm_incoming);
+    statHistClean(&C->comm_icp_incoming);
+    statHistClean(&C->comm_http_incoming);
 }
 
 /* add special cases here as they arrive */
@@ -921,7 +923,8 @@
     statHistCopy(&dest->icp.reply_svc_time, &orig->icp.reply_svc_time);
     statHistCopy(&dest->dns.svc_time, &orig->dns.svc_time);
     statHistCopy(&dest->cd.on_xition_count, &orig->cd.on_xition_count);
-    statHistCopy(&dest->comm_incoming, &orig->comm_incoming);
+    statHistCopy(&dest->comm_icp_incoming, &orig->comm_icp_incoming);
+    statHistCopy(&dest->comm_http_incoming, &orig->comm_http_incoming);
 }
 
 static void
Received on Tue Jul 29 2003 - 13:15:50 MDT
This archive was generated by hypermail pre-2.1.9 : Tue Dec 09 2003 - 16:11:48 MST