Hiya,
As promised here's the final patch (tested for both poll and select)
against base 1.2beta22 code that makes the comm_incoming stuff work better.
See code for comments.
It splits HTTP and ICP incoming apart and targets an average of
1 operation per set of FD's. This means that 2/3s of the time we will
have activity on one of these sockets. This has been tested and it works.
System calls have dropped by 33% over similarly loaded caches running the
beta22 base code. Dropped by 40% over beta21 code.
This patch supersedes and cancels my previous patch.
Stew.
--- structs.h 1998/06/05 01:46:58 1.1
+++ structs.h 1998/06/05 01:47:55
@@ -1252,7 +1252,8 @@
int select_loops;
double cputime;
struct timeval timestamp;
- StatHist comm_incoming;
+ StatHist comm_icp_incoming;
+ StatHist comm_http_incoming;
};
struct _tlv {
--- comm.c 1998/06/04 06:06:49 1.1
+++ comm.c 1998/06/05 04:40:43
@@ -113,7 +113,7 @@
#endif
#if USE_ASYNC_IO
-#define MAX_POLL_TIME 50
+#define MAX_POLL_TIME 10
#else
#define MAX_POLL_TIME 1000
#endif
@@ -133,7 +133,6 @@
} ConnectStateData;
/* STATIC */
-static int incame = 0;
static int commBind(int s, struct in_addr, u_short port);
#if !HAVE_POLL
static int examine_select(fd_set *, fd_set *);
@@ -141,7 +140,6 @@
static void checkTimeouts(void);
static void commSetReuseAddr(int);
static void commSetNoLinger(int);
-static void comm_incoming(void);
static void CommWriteStateCallbackAndFree(int fd, int code);
#ifdef TCP_NODELAY
static void commSetTcpNoDelay(int);
@@ -169,47 +167,54 @@
* of incoming ICP, then we need to check these sockets more than
* if we just have HTTP.
*
- * The variable 'incoming_interval' determines how many normal I/O
- * events to process before checking incoming sockets again.
- * Note we store the incoming_interval multipled by a factor
- * of 16 (e.g. <<4) to have some pseudo-floating point precision.
+ * The variables 'incoming_icp_interval' and 'incoming_http_interval'
+ * determine how many normal I/O events to process before checking
+ * incoming sockets again. Note we store the incoming_interval
+ * multiplied by a factor of (2^INCOMING_FACTOR) to have some
+ * pseudo-floating point precision.
*
- * The variable 'io_events' counts how many normal I/O events have
- * been processed. When io_events > incoming_interval, its time
- * to check incoming sockets.
+ * The variables 'icp_io_events' and 'http_io_events' count how many normal
+ * I/O events have been processed since the last check on the incoming
+ * sockets. When io_events > incoming_interval, it's time to check incoming
+ * sockets.
*
* Every time we check incoming sockets, we count how many new messages
* or connections were processed. This is used to adjust the
* incoming_interval for the next iteration. The new incoming_interval
- * is calculated as the average of the current incoming_interval and
- * 32 divided by the number of incoming events just processed. e.g.
+ * is calculated as the current incoming_interval plus what we would
+ * like to see as an average number of events minus the number of
+ * events just processed.
*
- * 1 1 32
- * incoming_interval = - incoming_interval + - -----------------
- * 2 2 incoming_events
+ * incoming_interval = incoming_interval + 1 - number_of_events_processed
+ *
+ * There are separate incoming_interval counters for both HTTP and ICP events
*
- * You can see the current value of incoming_interval, as well as
+ * You can see the current values of the incoming_interval's, as well as
* a histogram of 'incoming_events' by asking the cache manager
* for 'comm_incoming', e.g.:
*
* % ./client mgr:comm_incoming
*
- * Bugs:
- *
- * - We have 32 as a magic upper limit on incoming_interval.
- * - INCOMING_TOTAL_MAX = INCOMING_ICP_MAX + INCOMING_HTTP_MAX,
- * but this assumes only one ICP socket and one HTTP socket.
- * If there are multiple incoming HTTP sockets, the we could
- * conceivably process more than INCOMING_TOTAL_MAX events
- * in comm_incoming().
+ * Caveats:
*
- * The 'invert32[]' array is a pre-calculated array of division for 32/i
+ * - We have MAX_INCOMING_INTEGER as a magic upper limit on
+ * incoming_interval for both types of sockets. At the
+ * largest value the cache will effectively be idling.
*
+ * - The higher the INCOMING_FACTOR, the slower the algorithm will
+ * respond to load spikes/increases/decreases in demand. A value
+ * between 3 and 8 is recommended.
*/
-static int io_events = 0;
-static int incoming_interval = 16 << 4;
-static int invert32[INCOMING_TOTAL_MAX];
-#define commCheckIncoming (++io_events > (incoming_interval>>4))
+
+#define MAX_INCOMING_INTEGER 256
+#define INCOMING_FACTOR 5
+#define MAX_INCOMING_INTERVAL (MAX_INCOMING_INTEGER << INCOMING_FACTOR)
+static int icp_io_events = 0;
+static int http_io_events = 0;
+static int incoming_icp_interval = 16 << INCOMING_FACTOR;
+static int incoming_http_interval = 16 << INCOMING_FACTOR;
+#define commCheckICPIncoming (++icp_io_events > (incoming_icp_interval>>INCOMING_FACTOR))
+#define commCheckHTTPIncoming (++http_io_events > (incoming_http_interval>>INCOMING_FACTOR))
static void
CommWriteStateCallbackAndFree(int fd, int code)
@@ -763,35 +768,21 @@
return F->defer_check(fd, F->defer_data);
}
-static void
-comm_incoming(void)
-{
- int j;
- incame = 0;
- io_events = 0;
- if (theInIcpConnection > 0) {
- icpHandleUdp(theInIcpConnection, &incame);
- if (theInIcpConnection != theOutIcpConnection)
- icpHandleUdp(theOutIcpConnection, &incame);
- }
- for (j = 0; j < NHttpSockets; j++) {
- if (HttpSockets[j] < 0)
- continue;
- httpAccept(HttpSockets[j], &incame);
- }
- statHistCount(&Counter.comm_incoming, incame);
- if (incame < INCOMING_TOTAL_MAX)
- incoming_interval = (incoming_interval >> 1) + (invert32[incame] << 3);
-}
static int
-fdIsHttpOrIcp(int fd)
+fdIsIcp(int fd)
{
- int j;
if (fd == theInIcpConnection)
return 1;
if (fd == theOutIcpConnection)
return 1;
+ return 0;
+}
+
+static int
+fdIsHttp(int fd)
+{
+ int j;
for (j = 0; j < NHttpSockets; j++) {
if (fd == HttpSockets[j])
return 1;
@@ -800,6 +791,115 @@
}
#if HAVE_POLL
+
+int
+comm_check_incoming_poll_handlers(int nfds, int *fds)
+{
+ int i;
+ int fd;
+ int incame = 0;
+ PF *hdl = NULL;
+ int npfds;
+ struct pollfd pfds[3 + MAXHTTPPORTS];
+
+ for (i = npfds = 0; i < nfds; i++) {
+ int events;
+ fd = fds[i];
+ events = 0;
+ if (fd_table[fd].read_handler)
+ events |= POLLRDNORM;
+ if (fd_table[fd].write_handler)
+ events |= POLLWRNORM;
+ if (events) {
+ pfds[npfds].fd = fd;
+ pfds[npfds].events = events;
+ pfds[npfds].revents = 0;
+ npfds++;
+ }
+ }
+ if (!nfds)
+ return incame;
+#if !ALARM_UPDATES_TIME
+ getCurrentTime();
+#endif
+ if(poll(pfds, npfds, 0) < 1)
+ return incame;
+ for (i = 0; i < npfds; i++) {
+ int revents;
+ if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
+ continue;
+ if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
+ if (hdl = fd_table[fd].read_handler) {
+ fd_table[fd].read_handler = NULL;
+ hdl(fd, &incame);
+ } else
+ debug(5, 1) ("comm_poll_incoming: NULL read handler\n");
+ }
+ if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
+ if (hdl = fd_table[fd].write_handler) {
+ fd_table[fd].write_handler = NULL;
+ hdl(fd, &incame);
+ } else
+ debug(5, 1) ("comm_poll_incoming: NULL write handler\n");
+ }
+ }
+ return incame;
+}
+
+static void
+comm_poll_icp_incoming(void)
+{
+ int nfds = 0;
+ int fds[2];
+ int nevents;
+
+ icp_io_events = 0;
+ if (theInIcpConnection >= 0)
+ fds[nfds++] = theInIcpConnection;
+ if (theInIcpConnection != theOutIcpConnection)
+ if (theOutIcpConnection >= 0)
+ fds[nfds++] = theOutIcpConnection;
+ if(nfds == 0)
+ return;
+ nevents = comm_check_incoming_poll_handlers(nfds, fds);
+ incoming_icp_interval = incoming_icp_interval + 1 - nevents;
+ if (incoming_icp_interval < 0)
+ incoming_icp_interval = 0;
+ if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
+ incoming_icp_interval = MAX_INCOMING_INTERVAL;
+ if(nevents > INCOMING_ICP_MAX)
+ nevents = INCOMING_ICP_MAX;
+ statHistCount(&Counter.comm_icp_incoming, nevents);
+}
+
+static void
+comm_poll_http_incoming(void)
+{
+ int nfds = 0;
+ int fds[MAXHTTPPORTS];
+ int j;
+ int nevents;
+
+ http_io_events = 0;
+ for (j = 0; j < NHttpSockets; j++) {
+ if (HttpSockets[j] < 0)
+ continue;
+ if (commDeferRead(HttpSockets[j]))
+ continue;
+ fds[nfds++] = HttpSockets[j];
+ }
+ nevents = comm_check_incoming_poll_handlers(nfds, fds);
+ incoming_http_interval = incoming_http_interval + 1 - nevents;
+ if (incoming_http_interval < 0)
+ incoming_http_interval = 0;
+ if (incoming_http_interval > MAX_INCOMING_INTERVAL)
+ incoming_http_interval = MAX_INCOMING_INTERVAL;
+ if(nevents > INCOMING_HTTP_MAX)
+ nevents = INCOMING_HTTP_MAX;
+ statHistCount(&Counter.comm_http_incoming, nevents);
+}
+
+
/* poll all sockets; call handlers for those that are ready. */
int
comm_poll(int msec)
@@ -811,8 +911,8 @@
int maxfd;
unsigned long nfds;
int num;
+ int callicp = 0, callhttp = 0;
static time_t last_timeout = 0;
- static int lastinc = 0;
double timeout = current_dtime + (msec / 1000.0);
do {
#if !ALARM_UPDATES_TIME
@@ -833,7 +933,11 @@
#if USE_ASYNC_IO
aioCheckCallbacks();
#endif
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_poll_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_poll_http_incoming();
+ callicp = callhttp = 0;
nfds = 0;
maxfd = Biggest_FD + 1;
for (i = 0; i < maxfd; i++) {
@@ -884,16 +988,24 @@
int revents;
if (((revents = pfds[i].revents) == 0) || ((fd = pfds[i].fd) == -1))
continue;
- if (fdIsHttpOrIcp(fd))
+ if (fdIsIcp(fd)) {
+ callicp = 1;
+ continue;
+ }
+ if (fdIsHttp(fd)) {
+ callhttp = 1;
continue;
+ }
if (revents & (POLLRDNORM | POLLIN | POLLHUP | POLLERR)) {
debug(5, 6) ("comm_poll: FD %d ready for reading\n", fd);
if ((hdl = fd_table[fd].read_handler)) {
fd_table[fd].read_handler = NULL;
hdl(fd, fd_table[fd].read_data);
}
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_poll_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_poll_http_incoming();
}
if (revents & (POLLWRNORM | POLLOUT | POLLHUP | POLLERR)) {
debug(5, 5) ("comm_poll: FD %d ready for writing\n", fd);
@@ -901,8 +1013,10 @@
fd_table[fd].write_handler = NULL;
hdl(fd, fd_table[fd].write_data);
}
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_poll_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_poll_http_incoming();
}
if (revents & POLLNVAL) {
close_handler *ch;
@@ -929,8 +1043,11 @@
if (F->open != 0)
fd_close(fd);
}
- lastinc = incame;
}
+ if(callicp)
+ comm_poll_icp_incoming();
+ if(callhttp)
+ comm_poll_http_incoming();
return COMM_OK;
} while (timeout > current_dtime);
debug(5, 8) ("comm_poll: time out: %d.\n", squid_curtime);
@@ -939,6 +1056,114 @@
#else
+int
+comm_check_incoming_select_handlers(int nfds, int *fds)
+{
+ int i;
+ int fd;
+ int incame = 0;
+ int maxfd = 0;
+ PF *hdl = NULL;
+ fd_set read_mask;
+ fd_set write_mask;
+
+ FD_ZERO(&read_mask);
+ FD_ZERO(&write_mask);
+ for (i = 0; i < nfds; i++) {
+ fd = fds[i];
+ if (fd_table[fd].read_handler) {
+ FD_SET(fd, &read_mask);
+ if (fd > maxfd)
+ maxfd = fd;
+ }
+ if (fd_table[fd].write_handler) {
+ FD_SET(fd, &write_mask);
+ if (fd > maxfd)
+ maxfd = fd;
+ }
+ }
+ if (maxfd++ == 0)
+ return incame;
+#if !ALARM_UPDATES_TIME
+ getCurrentTime();
+#endif
+ if (select(maxfd, &read_mask, &write_mask, NULL, &zero_tv) < 1)
+ return incame;
+ for (i = 0; i < nfds; i++) {
+ fd = fds[i];
+ if (FD_ISSET(fd, &read_mask)) {
+ if ((hdl = fd_table[fd].read_handler) != NULL) {
+ fd_table[fd].read_handler = NULL;
+ hdl(fd, &incame);
+ } else {
+ debug(5, 1) ("comm_select_incoming: NULL read handler\n");
+ }
+ }
+ if (FD_ISSET(fd, &write_mask)) {
+ if ((hdl = fd_table[fd].write_handler) != NULL) {
+ fd_table[fd].write_handler = NULL;
+ hdl(fd, &incame);
+ } else {
+ debug(5, 1) ("comm_select_incoming: NULL write handler\n");
+ }
+ }
+ }
+ return incame;
+}
+
+static void
+comm_select_icp_incoming(void)
+{
+ int nfds = 0;
+ int fds[2];
+ int nevents;
+
+ icp_io_events = 0;
+ if (theInIcpConnection >= 0)
+ fds[nfds++] = theInIcpConnection;
+ if (theInIcpConnection != theOutIcpConnection)
+ if (theOutIcpConnection >= 0)
+ fds[nfds++] = theOutIcpConnection;
+ if(nfds == 0)
+ return;
+ nevents = comm_check_incoming_select_handlers(nfds, fds);
+ incoming_icp_interval = incoming_icp_interval + 1 - nevents;
+ if (incoming_icp_interval < 0)
+ incoming_icp_interval = 0;
+ if (incoming_icp_interval > MAX_INCOMING_INTERVAL)
+ incoming_icp_interval = MAX_INCOMING_INTERVAL;
+ if(nevents > INCOMING_ICP_MAX)
+ nevents = INCOMING_ICP_MAX;
+ statHistCount(&Counter.comm_icp_incoming, nevents);
+}
+
+static void
+comm_select_http_incoming(void)
+{
+ int nfds = 0;
+ int fds[MAXHTTPPORTS];
+ int j;
+ int nevents;
+
+ http_io_events = 0;
+ for (j = 0; j < NHttpSockets; j++) {
+ if (HttpSockets[j] < 0)
+ continue;
+ if (commDeferRead(HttpSockets[j]))
+ continue;
+ fds[nfds++] = HttpSockets[j];
+ }
+ nevents = comm_check_incoming_select_handlers(nfds, fds);
+ incoming_http_interval = incoming_http_interval + 1 - nevents;
+ if (incoming_http_interval < 0)
+ incoming_http_interval = 0;
+ if (incoming_http_interval > MAX_INCOMING_INTERVAL)
+ incoming_http_interval = MAX_INCOMING_INTERVAL;
+ if(nevents > INCOMING_HTTP_MAX)
+ nevents = INCOMING_HTTP_MAX;
+ statHistCount(&Counter.comm_http_incoming, nevents);
+}
+
/* Select on all sockets; call handlers for those that are ready. */
int
comm_select(int msec)
@@ -951,9 +1176,9 @@
int maxfd;
int nfds;
int num;
+ int callicp = 0, callhttp = 0;
static time_t last_timeout = 0;
struct timeval poll_time;
- static int lastinc;
double timeout = current_dtime + (msec / 1000.0);
do {
@@ -979,7 +1204,11 @@
else
setSocketShutdownLifetimes(1);
}
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_select_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_select_http_incoming();
+ callicp = callhttp = 0;
nfds = 0;
maxfd = Biggest_FD + 1;
for (i = 0; i < maxfd; i++) {
@@ -1035,8 +1264,14 @@
for (fd = 0; fd < maxfd; fd++) {
if (!FD_ISSET(fd, &readfds) && !FD_ISSET(fd, &writefds))
continue;
- if (fdIsHttpOrIcp(fd))
+ if (fdIsIcp(fd)) {
+ callicp = 1;
+ continue;
+ }
+ if (fdIsHttp(fd)) {
+ callhttp = 1;
continue;
+ }
if (FD_ISSET(fd, &readfds)) {
debug(5, 6) ("comm_select: FD %d ready for reading\n", fd);
if (fd_table[fd].read_handler) {
@@ -1044,8 +1279,10 @@
fd_table[fd].read_handler = NULL;
hdl(fd, fd_table[fd].read_data);
}
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_select_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_select_http_incoming();
}
if (FD_ISSET(fd, &writefds)) {
debug(5, 5) ("comm_select: FD %d ready for writing\n", fd);
@@ -1054,11 +1291,16 @@
fd_table[fd].write_handler = NULL;
hdl(fd, fd_table[fd].write_data);
}
- if (commCheckIncoming)
- comm_incoming();
+ if (commCheckICPIncoming)
+ comm_select_icp_incoming();
+ if (commCheckHTTPIncoming)
+ comm_select_http_incoming();
}
- lastinc = incame;
}
+ if(callicp)
+ comm_select_icp_incoming();
+ if(callhttp)
+ comm_select_http_incoming();
return COMM_OK;
} while (timeout > current_dtime);
debug(5, 8) ("comm_select: time out: %d\n", (int) squid_curtime);
@@ -1188,6 +1430,7 @@
}
#endif
+
void
comm_init(void)
{
@@ -1200,9 +1443,6 @@
RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
zero_tv.tv_sec = 0;
zero_tv.tv_usec = 0;
- invert32[0] = 32;
- for (i = 1; i < INCOMING_TOTAL_MAX; i++)
- invert32[i] = (int) (32.0 / (double) i + 0.5);
cachemgrRegister("comm_incoming",
"comm_incoming() stats",
commIncomingStats, 0);
@@ -1419,10 +1659,22 @@
commIncomingStats(StoreEntry * sentry)
{
StatCounters *f = &Counter;
- storeAppendPrintf(sentry, "Current incoming_interval: %d\n",
- incoming_interval >> 4);
+ storeAppendPrintf(sentry, "Current incoming_icp_interval: %d\n",
+ incoming_icp_interval >> INCOMING_FACTOR);
+ storeAppendPrintf(sentry, "Current incoming_http_interval: %d\n",
+ incoming_http_interval >> INCOMING_FACTOR);
storeAppendPrintf(sentry, "\n");
- storeAppendPrintf(sentry, "Histogram of number of incoming sockets or\n");
- storeAppendPrintf(sentry, "Messages handled per comm_incoming() call:\n");
- statHistDump(&f->comm_incoming, sentry, statHistIntDumper);
+ storeAppendPrintf(sentry, "Histogram of events per incoming socket type\n");
+#if HAVE_POLL
+ storeAppendPrintf(sentry, "ICP Messages handled per comm_poll_icp_incoming() call:\n");
+#else
+ storeAppendPrintf(sentry, "ICP Messages handled per comm_select_icp_incoming() call:\n");
+#endif
+ statHistDump(&f->comm_icp_incoming, sentry, statHistIntDumper);
+#if HAVE_POLL
+ storeAppendPrintf(sentry, "HTTP Messages handled per comm_poll_http_incoming() call:\n");
+#else
+ storeAppendPrintf(sentry, "HTTP Messages handled per comm_select_http_incoming() call:\n");
+#endif
+ statHistDump(&f->comm_http_incoming, sentry, statHistIntDumper);
}
--- stat.c 1998/06/05 02:26:41 1.1
+++ stat.c 1998/06/05 02:29:01
@@ -883,7 +883,8 @@
* Cache Digest Stuff
*/
statHistEnumInit(&C->cd.on_xition_count, CacheDigestHashFuncCount);
- statHistEnumInit(&C->comm_incoming, INCOMING_TOTAL_MAX);
+ statHistEnumInit(&C->comm_icp_incoming, INCOMING_ICP_MAX);
+ statHistEnumInit(&C->comm_http_incoming, INCOMING_HTTP_MAX);
}
/* add special cases here as they arrive */
@@ -899,7 +900,8 @@
statHistClean(&C->icp.reply_svc_time);
statHistClean(&C->dns.svc_time);
statHistClean(&C->cd.on_xition_count);
- statHistClean(&C->comm_incoming);
+ statHistClean(&C->comm_icp_incoming);
+ statHistClean(&C->comm_http_incoming);
}
/* add special cases here as they arrive */
@@ -921,7 +923,8 @@
statHistCopy(&dest->icp.reply_svc_time, &orig->icp.reply_svc_time);
statHistCopy(&dest->dns.svc_time, &orig->dns.svc_time);
statHistCopy(&dest->cd.on_xition_count, &orig->cd.on_xition_count);
- statHistCopy(&dest->comm_incoming, &orig->comm_incoming);
+ statHistCopy(&dest->comm_icp_incoming, &orig->comm_icp_incoming);
+ statHistCopy(&dest->comm_http_incoming, &orig->comm_http_incoming);
}
static void
Received on Tue Jul 29 2003 - 13:15:50 MDT
This archive was generated by hypermail pre-2.1.9 : Tue Dec 09 2003 - 16:11:48 MST