DiskdIOStrategy.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Squid-side DISKD I/O functions. */
10 
11 #include "squid.h"
12 #include "comm.h"
13 #include "comm/Loops.h"
14 #include "compat/select.h"
15 #include "ConfigOption.h"
16 #include "diomsg.h"
17 #include "DiskdFile.h"
18 #include "DiskdIOStrategy.h"
19 #include "DiskIO/DiskFile.h"
20 #include "fd.h"
21 #include "SquidConfig.h"
22 #include "SquidIpc.h"
23 #include "StatCounters.h"
24 #include "Store.h"
25 #include "unlinkd.h"
26 
27 #include <cerrno>
28 #if HAVE_SYS_IPC_H
29 #include <sys/ipc.h>
30 #endif
31 #if HAVE_SYS_MSG_H
32 #include <sys/msg.h>
33 #endif
34 #if HAVE_SYS_SHM_H
35 #include <sys/shm.h>
36 #endif
37 
39 
41 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
42 
43 size_t
45 {
46  return ++nextInstanceID;
47 }
48 
49 bool
51 {
52  /*
53  * Fail on open() if there are too many requests queued.
54  */
55 
56  if (away > magic1) {
57  debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
58 
59  return true;
60  }
61 
62  return false;
63 }
64 
65 int
67 {
68  /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
69  /* the parse function guarantees magic2 is positive */
70  return away * 1000 / magic2;
71 }
72 
73 void
75 {
77 }
78 
80 DiskdIOStrategy::newFile(char const *path)
81 {
82  if (shedLoad()) {
83  openFailed();
84  return nullptr;
85  }
86 
87  return new DiskdFile (path, this);
88 }
89 
/// Default constructor.
/// Starts with magic1 = 64 and magic2 = 72 (the members that the "Q1" and
/// "Q2" options overwrite), no requests away, and -1 in both message queue
/// IDs and in the write descriptor. instanceID comes from newInstance().
90 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
91 {}
92 
93 bool
95 {
96  return true;
97 }
98 
99 void
101 {
102  if (shedLoad()) {
103  /* Damn, we need to issue a sync unlink here :( */
104  debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
105  unlinkdUnlink(path);
106  return;
107  }
108 
109  /* We can attempt a diskd unlink */
110  int x;
111 
112  ssize_t shm_offset;
113 
114  char *buf;
115 
116  buf = (char *)shm.get(&shm_offset);
117 
118  xstrncpy(buf, path, SHMBUF_BLKSZ);
119 
120  x = send(_MQD_UNLINK,
121  0,
122  (StoreIOState::Pointer )nullptr,
123  0,
124  0,
125  shm_offset);
126 
127  if (x < 0) {
128  int xerrno = errno;
129  debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
130  ::unlink(buf); /* XXX EWW! */
131  // shm.put (shm_offset);
132  }
133 
135 }
136 
137 void
139 {
140  int pid;
141  void * hIpc;
142  int rfd;
143  int ikey;
144  const char *args[5];
145  char skey1[32];
146  char skey2[32];
147  char skey3[32];
148  Ip::Address localhost;
149 
150  ikey = (getpid() << 10) + (instanceID << 2);
151  ikey &= 0x7fffffff;
152  smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
153 
154  if (smsgid < 0) {
155  int xerrno = errno;
156  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
157  fatal("msgget failed");
158  }
159 
160  rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
161 
162  if (rmsgid < 0) {
163  int xerrno = errno;
164  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
165  fatal("msgget failed");
166  }
167 
168  shm.init(ikey, magic2);
169  snprintf(skey1, 32, "%d", ikey);
170  snprintf(skey2, 32, "%d", ikey + 1);
171  snprintf(skey3, 32, "%d", ikey + 2);
172  args[0] = "diskd";
173  args[1] = skey1;
174  args[2] = skey2;
175  args[3] = skey3;
176  args[4] = nullptr;
177  localhost.setLocalhost();
180  args,
181  "diskd",
182  localhost,
183  &rfd,
184  &wfd,
185  &hIpc);
186 
187  if (pid < 0)
188  fatalf("execl: %s", Config.Program.diskd);
189 
190  if (rfd != wfd)
191  comm_close(rfd);
192 
193  fd_note(wfd, "squid -> diskd");
194 
198 }
199 
200 /*
201  * SHM manipulation routines
202  */
/// Releases the buffer block that starts at the given byte offset by
/// clearing its bit in inuse_map.
/// \param offset byte offset of the block; asserted to lie within
///               [0, nbufs * SHMBUF_BLKSZ)
/// NOTE(review): this listing skips embedded line numbers 211 and 213, so
/// statements around the CBIT_CLR() call may be missing from this view.
203 void
204 SharedMemory::put(ssize_t offset)
205 {
206  int i;
207  assert(offset >= 0);
208  assert(offset < nbufs * SHMBUF_BLKSZ);
// integer division maps any offset inside a block to that block's index
209  i = offset / SHMBUF_BLKSZ;
210  assert(i < nbufs);
212  CBIT_CLR(inuse_map, i);
214 }
215 
/// Reserves the lowest-indexed block whose inuse_map bit is clear.
/// \param shm_offset set to the byte offset of the reserved block
/// \returns a pointer to the start of the reserved block (buf + offset)
/// If every bit is already set, aBuf stays null and assert(aBuf) fails.
/// NOTE(review): this listing skips embedded line numbers 239, 241 and 242,
/// so statements between the assertions and the return may be missing here.
216 void *
217 
218 SharedMemory::get(ssize_t * shm_offset)
219 {
220  char *aBuf = nullptr;
221  int i;
222 
// linear scan of the in-use bitmap; stop at the first free block
223  for (i = 0; i < nbufs; ++i) {
224  if (CBIT_TEST(inuse_map, i))
225  continue;
226 
227  CBIT_SET(inuse_map, i);
228 
229  *shm_offset = i * SHMBUF_BLKSZ;
230 
231  aBuf = buf + (*shm_offset);
232 
233  break;
234  }
235 
// sanity checks: a block was found and it lies inside the mapped area
236  assert(aBuf);
237  assert(aBuf >= buf);
238  assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
240 
243 
244  return aBuf;
245 }
246 
/// Creates and attaches the shared memory segment and its in-use bitmap.
/// \param ikey   base IPC key; the segment itself uses key ikey + 2
/// \param magic2 used to size the pool: nbufs = (int)(magic2 * 1.3)
/// Calls fatal() if either shmget() or shmat() fails.
/// NOTE(review): this listing skips embedded line number 269, so a statement
/// between the bitmap allocation and the loop may be missing from this view.
247 void
248 SharedMemory::init(int ikey, int magic2)
249 {
// the multiplication is done in floating point and truncated by the cast
250  nbufs = (int)(magic2 * 1.3);
251  id = shmget((key_t) (ikey + 2),
252  nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
253 
254  if (id < 0) {
255  int xerrno = errno;
256  debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
257  fatal("shmget failed");
258  }
259 
260  buf = (char *)shmat(id, nullptr, 0);
261 
// shmat() reports failure as (void *) -1, not as a null pointer
262  if (buf == (void *) -1) {
263  int xerrno = errno;
264  debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
265  fatal("shmat failed");
266  }
267 
// one bit per block, rounded up to whole bytes
268  inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
270 
// mark each block used and immediately put() it back; put() clears the
// bit again, so every block ends up free
271  for (int i = 0; i < nbufs; ++i) {
272  CBIT_SET(inuse_map, i);
273  put (i * SHMBUF_BLKSZ);
274  }
275 }
276 
277 void
279 {
280  debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
281  ++statCounter.syscalls.disk.unlinks;
282 
283  if (M->status < 0)
285  else
287 }
288 
289 void
291 {
293  /* I.e. already closed file
294  * - say when we have an error opening after
295  * a read was already queued
296  */
297  debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
299  return;
300  }
301 
302  /* set errno passed from diskd. makes debugging more meaningful */
303  if (M->status < 0)
304  errno = -M->status;
305 
306  if (M->newstyle) {
307  DiskdFile *theFile = (DiskdFile *)M->callback_data;
308  theFile->unlock();
309  theFile->completed (M);
310  } else
311  switch (M->mtype) {
312 
313  case _MQD_OPEN:
314 
315  case _MQD_CREATE:
316 
317  case _MQD_CLOSE:
318 
319  case _MQD_READ:
320 
321  case _MQD_WRITE:
322  assert (0);
323  break;
324 
325  case _MQD_UNLINK:
326  unlinkDone(M);
327  break;
328 
329  default:
330  assert(0);
331  break;
332  }
333 
335 }
336 
/**
 * Queues a request on behalf of a DiskdFile (a "newstyle" message).
 *
 * Stores a cbdata reference to theFile in the message and locks the file.
 * requestor is recorded in the message and, when not null, locked as well.
 * All remaining arguments are passed through to SEND().
 *
 * \returns whatever SEND() returns
 */
