ModKqueue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 05 Socket Functions */
10 
11 /*
12  * This code was originally written by Benno Rice and hacked on quite
13  * a bit by Adrian. Adrian then took it to the hybrid-ircd project to use
14  * in their new IO subsystem. After a year of modifications and some
16  * rather interesting changes (event aggregation) it's back in squid.
16  * Thanks to the ircd-hybrid guys.
17  */
18 
19 /*
20  * XXX Currently not implemented / supported by this module XXX
21  *
22  * - delay pools
23  * - deferred reads
24  * - flags.read_pending
25  *
26  * So, it's not entirely useful in a production setup since if a read
27  * is meant to be deferred it isn't (we're not even throwing the event
28  * away here). Eventually the rest of the code will be rewritten
29  * so deferred reads aren't required.
30  * -- adrian
31  */
32 #include "squid.h"
33 
34 #if USE_KQUEUE
35 #include "comm/Loops.h"
36 #include "fde.h"
37 #include "globals.h"
38 #include "StatCounters.h"
39 #include "Store.h"
40 
41 #include <cerrno>
42 #if HAVE_SYS_EVENT_H
43 #include <sys/event.h>
44 #endif
45 
46 #define KE_LENGTH 128
47 
48 static void kq_update_events(int, short, PF *);
49 static int kq;
50 
51 static struct timespec zero_timespec;
52 
53 static struct kevent *kqlst; /* kevent buffer */
54 static int kqmax; /* max structs to buffer */
55 static int kqoff; /* offset into the buffer */
56 static int max_poll_time = 1000;
57 
58 static void commKQueueRegisterWithCacheManager(void);
59 
60 /* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
61 /* Private functions */
62 
63 void
64 kq_update_events(int fd, short filter, PF * handler)
65 {
66  PF *cur_handler;
67  int kep_flags;
68 
69  switch (filter) {
70 
71  case EVFILT_READ:
72  cur_handler = fd_table[fd].read_handler;
73  break;
74 
75  case EVFILT_WRITE:
76  cur_handler = fd_table[fd].write_handler;
77  break;
78 
79  default:
80  /* XXX bad! -- adrian */
81  return;
82  break;
83  }
84 
85  if ((cur_handler == NULL && handler != NULL)
86  || (cur_handler != NULL && handler == NULL)) {
87 
88  struct kevent *kep;
89 
90  kep = kqlst + kqoff;
91 
92  if (handler != NULL) {
93  kep_flags = (EV_ADD | EV_ONESHOT);
94  } else {
95  kep_flags = EV_DELETE;
96  }
97 
98  EV_SET(kep, (uintptr_t) fd, filter, kep_flags, 0, 0, 0);
99 
100  /* Check if we've used the last one. If we have then submit them all */
101  if (kqoff == kqmax - 1) {
102  int ret;
103 
104  ret = kevent(kq, kqlst, kqmax, nullptr, 0, &zero_timespec);
105  /* jdc -- someone needs to do error checking... */
106 
107  if (ret == -1) {
108  perror("kq_update_events(): kevent()");
109  return;
110  }
111 
112  kqoff = 0;
113  } else {
114  ++kqoff;
115  }
116  }
117 }
118 
119 /* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
120 /* Public functions */
121 
122 /*
123  * comm_select_init
124  *
125  * This is a needed exported function which will be called to initialise
126  * the network loop code.
127  */
128 void
130 {
131  kq = kqueue();
132 
133  if (kq < 0) {
134  fatal("comm_select_init: Couldn't open kqueue fd!\n");
135  }
136 
137  kqmax = getdtablesize();
138 
139  kqlst = (struct kevent *)xmalloc(sizeof(*kqlst) * kqmax);
140  zero_timespec.tv_sec = 0;
141  zero_timespec.tv_nsec = 0;
142 
144 }
145 
146 /*
147  * comm_setselect
148  *
149  * This is a needed exported function which will be called to register
150  * and deregister interest in a pending IO state for a given FD.
151  */
152 void
153 Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
154 {
155  fde *F = &fd_table[fd];
156  assert(fd >= 0);
157  assert(F->flags.open || (!handler && !client_data && !timeout));
158  debugs(5, 5, "FD " << fd << ", type=" << type <<
159  ", handler=" << handler << ", client_data=" << client_data <<
160  ", timeout=" << timeout);
161 
162  if (type & COMM_SELECT_READ) {
163  if (F->flags.read_pending)
164  kq_update_events(fd, EVFILT_WRITE, handler);
165 
166  kq_update_events(fd, EVFILT_READ, handler);
167 
168  F->read_handler = handler;
169  F->read_data = client_data;
170  }
171 
172  if (type & COMM_SELECT_WRITE) {
173  kq_update_events(fd, EVFILT_WRITE, handler);
174  F->write_handler = handler;
175  F->write_data = client_data;
176  }
177 
178  if (timeout)
179  F->timeout = squid_curtime + timeout;
180 
181 }
182 
183 /*
184  * Check all connections for new connections and input data that is to be
185  * processed. Also check for connections with data queued and whether we can
186  * write it out.
187  */
188 
189 /*
190  * comm_select
191  *
192  * Called to do the new-style IO, courtesy of of squid (like most of this
193  * new IO code). This routine handles the stuff we've hidden in
194  * comm_setselect and fd_table[] and calls callbacks for IO ready
195  * events.
196  */
197 
199 Comm::DoSelect(int msec)
200 {
201  int num, i;
202 
203  static struct kevent ke[KE_LENGTH];
204 
205  struct timespec poll_time;
206 
207  if (msec > max_poll_time)
208  msec = max_poll_time;
209 
210  poll_time.tv_sec = msec / 1000;
211 
212  poll_time.tv_nsec = (msec % 1000) * 1000000;
213 
214  for (;;) {
215  num = kevent(kq, kqlst, kqoff, ke, KE_LENGTH, &poll_time);
217  kqoff = 0;
218 
219  if (num >= 0)
220  break;
221 
222  if (ignoreErrno(errno))
223  break;
224 
225  getCurrentTime();
226 
227  return Comm::COMM_ERROR;
228 
229  /* NOTREACHED */
230  }
231 
232  getCurrentTime();
233 
234  if (num == 0)
235  return Comm::OK; /* No error.. */
236 
237  for (i = 0; i < num; ++i) {
238  int fd = (int) ke[i].ident;
239  PF *hdl = nullptr;
240  fde *F = &fd_table[fd];
241 
242  if (ke[i].flags & EV_ERROR) {
243  errno = ke[i].data;
244  /* XXX error == bad! -- adrian */
245  continue; /* XXX! */
246  }
247 
248  if (ke[i].filter == EVFILT_READ || F->flags.read_pending) {
249  if ((hdl = F->read_handler) != NULL) {
250  F->read_handler = nullptr;
251  hdl(fd, F->read_data);
252  }
253  }
254 
255  if (ke[i].filter == EVFILT_WRITE) {
256  if ((hdl = F->write_handler) != NULL) {
257  F->write_handler = nullptr;
258  hdl(fd, F->write_data);
259  }
260  }
261 
262  if (ke[i].filter != EVFILT_WRITE && ke[i].filter != EVFILT_READ) {
263  /* Bad! -- adrian */
264  debugs(5, DBG_IMPORTANT, "comm_select: kevent returned " << ke[i].filter << "!");
265  }
266  }
267 
268  return Comm::OK;
269 }
270 
271 void
273 {
274  max_poll_time = 10;
275 }
276 
/*
 * commKQueueRegisterWithCacheManager
 *
 * Intentionally empty: this module currently registers nothing.
 */
