DiskdIOStrategy.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Squid-side DISKD I/O functions. */
10 
11 #include "squid.h"
12 #include "comm.h"
13 #include "comm/Loops.h"
14 #include "compat/select.h"
15 #include "ConfigOption.h"
16 #include "diomsg.h"
17 #include "DiskdFile.h"
18 #include "DiskdIOStrategy.h"
19 #include "DiskIO/DiskFile.h"
20 #include "fd.h"
21 #include "SquidConfig.h"
22 #include "SquidIpc.h"
23 #include "StatCounters.h"
24 #include "Store.h"
25 #include "unlinkd.h"
26 
27 #include <cerrno>
28 #if HAVE_SYS_IPC_H
29 #include <sys/ipc.h>
30 #endif
31 #if HAVE_SYS_MSG_H
32 #include <sys/msg.h>
33 #endif
34 #if HAVE_SYS_SHM_H
35 #include <sys/shm.h>
36 #endif
37 
39 
41 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
42 
43 size_t
45 {
46  return ++nextInstanceID;
47 }
48 
49 bool
51 {
52  /*
53  * Fail on open() if there are too many requests queued.
54  */
55 
56  if (away > magic1) {
57  debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
58 
59  return true;
60  }
61 
62  return false;
63 }
64 
65 int
67 {
68  /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
69  /* the parse function guarantees magic2 is positive */
70  return away * 1000 / magic2;
71 }
72 
// Called by newFile() when shedLoad() refuses to open another file.
// NOTE(review): this copy has lost both the signature line (the gap between
// "73" and "75"; newFile() calls it as openFailed()) and the single body
// statement (the gap between "75" and "77"). Restore both from the
// authoritative source; the body's effect cannot be determined from here.
73 void
75 {
77 }
78 
80 DiskdIOStrategy::newFile(char const *path)
81 {
82  if (shedLoad()) {
83  openFailed();
84  return nullptr;
85  }
86 
87  return new DiskdFile (path, this);
88 }
89 
90 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
91 {}
92 
93 bool
95 {
96  return true;
97 }
98 
99 void
101 {
102  if (shedLoad()) {
103  /* Damn, we need to issue a sync unlink here :( */
104  debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
105  unlinkdUnlink(path);
106  return;
107  }
108 
109  /* We can attempt a diskd unlink */
110  int x;
111 
112  ssize_t shm_offset;
113 
114  char *buf;
115 
116  buf = (char *)shm.get(&shm_offset);
117 
118  if (!buf) {
119  unlinkdUnlink(path);
120  return;
121  }
122 
123  xstrncpy(buf, path, SHMBUF_BLKSZ);
124 
125  x = send(_MQD_UNLINK,
126  0,
127  (StoreIOState::Pointer )nullptr,
128  0,
129  0,
130  shm_offset);
131 
132  if (x < 0) {
133  int xerrno = errno;
134  debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
135  ::unlink(buf); /* XXX EWW! */
136  // shm.put (shm_offset);
137  }
138 
140 }
141 
142 void
144 {
145  int pid;
146  void * hIpc;
147  int rfd;
148  int ikey;
149  const char *args[5];
150  char skey1[32];
151  char skey2[32];
152  char skey3[32];
153  Ip::Address localhost;
154 
155  ikey = (getpid() << 10) + (instanceID << 2);
156  ikey &= 0x7fffffff;
157  smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
158 
159  if (smsgid < 0) {
160  int xerrno = errno;
161  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
162  fatal("msgget failed");
163  }
164 
165  rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
166 
167  if (rmsgid < 0) {
168  int xerrno = errno;
169  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
170  fatal("msgget failed");
171  }
172 
173  shm.init(ikey, magic2);
174  snprintf(skey1, 32, "%d", ikey);
175  snprintf(skey2, 32, "%d", ikey + 1);
176  snprintf(skey3, 32, "%d", ikey + 2);
177  args[0] = "diskd";
178  args[1] = skey1;
179  args[2] = skey2;
180  args[3] = skey3;
181  args[4] = nullptr;
182  localhost.setLocalhost();
185  args,
186  "diskd",
187  localhost,
188  &rfd,
189  &wfd,
190  &hIpc);
191 
192  if (pid < 0)
193  fatalf("execl: %s", Config.Program.diskd);
194 
195  if (rfd != wfd)
196  comm_close(rfd);
197 
198  fd_note(wfd, "squid -> diskd");
199 
203 }
204 
205 /*
206  * SHM manipulation routines
207  */
// Returns one shared-memory buffer to the pool by clearing its bit in
// inuse_map. `offset` is a byte offset of the kind get() hands out
// (slot * SHMBUF_BLKSZ); the asserts reject offsets outside the segment.
208 void
209 SharedMemory::put(ssize_t offset)
210 {
211  int i;
212  assert(offset >= 0);
213  assert(offset < nbufs * SHMBUF_BLKSZ);
214  i = offset / SHMBUF_BLKSZ;
215  assert(i < nbufs);
// NOTE(review): a statement is missing from this copy here (gap "215" -> "217").
217  CBIT_CLR(inuse_map, i);
// NOTE(review): a statement is missing from this copy here (gap "217" -> "219").
219 }
220 
// Allocates one buffer from the shared-memory pool. It takes the first
// slot whose inuse_map bit is clear and marks that slot used.
// On success it stores the byte offset (slot * SHMBUF_BLKSZ) in
// *shm_offset and returns a pointer into the attached segment.
// If every slot is in use, it logs an error and returns nullptr, leaving
// *shm_offset unchanged.
221 void *
222
223 SharedMemory::get(ssize_t * shm_offset)
224 {
225  char *aBuf = nullptr;
226  int i;
227
228  for (i = 0; i < nbufs; ++i) {
229  if (CBIT_TEST(inuse_map, i))
230  continue;
231
232  CBIT_SET(inuse_map, i);
233
234  *shm_offset = i * SHMBUF_BLKSZ;
235
236  aBuf = buf + (*shm_offset);
237
238  break;
239  }
240
241  if (!aBuf) {
242  debugs(79, DBG_IMPORTANT, "ERROR: out of shared-memory buffers");
243  return nullptr;
244  }
245
// Sanity checks: the chosen buffer lies inside the attached segment.
246  assert(aBuf);
247  assert(aBuf >= buf);
248  assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
// NOTE(review): statements are missing from this copy here (gaps "248" -> "250"
// and "250" -> "253"); confirm against the authoritative source.
250
253
254  return aBuf;
255 }
256 
// Creates and attaches the shared-memory segment (SysV key ikey + 2). The
// segment holds nbufs = (int)(magic2 * 1.3) buffers, each SHMBUF_BLKSZ bytes
// apart. It also allocates inuse_map with one bit per buffer.
// shmget() or shmat() failure is fatal.
// NOTE(review): nothing here guards against magic2 <= 0, which yields
// nbufs <= 0.
257 void
258 SharedMemory::init(int ikey, int magic2)
259 {
260  nbufs = (int)(magic2 * 1.3);
261  id = shmget((key_t) (ikey + 2),
262  nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
263
264  if (id < 0) {
265  int xerrno = errno;
266  debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
267  fatal("shmget failed");
268  }
269
270  buf = (char *)shmat(id, nullptr, 0);
271
272  if (buf == (void *) -1) {
273  int xerrno = errno;
274  debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
275  fatal("shmat failed");
276  }
277
278  inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
// NOTE(review): a statement is missing from this copy here (gap "278" -> "280").
280
// Each slot is marked used and then released through put(), so every buffer
// starts out free. Presumably this also runs put()'s own bookkeeping for
// every buffer; TODO confirm, since put() is incomplete in this copy.
281  for (int i = 0; i < nbufs; ++i) {
282  CBIT_SET(inuse_map, i);
283  put (i * SHMBUF_BLKSZ);
284  }
285 }
286 
// Handles diskd's reply to an _MQD_UNLINK request (handle() dispatches here).
// It logs the path held in the request's shared-memory buffer together with
// the returned status, then counts one unlink syscall.
// NOTE(review): this copy has lost three lines:
//   - the signature line (gap "287" -> "289");
//   - the body of the `if` branch (gap "293" -> "295");
//   - the body of the `else` branch (gap "295" -> "297").
// As written the block is syntactically incomplete. Restore it from the
// authoritative source.
287 void
289 {
290  debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
291  ++statCounter.syscalls.disk.unlinks;
292
293  if (M->status < 0)
295  else
297 }
298 
// Dispatches one reply received from diskd.
// - A guarded early-exit path logs "Invalid callback_data" and returns.
//   Its condition line is missing here (see below).
// - Otherwise a negative status is copied into errno (negated).
// - New-style replies unlock their DiskdFile and pass the message to
//   DiskdFile::completed().
// - Old-style replies must be _MQD_UNLINK (handled by unlinkDone()); any
//   other message type trips assert(0).
// NOTE(review): this copy has lost the signature line (gap "299" -> "301")
// and the opening condition of the early-exit block (gap "301" -> "303").
// That condition presumably tests whether M->callback_data is still valid;
// the symbol index lists cbdataReferenceValid(). Confirm against the source.
299 void
301 {
303  /* I.e. already closed file
304  * - say when we have a error opening after
305  * a read was already queued
306  */
307  debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
// NOTE(review): a statement is missing here (gap "307" -> "309"); presumably
// it releases the callback_data reference. TODO confirm.
309  return;
310  }
311
312  /* set errno passed from diskd. makes debugging more meaningful */
313  if (M->status < 0)
314  errno = -M->status;
315
316  if (M->newstyle) {
317  DiskdFile *theFile = (DiskdFile *)M->callback_data;
318  theFile->unlock();
319  theFile->completed (M);
320  } else
321  switch (M->mtype) {
322
323  case _MQD_OPEN:
324
325  case _MQD_CREATE:
326
327  case _MQD_CLOSE:
328
329  case _MQD_READ:
330
331  case _MQD_WRITE:
332  assert (0);
333  break;
334
335  case _MQD_UNLINK:
336  unlinkDone(M);
337  break;
338
339  default:
340  assert(0);
341  break;
342  }
343
// NOTE(review): a statement is missing here (gap "343" -> "345"); presumably
// it releases the callback_data reference taken when the message was sent.
// TODO confirm.
345 }
346 
347 int
348 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
349 {
350  diomsg M;
351  M.callback_data = cbdataReference(theFile);
352  theFile->lock();
353  M.requestor = requestor;
354  M.newstyle = true;
355 
356  if (requestor)
357  requestor->lock();
358 
359  return SEND(&M, mtype, id, size, offset, shm_offset);
360 }
361 
// Queues an "old style" request (M.newstyle = false) and hands it to SEND().
// In the code visible here, only unlinkFile() uses this overload, and it
// passes a nil sio.
// NOTE(review): one statement is missing from this copy (gap "365" -> "367").
// It presumably stores a reference to sio in M.callback_data, as the
// DiskdFile overload does; the symbol index lists getRaw() and
// cbdataReference(). Confirm against the source.
362 int
363 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
364 {
365  diomsg M;
367  M.newstyle = false;
368
369  return SEND(&M, mtype, id, size, offset, shm_offset);
370 }
371 
// Sends one message to diskd.
// - Fills in the common diomsg fields: type, size, offset, status = -1,
//   shm offset, id and a sequence number.
// - Queues the message with a non-blocking msgsnd() on smsgid.
// - While more than magic2 (Q2) requests are away, sleeps briefly and runs
//   Store::Root().callback() to drain replies.
// On success `away` is incremented. On failure the shared-memory buffer, if
// any, is returned to the pool. Returns the msgsnd() result.
// send_errors, seq_no and last_seq_no are function statics, so they are
// shared by every DiskdIOStrategy. send_errors is never reset, so the assert
// fires after 100 failed sends over the lifetime of the process.
// NOTE(review): this copy lacks one statement in the success branch
// (gap "395" -> "397") and one in the failure branch (gap "400" -> "402").
// Confirm against the authoritative source.
372 int
373 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
374 {
375  static int send_errors = 0;
376  static int last_seq_no = 0;
377  static int seq_no = 0;
378  int x;
379
380  M->mtype = mtype;
381  M->size = size;
382  M->offset = offset;
383  M->status = -1;
384  M->shm_offset = (int) shm_offset;
385  M->id = id;
386  M->seq_no = ++seq_no;
387
388  if (M->seq_no < last_seq_no)
389  debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
390
391  x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
392
393  last_seq_no = M->seq_no;
394
395  if (0 == x) {
397  ++away;
398  } else {
399  int xerrno = errno;
400  debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
402  ++send_errors;
403  assert(send_errors < 100);
404  if (shm_offset > -1)
405  shm.put(shm_offset);
406  }
407
408  /*
409  * We have to drain the queue here if necessary. If we don't,
410  * then we can have a lot of messages in the queue (probably
411  * up to 2*magic1) and we can run out of shared memory buffers.
412  */
413  /*
414  * Note that we call Store::Root().callback() (for all SDs), rather
415  * than callback for just this SD, so that while
416  * we're "blocking" on this SD we can also handle callbacks
417  * from other SDs that might be ready.
418  */
419
// NOTE(review): the back-off below looks wrong in two ways; verify against
// xselect() before changing it.
// 1. POSIX lets select() overwrite `delay` with the unslept remainder, and
//    Linux does. After that `delay` is {0, 0}, doubling keeps it at zero,
//    and the loop spins without sleeping.
// 2. If `delay` is not overwritten, tv_usec grows 1, 2, 4, ..., 524288,
//    1048576 and then stays there. tv_usec >= 1000000 is an invalid timeout,
//    which select() may reject with EINVAL.
420  struct timeval delay = {0, 1};
421
422  while (away > magic2) {
423  xselect(0, nullptr, nullptr, nullptr, &delay);
424  Store::Root().callback();
425
426  if (delay.tv_usec < 1000000)
427  delay.tv_usec <<= 1;
428  }
429
430  return x;
431 }
432 
// Returns the ConfigOption tree for this strategy's cache_dir options.
// NOTE(review): almost all of this definition is missing from this copy,
// including the signature line and the lines that create `result`
// (gaps "435" -> "439").
// The symbol index declares it as `ConfigOption *getOptionTree() const override`.
// It presumably registers optionQ1Parse/optionQ1Dump and
// optionQ2Parse/optionQ2Dump. Confirm against the authoritative source.
ConfigOption *
435 {
439  return result;
440 }
441 
442 bool
443 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
444 {
445  if (strcmp(name, "Q1") != 0)
446  return false;
447 
448  int old_magic1 = magic1;
449 
450  magic1 = atoi(value);
451 
452  if (!isaReconfig)
453  return true;
454 
455  if (old_magic1 < magic1) {
456  /*
457  * This is because shm.nbufs is computed at startup, when
458  * we call shmget(). We can't increase the Q1/Q2 parameters
459  * beyond their initial values because then we might have
460  * more "Q2 messages" than shared memory chunks, and this
461  * will cause an assertion in storeDiskdShmGet().
462  */
463  /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
464  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
465  magic1 = old_magic1;
466  return true;
467  }
468 
469  if (old_magic1 != magic1)
470  debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
471 
472  return true;
473 }
474 
475 void
477 {
478  storeAppendPrintf(e, " Q1=%d", magic1);
479 }
480 
481 bool
482 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
483 {
484  if (strcmp(name, "Q2") != 0)
485  return false;
486 
487  int old_magic2 = magic2;
488 
489  magic2 = atoi(value);
490 
491  if (!isaReconfig)
492  return true;
493 
494  if (old_magic2 < magic2) {
495  /* See comments in Q1 function above */
496  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
497  magic2 = old_magic2;
498  return true;
499  }
500 
501  if (old_magic2 != magic2)
502  debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
503 
504  return true;
505 }
506 
507 void
509 {
510  storeAppendPrintf(e, " Q2=%d", magic2);
511 }
512 
513 /*
514  * Sync any pending data. We just sit around and read the queue
515  * until the data has finished writing.
516  */
517 void
519 {
520  static time_t lastmsg = 0;
521 
522  while (away > 0) {
523  if (squid_curtime > lastmsg) {
524  debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
525  lastmsg = squid_curtime;
526  }
527 
528  callback();
529  }
530 }
531 
532 /*
533  * Handle callbacks. If we have more than magic2 requests away, we block
534  * until the queue is below magic2. Otherwise, we simply return when we
535  * don't get a message.
536  */
// Drains diskd's reply queue without blocking. For each reply that
// msgrcv(..., IPC_NOWAIT) returns, it:
//   - decrements `away`;
//   - dispatches the reply via handle();
//   - returns the reply's shared-memory buffer, if any, to the pool.
// Returns 1 if any reply was processed or if at least magic2 (Q2) requests
// are away, 0 otherwise.
// NOTE(review): despite the comment above, this function itself never
// blocks when the queue is full; the waiting loop lives in SEND().
// NOTE(review): the debug text below says "msgget", but the call that
// returned the unexpected size is msgrcv().
537
538 int
// NOTE(review): the signature line is missing from this copy (gap "538" -> "540").
540 {
541  diomsg M;
542  int x;
543  int retval = 0;
544
545  if (away >= magic2) {
// NOTE(review): a statement is missing here (gap "545" -> "547").
547  retval = 1;
548  /* We might not have anything to do, but our queue
549  * is full.. */
550  }
551
// NOTE(review): the opening lines of a block are missing here (gap "551" -> "555");
// only its closing brace survives. Confirm against the authoritative source.
555  }
556
557  while (1) {
558  x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
559
560  if (x < 0)
561  break;
562  else if (x != diomsg::msg_snd_rcv_sz) {
563  debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
564  break;
565  }
566
// NOTE(review): a statement is missing here (gap "566" -> "568").
568  --away;
569  handle(&M);
570  retval = 1; /* Return that we've actually done some work */
571
572  if (M.shm_offset > -1)
573  shm.put ((off_t) M.shm_offset);
574  }
575
576  return retval;
577 }
578 
579 void
581 {
582  storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
583 }
584 
void fatal(const char *message)
Definition: fatal.cc:28
const char * xstrerr(int error)
Definition: xstrerror.cc:83
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
#define DBG_CRITICAL
Definition: Stream.h:37
char * diskd
Definition: SquidConfig.h:208
void setLocalhost()
Definition: Address.cc:275
void fd_note(int fd, const char *s)
Definition: fd.cc:211
#define SHMBUF_BLKSZ
#define CBIT_CLR(mask, bit)
Definition: defines.h:73
void unlinkFile(char const *) override
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
static size_t newInstance()
void unlinkDone(diomsg *M)
void lock() const
Definition: Lock.h:34
int commSetNonBlocking(int fd)
Definition: comm.cc:1044
int callback() override
called once every main loop iteration; TODO: Move to UFS code.
Definition: Controller.cc:223
#define comm_close(x)
Definition: comm.h:36
C * getRaw() const
Definition: RefCount.h:89
long mtyp_t
Definition: types.h:141
char * xstrncpy(char *dst, const char *src, size_t n)
Definition: xstring.cc:37
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
struct SquidConfig::@83 Program
@ _MQD_READ
Definition: diomsg.h:23
#define cbdataReference(var)
Definition: cbdata.h:348
struct diskd_stats_t::@34 unlink
int SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
int status
Definition: diomsg.h:38
Lock * requestor
Definition: diomsg.h:35
@ _MQD_CREATE
Definition: diomsg.h:21
SharedMemory shm
static void * hIpc
Definition: IcmpSquid.cc:34
static pid_t pid
Definition: IcmpSquid.cc:35
void init(int ikey, int magic2)
void optionQ2Dump(StoreEntry *e) const
std::vector< ConfigOption * > options
Definition: ConfigOption.h:74
@ _MQD_WRITE
Definition: diomsg.h:24
ConfigOption * getOptionTree() const override
int size
Definition: ModDevPoll.cc:70
void commUnsetFdTimeout(int fd)
clear a timeout handler by FD number
Definition: comm.cc:581
static size_t nextInstanceID
void completed(diomsg *)
Definition: DiskdFile.cc:227
void init() override
int seq_no
Definition: diomsg.h:33
void * get(ssize_t *)
int callback() override
static const int msg_snd_rcv_sz
Definition: diomsg.h:41
void statfs(StoreEntry &sentry) const override
#define assert(EX)
Definition: assert.h:17
size_t size
Definition: diomsg.h:36
diskd_stats_t diskd_stats
#define CBIT_TEST(mask, bit)
Definition: defines.h:74
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
@ _MQD_CLOSE
Definition: diomsg.h:22
int load() override
@ _MQD_UNLINK
Definition: diomsg.h:25
#define cbdataReferenceDone(var)
Definition: cbdata.h:357
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:63
time_t squid_curtime
Definition: stub_libtime.cc:20
void handle(diomsg *M)
bool optionQ2Parse(char const *option, const char *value, int reconfiguring)
int shm_offset
Definition: diomsg.h:40
void put(ssize_t)
int xselect(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout)
POSIX select(2) equivalent.
Definition: select.h:22
int send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
bool unlinkdUseful() const override
off_t offset
Definition: diomsg.h:37
void sync() override
void unlinkdUnlink(const char *path)
Definition: unlinkd.cc:41
struct StatCounters::@112::@116 disk
int id
Definition: diomsg.h:32
RefCount< DiskFile > newFile(char const *path) override
Definition: diomsg.h:30
#define DBG_IMPORTANT
Definition: Stream.h:38
bool shedLoad() override
#define MYNAME
Definition: Stream.h:219
@ _MQD_OPEN
Definition: diomsg.h:20
#define CBIT_SET(mask, bit)
Definition: defines.h:72
bool newstyle
Definition: diomsg.h:39
void optionQ1Dump(StoreEntry *e) const
#define IPC_STREAM
Definition: defines.h:104
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void * callback_data
Definition: diomsg.h:34
bool optionQ1Parse(char const *option, const char *value, int reconfiguring)
mtyp_t mtype
Definition: diomsg.h:31
Definition: Lock.h:25
void QuickPollRequired(void)
Definition: ModDevPoll.cc:414
class SquidConfig Config
Definition: SquidConfig.cc:12
struct StatCounters::@112 syscalls
int unsigned int
Definition: stub_fd.cc:19
StatCounters statCounter
Definition: StatCounters.cc:12
Controller & Root()
safely access controller singleton
Definition: Controller.cc:926

 

Introduction

Documentation

Support

Miscellaneous