337 int
338 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
339 {
340  diomsg M;
341  M.callback_data = cbdataReference(theFile);
// theFile is dereferenced without a null check
342  theFile->lock();
343  M.requestor = requestor;
344  M.newstyle = true;
345 
346  if (requestor)
347  requestor->lock();
348 
349  return SEND(&M, mtype, id, size, offset, shm_offset);
350 }
351 
/**
 * Queues an old-style message (newstyle == false) and hands it to SEND().
 *
 * \returns whatever SEND() returns
 *
 * NOTE(review): this listing skips embedded line number 356, between the
 * declaration of M and the newstyle assignment. As shown here, sio is never
 * referenced and M.callback_data is never set; confirm against the real file.
 */
352 int
353 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
354 {
355  diomsg M;
357  M.newstyle = false;
358 
359  return SEND(&M, mtype, id, size, offset, shm_offset);
360 }
361 
/**
 * Common tail of both send() overloads: fills in the remaining diomsg
 * fields, stamps a sequence number and posts the message on the smsgid
 * queue with IPC_NOWAIT.
 *
 * When msgsnd() returns 0 the away counter is incremented. Otherwise the
 * error is logged, send_errors is incremented (asserting that it stays
 * below 100) and, if shm_offset > -1, that buffer is handed to shm.put().
 *
 * Before returning, loops while away > magic2, sleeping in xselect() and
 * calling Store::Root().callback() on every iteration.
 *
 * \returns the msgsnd() result
 *
 * NOTE(review): this listing skips embedded line numbers 386 and 391, so one
 * statement in each of the success and failure branches may be missing here.
 */
