DiskdIOStrategy.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Squid-side DISKD I/O functions. */
10 
11 #include "squid.h"
12 #include "comm.h"
13 #include "comm/Loops.h"
14 #include "ConfigOption.h"
15 #include "diomsg.h"
16 #include "DiskdFile.h"
17 #include "DiskdIOStrategy.h"
18 #include "DiskIO/DiskFile.h"
19 #include "fd.h"
20 #include "SquidConfig.h"
21 #include "SquidIpc.h"
22 #include "StatCounters.h"
23 #include "Store.h"
24 #include "unlinkd.h"
25 
26 #include <cerrno>
27 #if HAVE_SYS_IPC_H
28 #include <sys/ipc.h>
29 #endif
30 #if HAVE_SYS_MSG_H
31 #include <sys/msg.h>
32 #endif
33 #if HAVE_SYS_SHM_H
34 #include <sys/shm.h>
35 #endif
36 
38 
40 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
41 
42 size_t
44 {
45  return ++nextInstanceID;
46 }
47 
48 bool
50 {
51  /*
52  * Fail on open() if there are too many requests queued.
53  */
54 
55  if (away > magic1) {
56  debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
57 
58  return true;
59  }
60 
61  return false;
62 }
63 
64 int
66 {
67  /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68  /* the parse function guarantees magic2 is positivie */
69  return away * 1000 / magic2;
70 }
71 
72 void
74 {
76 }
77 
79 DiskdIOStrategy::newFile(char const *path)
80 {
81  if (shedLoad()) {
82  openFailed();
83  return nullptr;
84  }
85 
86  return new DiskdFile (path, this);
87 }
88 
89 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
90 {}
91 
92 bool
94 {
95  return true;
96 }
97 
98 void
99 DiskdIOStrategy::unlinkFile(char const *path)
100 {
101  if (shedLoad()) {
102  /* Damn, we need to issue a sync unlink here :( */
103  debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
104  unlinkdUnlink(path);
105  return;
106  }
107 
108  /* We can attempt a diskd unlink */
109  int x;
110 
111  ssize_t shm_offset;
112 
113  char *buf;
114 
115  buf = (char *)shm.get(&shm_offset);
116 
117  xstrncpy(buf, path, SHMBUF_BLKSZ);
118 
119  x = send(_MQD_UNLINK,
120  0,
121  (StoreIOState::Pointer )nullptr,
122  0,
123  0,
124  shm_offset);
125 
126  if (x < 0) {
127  int xerrno = errno;
128  debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
129  ::unlink(buf); /* XXX EWW! */
130  // shm.put (shm_offset);
131  }
132 
134 }
135 
136 void
138 {
139  int pid;
140  void * hIpc;
141  int rfd;
142  int ikey;
143  const char *args[5];
144  char skey1[32];
145  char skey2[32];
146  char skey3[32];
147  Ip::Address localhost;
148 
149  ikey = (getpid() << 10) + (instanceID << 2);
150  ikey &= 0x7fffffff;
151  smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
152 
153  if (smsgid < 0) {
154  int xerrno = errno;
155  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
156  fatal("msgget failed");
157  }
158 
159  rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
160 
161  if (rmsgid < 0) {
162  int xerrno = errno;
163  debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
164  fatal("msgget failed");
165  }
166 
167  shm.init(ikey, magic2);
168  snprintf(skey1, 32, "%d", ikey);
169  snprintf(skey2, 32, "%d", ikey + 1);
170  snprintf(skey3, 32, "%d", ikey + 2);
171  args[0] = "diskd";
172  args[1] = skey1;
173  args[2] = skey2;
174  args[3] = skey3;
175  args[4] = nullptr;
176  localhost.setLocalhost();
179  args,
180  "diskd",
181  localhost,
182  &rfd,
183  &wfd,
184  &hIpc);
185 
186  if (pid < 0)
187  fatalf("execl: %s", Config.Program.diskd);
188 
189  if (rfd != wfd)
190  comm_close(rfd);
191 
192  fd_note(wfd, "squid -> diskd");
193 
197 }
198 
199 /*
200  * SHM manipulation routines
201  */
202 void
203 SharedMemory::put(ssize_t offset)
204 {
205  int i;
206  assert(offset >= 0);
207  assert(offset < nbufs * SHMBUF_BLKSZ);
208  i = offset / SHMBUF_BLKSZ;
209  assert(i < nbufs);
211  CBIT_CLR(inuse_map, i);
213 }
214 
215 void *
216 
217 SharedMemory::get(ssize_t * shm_offset)
218 {
219  char *aBuf = nullptr;
220  int i;
221 
222  for (i = 0; i < nbufs; ++i) {
223  if (CBIT_TEST(inuse_map, i))
224  continue;
225 
226  CBIT_SET(inuse_map, i);
227 
228  *shm_offset = i * SHMBUF_BLKSZ;
229 
230  aBuf = buf + (*shm_offset);
231 
232  break;
233  }
234 
235  assert(aBuf);
236  assert(aBuf >= buf);
237  assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
239 
242 
243  return aBuf;
244 }
245 
246 void
247 SharedMemory::init(int ikey, int magic2)
248 {
249  nbufs = (int)(magic2 * 1.3);
250  id = shmget((key_t) (ikey + 2),
251  nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
252 
253  if (id < 0) {
254  int xerrno = errno;
255  debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
256  fatal("shmget failed");
257  }
258 
259  buf = (char *)shmat(id, nullptr, 0);
260 
261  if (buf == (void *) -1) {
262  int xerrno = errno;
263  debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
264  fatal("shmat failed");
265  }
266 
267  inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
269 
270  for (int i = 0; i < nbufs; ++i) {
271  CBIT_SET(inuse_map, i);
272  put (i * SHMBUF_BLKSZ);
273  }
274 }
275 
276 void
278 {
279  debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
280  ++statCounter.syscalls.disk.unlinks;
281 
282  if (M->status < 0)
284  else
286 }
287 
288 void
290 {
292  /* I.e. already closed file
293  * - say when we have a error opening after
294  * a read was already queued
295  */
296  debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
298  return;
299  }
300 
301  /* set errno passed from diskd. makes debugging more meaningful */
302  if (M->status < 0)
303  errno = -M->status;
304 
305  if (M->newstyle) {
306  DiskdFile *theFile = (DiskdFile *)M->callback_data;
307  theFile->unlock();
308  theFile->completed (M);
309  } else
310  switch (M->mtype) {
311 
312  case _MQD_OPEN:
313 
314  case _MQD_CREATE:
315 
316  case _MQD_CLOSE:
317 
318  case _MQD_READ:
319 
320  case _MQD_WRITE:
321  assert (0);
322  break;
323 
324  case _MQD_UNLINK:
325  unlinkDone(M);
326  break;
327 
328  default:
329  assert(0);
330  break;
331  }
332 
334 }
335 
336 int
337 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
338 {
339  diomsg M;
340  M.callback_data = cbdataReference(theFile);
341  theFile->lock();
342  M.requestor = requestor;
343  M.newstyle = true;
344 
345  if (requestor)
346  requestor->lock();
347 
348  return SEND(&M, mtype, id, size, offset, shm_offset);
349 }
350 
351 int
352 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
353 {
354  diomsg M;
356  M.newstyle = false;
357 
358  return SEND(&M, mtype, id, size, offset, shm_offset);
359 }
360 
361 int
362 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
363 {
364  static int send_errors = 0;
365  static int last_seq_no = 0;
366  static int seq_no = 0;
367  int x;
368 
369  M->mtype = mtype;
370  M->size = size;
371  M->offset = offset;
372  M->status = -1;
373  M->shm_offset = (int) shm_offset;
374  M->id = id;
375  M->seq_no = ++seq_no;
376 
377  if (M->seq_no < last_seq_no)
378  debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
379 
380  x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
381 
382  last_seq_no = M->seq_no;
383 
384  if (0 == x) {
386  ++away;
387  } else {
388  int xerrno = errno;
389  debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
391  ++send_errors;
392  assert(send_errors < 100);
393  if (shm_offset > -1)
394  shm.put(shm_offset);
395  }
396 
397  /*
398  * We have to drain the queue here if necessary. If we don't,
399  * then we can have a lot of messages in the queue (probably
400  * up to 2*magic1) and we can run out of shared memory buffers.
401  */
402  /*
403  * Note that we call Store::Root().callback() (for all SDs), rather
404  * than callback for just this SD, so that while
405  * we're "blocking" on this SD we can also handle callbacks
406  * from other SDs that might be ready.
407  */
408 
409  struct timeval delay = {0, 1};
410 
411  while (away > magic2) {
412  select(0, nullptr, nullptr, nullptr, &delay);
413  Store::Root().callback();
414 
415  if (delay.tv_usec < 1000000)
416  delay.tv_usec <<= 1;
417  }
418 
419  return x;
420 }
421 
422 ConfigOption *
424 {
428  return result;
429 }
430 
431 bool
432 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
433 {
434  if (strcmp(name, "Q1") != 0)
435  return false;
436 
437  int old_magic1 = magic1;
438 
439  magic1 = atoi(value);
440 
441  if (!isaReconfig)
442  return true;
443 
444  if (old_magic1 < magic1) {
445  /*
446  * This is because shm.nbufs is computed at startup, when
447  * we call shmget(). We can't increase the Q1/Q2 parameters
448  * beyond their initial values because then we might have
449  * more "Q2 messages" than shared memory chunks, and this
450  * will cause an assertion in storeDiskdShmGet().
451  */
452  /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
453  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
454  magic1 = old_magic1;
455  return true;
456  }
457 
458  if (old_magic1 != magic1)
459  debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
460 
461  return true;
462 }
463 
464 void
466 {
467  storeAppendPrintf(e, " Q1=%d", magic1);
468 }
469 
470 bool
471 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
472 {
473  if (strcmp(name, "Q2") != 0)
474  return false;
475 
476  int old_magic2 = magic2;
477 
478  magic2 = atoi(value);
479 
480  if (!isaReconfig)
481  return true;
482 
483  if (old_magic2 < magic2) {
484  /* See comments in Q1 function above */
485  debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
486  magic2 = old_magic2;
487  return true;
488  }
489 
490  if (old_magic2 != magic2)
491  debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
492 
493  return true;
494 }
495 
496 void
498 {
499  storeAppendPrintf(e, " Q2=%d", magic2);
500 }
501 
502 /*
503  * Sync any pending data. We just sit around and read the queue
504  * until the data has finished writing.
505  */
506 void
508 {
509  static time_t lastmsg = 0;
510 
511  while (away > 0) {
512  if (squid_curtime > lastmsg) {
513  debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
514  lastmsg = squid_curtime;
515  }
516 
517  callback();
518  }
519 }
520 
521 /*
522  * Handle callbacks. If we have more than magic2 requests away, we block
523  * until the queue is below magic2. Otherwise, we simply return when we
524  * don't get a message.
525  */
526 
527 int
529 {
530  diomsg M;
531  int x;
532  int retval = 0;
533 
534  if (away >= magic2) {
536  retval = 1;
537  /* We might not have anything to do, but our queue
538  * is full.. */
539  }
540 
544  }
545 
546  while (1) {
547  x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
548 
549  if (x < 0)
550  break;
551  else if (x != diomsg::msg_snd_rcv_sz) {
552  debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
553  break;
554  }
555 
557  --away;
558  handle(&M);
559  retval = 1; /* Return that we've actually done some work */
560 
561  if (M.shm_offset > -1)
562  shm.put ((off_t) M.shm_offset);
563  }
564 
565  return retval;
566 }
567 
568 void
570 {
571  storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
572 }
573 
void fatal(const char *message)
Definition: fatal.cc:28
const char * xstrerr(int error)
Definition: xstrerror.cc:83
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
#define DBG_CRITICAL
Definition: Stream.h:37
@ _MQD_CLOSE
Definition: diomsg.h:22
char * diskd
Definition: SquidConfig.h:208
void setLocalhost()
Definition: Address.cc:275
void fd_note(int fd, const char *s)
Definition: fd.cc:216
#define SHMBUF_BLKSZ
#define CBIT_CLR(mask, bit)
Definition: defines.h:73
void unlinkFile(char const *) override
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
struct SquidConfig::@90 Program
static size_t newInstance()
void unlinkDone(diomsg *M)
void lock() const
Definition: Lock.h:34
struct StatCounters::@119::@123 disk
int commSetNonBlocking(int fd)
Definition: comm.cc:1054
int callback() override
called once every main loop iteration; TODO: Move to UFS code.
Definition: Controller.cc:223
@ _MQD_OPEN
Definition: diomsg.h:20
#define comm_close(x)
Definition: comm.h:36
struct diskd_stats_t::@41 unlink
C * getRaw() const
Definition: RefCount.h:89
long mtyp_t
Definition: types.h:141
char * xstrncpy(char *dst, const char *src, size_t n)
Definition: xstring.cc:37
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
#define cbdataReference(var)
Definition: cbdata.h:348
int SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
int status
Definition: diomsg.h:38
Lock * requestor
Definition: diomsg.h:35
SharedMemory shm
static void * hIpc
Definition: IcmpSquid.cc:33
static pid_t pid
Definition: IcmpSquid.cc:34
@ _MQD_UNLINK
Definition: diomsg.h:25
struct StatCounters::@119 syscalls
void init(int ikey, int magic2)
void optionQ2Dump(StoreEntry *e) const
std::vector< ConfigOption * > options
Definition: ConfigOption.h:74
ConfigOption * getOptionTree() const override
int size
Definition: ModDevPoll.cc:69
void commUnsetFdTimeout(int fd)
clear a timeout handler by FD number
Definition: comm.cc:579
static size_t nextInstanceID
void completed(diomsg *)
Definition: DiskdFile.cc:208
void init() override
int seq_no
Definition: diomsg.h:33
@ _MQD_READ
Definition: diomsg.h:23
void * get(ssize_t *)
int callback() override
static const int msg_snd_rcv_sz
Definition: diomsg.h:41
void statfs(StoreEntry &sentry) const override
#define assert(EX)
Definition: assert.h:17
size_t size
Definition: diomsg.h:36
diskd_stats_t diskd_stats
#define CBIT_TEST(mask, bit)
Definition: defines.h:74
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
int load() override
#define cbdataReferenceDone(var)
Definition: cbdata.h:357
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:70
time_t squid_curtime
Definition: stub_libtime.cc:20
void handle(diomsg *M)
bool optionQ2Parse(char const *option, const char *value, int reconfiguring)
int shm_offset
Definition: diomsg.h:40
void put(ssize_t)
int send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
bool unlinkdUseful() const override
off_t offset
Definition: diomsg.h:37
@ _MQD_CREATE
Definition: diomsg.h:21
void sync() override
void unlinkdUnlink(const char *path)
Definition: unlinkd.cc:39
int id
Definition: diomsg.h:32
RefCount< DiskFile > newFile(char const *path) override
Definition: diomsg.h:30
#define DBG_IMPORTANT
Definition: Stream.h:38
bool shedLoad() override
#define MYNAME
Definition: Stream.h:219
#define CBIT_SET(mask, bit)
Definition: defines.h:72
@ _MQD_WRITE
Definition: diomsg.h:24
bool newstyle
Definition: diomsg.h:39
void optionQ1Dump(StoreEntry *e) const
#define IPC_STREAM
Definition: defines.h:104
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void * callback_data
Definition: diomsg.h:34
bool optionQ1Parse(char const *option, const char *value, int reconfiguring)
mtyp_t mtype
Definition: diomsg.h:31
Definition: Lock.h:25
void QuickPollRequired(void)
Definition: ModDevPoll.cc:414
class SquidConfig Config
Definition: SquidConfig.cc:12
int unsigned int
Definition: stub_fd.cc:19
StatCounters statCounter
Definition: StatCounters.cc:12
Controller & Root()
safely access controller singleton
Definition: Controller.cc:926

 

Introduction

Documentation

Support

Miscellaneous