static void
commKQueueRegisterWithCacheManager(void)
{
}
281 
282 #endif /* USE_KQUEUE */
283 
void fatal(const char *message)
Definition: fatal.cc:28
#define xmalloc
Comm::Flag DoSelect(int)
Do poll and trigger callback functions as appropriate.
Definition: ModDevPoll.cc:308
static uint32 F(uint32 X, uint32 Y, uint32 Z)
Definition: md4.c:46
static int kqoff
Definition: ModKqueue.cc:55
@ OK
Definition: Flag.h:16
Definition: fde.h:51
static struct kevent * kqlst
Definition: ModKqueue.cc:53
#define NULL
Definition: types.h:145
static struct timespec zero_timespec
Definition: ModKqueue.cc:51
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
static int kqmax
Definition: ModKqueue.cc:54
static void kq_update_events(int, short, PF *)
Definition: ModKqueue.cc:64
#define assert(EX)
Definition: assert.h:17
@ COMM_ERROR
Definition: Flag.h:17
#define COMM_SELECT_READ
Definition: defines.h:24
time_t squid_curtime
Definition: stub_libtime.cc:20
void SelectLoopInit(void)
Initialize the module on Squid startup.
Definition: ModDevPoll.cc:170
Flag
Definition: Flag.h:15
int ignoreErrno(int ierrno)
Definition: comm.cc:1422
#define fd_table
Definition: fde.h:189
unsigned long int select_loops
Definition: StatCounters.h:119
void SetSelect(int, unsigned int, PF *, void *, time_t)
Mark an FD to be watched for its IO status.
Definition: ModDevPoll.cc:220
#define DBG_IMPORTANT
Definition: Stream.h:38
static int max_poll_time
Definition: ModKqueue.cc:56
static int kq
Definition: ModKqueue.cc:49
static void commKQueueRegisterWithCacheManager(void)
Definition: ModKqueue.cc:278
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
#define COMM_SELECT_WRITE
Definition: defines.h:25
#define KE_LENGTH
Definition: ModKqueue.cc:46
void QuickPollRequired(void)
Definition: ModDevPoll.cc:414
void PF(int, void *)
Definition: forward.h:18
int unsigned int
Definition: stub_fd.cc:19
StatCounters statCounter
Definition: StatCounters.cc:12

 

Introduction

Documentation

Support

Miscellaneous