362 int
363 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
364 {
// function-local statics: one set of counters for all instances
365  static int send_errors = 0;
366  static int last_seq_no = 0;
367  static int seq_no = 0;
368  int x;
369 
370  M->mtype = mtype;
371  M->size = size;
372  M->offset = offset;
373  M->status = -1;
// narrows ssize_t to int to fit the diomsg field
374  M->shm_offset = (int) shm_offset;
375  M->id = id;
376  M->seq_no = ++seq_no;
377 
378  if (M->seq_no < last_seq_no)
379  debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
380 
381  x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
382 
383  last_seq_no = M->seq_no;
384 
385  if (0 == x) {
387  ++away;
388  } else {
// capture errno before any further call can overwrite it
389  int xerrno = errno;
390  debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
392  ++send_errors;
393  assert(send_errors < 100);
394  if (shm_offset > -1)
395  shm.put(shm_offset);
396  }
397 
398  /*
399  * We have to drain the queue here if necessary. If we don't,
400  * then we can have a lot of messages in the queue (probably
401  * up to 2*magic1) and we can run out of shared memory buffers.
402  */
403  /*
404  * Note that we call Store::Root().callback (for all SDs), rather
405  * than callback for just this SD, so that while
406  * we're "blocking" on this SD we can also handle callbacks
407  * from other SDs that might be ready.
408  */
409 
// back-off delay: starts at 1 microsecond and doubles on each iteration
410  struct timeval delay = {0, 1};
411 
412  while (away > magic2) {
413  xselect(0, nullptr, nullptr, nullptr, &delay);
414  Store::Root().callback();
415 
// NOTE(review): doubling from 1 last passes this test at 524288 and then
// becomes 1048576, i.e. more than one second expressed in tv_usec; confirm
// that xselect() accepts such a value on all supported platforms.
416  if (delay.tv_usec < 1000000)
417  delay.tv_usec <<= 1;
418  }
419 
420  return x;
421 }
422 
423 ConfigOption *
425 {
429  return result;
430 }
431 
/**
 * Handles the "Q1" cache_dir option by storing its integer value in magic1.
 *
 * \param name        option name; anything other than "Q1" is not handled
 * \param value       option text, converted with atoi()
 * \param isaReconfig zero: the new value is accepted as-is;
 *                    non-zero: an increase over the current value is refused
 * \retval false name is not "Q1"
 * \retval true  the option was consumed, including the case where a
 *               requested increase was refused and the old value restored
 *
 * NOTE(review): atoi() performs no validation, so non-numeric text yields 0
 * and negative numbers are accepted.
 */
432 bool
433 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
434 {
435  if (strcmp(name, "Q1") != 0)
436  return false;
437 
// remember the current value so a refused increase can be rolled back
438  int old_magic1 = magic1;
439 
440  magic1 = atoi(value);
441 
442  if (!isaReconfig)
443  return true;
444 
445  if (old_magic1 < magic1) {
446  /*
447  * This is because shm.nbufs is computed at startup, when
448  * we call shmget(). We can't increase the Q1/Q2 parameters
449  * beyond their initial values because then we might have
450  * more "Q2 messages" than shared memory chunks, and this
451  * will cause an assertion in storeDiskdShmGet().
452  */
453  /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
454  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
455  magic1 = old_magic1;
456  return true;
457  }
458 
// only announce the value when it actually changed (i.e. was lowered)
459  if (old_magic1 != magic1)
460  debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
461 
462  return true;
463 }
464 
465 void
467 {
468  storeAppendPrintf(e, " Q1=%d", magic1);
469 }
470 
/**
 * Handles the "Q2" cache_dir option by storing its integer value in magic2.
 *
 * \param name        option name; anything other than "Q2" is not handled
 * \param value       option text, converted with atoi()
 * \param isaReconfig zero: the new value is accepted as-is;
 *                    non-zero: an increase over the current value is refused
 * \retval false name is not "Q2"
 * \retval true  the option was consumed, including the case where a
 *               requested increase was refused and the old value restored
 *
 * NOTE(review): nothing in this function rejects zero or negative values,
 * yet magic2 is used as a divisor elsewhere in this file
 * (away * 1000 / magic2). Confirm where positivity is actually enforced.
 */
471 bool
472 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
473 {
474  if (strcmp(name, "Q2") != 0)
475  return false;
476 
// remember the current value so a refused increase can be rolled back
477  int old_magic2 = magic2;
478 
479  magic2 = atoi(value);
480 
481  if (!isaReconfig)
482  return true;
483 
484  if (old_magic2 < magic2) {
485  /* See comments in Q1 function above */
486  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
487  magic2 = old_magic2;
488  return true;
489  }
490 
// only announce the value when it actually changed (i.e. was lowered)
491  if (old_magic2 != magic2)
492  debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
493 
494  return true;
495 }
496 
497 void
499 {
500  storeAppendPrintf(e, " Q2=%d", magic2);
501 }
502 
503 /*
504  * Sync any pending data. We just sit around and read the queue
505  * until the data has finished writing.
506  */
507 void
509 {
510  static time_t lastmsg = 0;
511 
512  while (away > 0) {
513  if (squid_curtime > lastmsg) {
514  debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
515  lastmsg = squid_curtime;
516  }
517 
518  callback();
519  }
520 }
521 
522 /*
523  * Handle callbacks. If we have more than magic2 requests away, we block
524  * until the queue is below magic2. Otherwise, we simply return when we
525  * don't get a message.
526  */
527 
528 int
530 {
531  diomsg M;
532  int x;
533  int retval = 0;
534 
535  if (away >= magic2) {
537  retval = 1;
538  /* We might not have anything to do, but our queue
539  * is full.. */
540  }
541 
545  }
546 
547  while (1) {
548  x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
549 
550  if (x < 0)
551  break;
552  else if (x != diomsg::msg_snd_rcv_sz) {
553  debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
554  break;
555  }
556 
558  --away;
559  handle(&M);
560  retval = 1; /* Return that we've actually done some work */
561 
562  if (M.shm_offset > -1)
563  shm.put ((off_t) M.shm_offset);
564  }
565 
566  return retval;
567 }
568 
569 void
571 {
572  storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
573 }
574 
void fatal(const char *message)
Definition: fatal.cc:28
const char * xstrerr(int error)
Definition: xstrerror.cc:83
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
#define DBG_CRITICAL
Definition: Stream.h:37
struct diskd_stats_t::@37 unlink
char * diskd
Definition: SquidConfig.h:208
void setLocalhost()
Definition: Address.cc:275
@ _MQD_WRITE
Definition: diomsg.h:24
void fd_note(int fd, const char *s)
Definition: fd.cc:211
#define SHMBUF_BLKSZ
#define CBIT_CLR(mask, bit)
Definition: defines.h:73
void unlinkFile(char const *) override
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
static size_t newInstance()
void unlinkDone(diomsg *M)
@ _MQD_CREATE
Definition: diomsg.h:21
void lock() const
Definition: Lock.h:34
int commSetNonBlocking(int fd)
Definition: comm.cc:1044
int callback() override
called once every main loop iteration; TODO: Move to UFS code.
Definition: Controller.cc:223
#define comm_close(x)
Definition: comm.h:36
C * getRaw() const
Definition: RefCount.h:89
long mtyp_t
Definition: types.h:141
char * xstrncpy(char *dst, const char *src, size_t n)
Definition: xstring.cc:37
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
#define cbdataReference(var)
Definition: cbdata.h:348
int SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
int status
Definition: diomsg.h:38
Lock * requestor
Definition: diomsg.h:35
SharedMemory shm
static void * hIpc
Definition: IcmpSquid.cc:34
static pid_t pid
Definition: IcmpSquid.cc:35
void init(int ikey, int magic2)
void optionQ2Dump(StoreEntry *e) const
std::vector< ConfigOption * > options
Definition: ConfigOption.h:74
ConfigOption * getOptionTree() const override
int size
Definition: ModDevPoll.cc:70
void commUnsetFdTimeout(int fd)
clear a timeout handler by FD number
Definition: comm.cc:581
static size_t nextInstanceID
void completed(diomsg *)
Definition: DiskdFile.cc:208
void init() override
@ _MQD_CLOSE
Definition: diomsg.h:22
int seq_no
Definition: diomsg.h:33
void * get(ssize_t *)
int callback() override
@ _MQD_UNLINK
Definition: diomsg.h:25
static const int msg_snd_rcv_sz
Definition: diomsg.h:41
void statfs(StoreEntry &sentry) const override
#define assert(EX)
Definition: assert.h:17
size_t size
Definition: diomsg.h:36
diskd_stats_t diskd_stats
#define CBIT_TEST(mask, bit)
Definition: defines.h:74
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
struct SquidConfig::@86 Program
int load() override
#define cbdataReferenceDone(var)
Definition: cbdata.h:357
@ _MQD_OPEN
Definition: diomsg.h:20
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:69
time_t squid_curtime
Definition: stub_libtime.cc:20
@ _MQD_READ
Definition: diomsg.h:23
void handle(diomsg *M)
bool optionQ2Parse(char const *option, const char *value, int reconfiguring)
int shm_offset
Definition: diomsg.h:40
void put(ssize_t)
int xselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
POSIX select(2) equivalent.
Definition: select.h:22
int send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
bool unlinkdUseful() const override
off_t offset
Definition: diomsg.h:37
void sync() override
void unlinkdUnlink(const char *path)
Definition: unlinkd.cc:41
struct StatCounters::@115::@119 disk
int id
Definition: diomsg.h:32
RefCount< DiskFile > newFile(char const *path) override
Definition: diomsg.h:30
#define DBG_IMPORTANT
Definition: Stream.h:38
bool shedLoad() override
#define MYNAME
Definition: Stream.h:219
struct StatCounters::@115 syscalls
#define CBIT_SET(mask, bit)
Definition: defines.h:72
bool newstyle
Definition: diomsg.h:39
void optionQ1Dump(StoreEntry *e) const
#define IPC_STREAM
Definition: defines.h:104
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void * callback_data
Definition: diomsg.h:34
bool optionQ1Parse(char const *option, const char *value, int reconfiguring)
mtyp_t mtype
Definition: diomsg.h:31
Definition: Lock.h:25
void QuickPollRequired(void)
Definition: ModDevPoll.cc:414
class SquidConfig Config
Definition: SquidConfig.cc:12
int unsigned int
Definition: stub_fd.cc:19
StatCounters statCounter
Definition: StatCounters.cc:12
Controller & Root()
safely access controller singleton
Definition: Controller.cc:926

 

Introduction

Documentation

Support

Miscellaneous