aiops.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 AIOPS */
10 
11 #ifndef _REENTRANT
12 #error "_REENTRANT MUST be defined to build squid async io support."
13 #endif
14 
15 #include "squid.h"
16 #include "compat/socket.h"
17 #include "compat/unistd.h"
19 #include "DiskThreads.h"
20 #include "SquidConfig.h"
21 #include "Store.h"
22 
23 /*
24  * struct stat and squidaio_xstrdup use explicit pool alloc()/freeOne().
25  * XXX: convert to MEMPROXY_CLASS() API
26  */
27 #include "mem/Allocator.h"
28 #include "mem/Pool.h"
29 
30 #include <cerrno>
31 #include <csignal>
32 #include <sys/stat.h>
33 #include <fcntl.h>
34 #include <pthread.h>
35 #include <dirent.h>
36 #if HAVE_SCHED_H
37 #include <sched.h>
38 #endif
39 
40 #define RIDICULOUS_LENGTH 4096
41 
48 };
50 
/*
 * NOTE(review): this is a documentation-generator listing; several original
 * source lines (some struct members, enum bodies, and closing braces) are
 * elided from this view. All code tokens below are kept byte-identical.
 */
/* One queued asynchronous I/O request, handed from the main thread to a
 * worker thread and back via the request/done queues. */
 51 typedef struct squidaio_request_t {
 52 
 55  int cancelled; /* set by squidaio_cancel(); worker then skips execution */
 56  char *path; /* pool-backed copy made with squidaio_xstrdup() */
 57  int oflag; /* open(2) flags for _AIO_OP_OPEN */
 59  int fd;
 60  char *bufferp; /* caller's buffer for read/write requests */
 61  size_t buflen;
 62  off_t offset; /* lseek() target for _AIO_OP_READ (see squidaio_do_read) */
 63  int whence;
 64  int ret; /* syscall result; copied to resultp->aio_return on completion */
 65  int err; /* errno snapshot; copied to resultp->aio_errno on completion */
 66 
 67  struct stat *tmpstatp; /* worker-private stat buffer (see squidaio_do_stat) */
 68 
 69  struct stat *statp; /* caller's stat buffer, filled only on success */
 72 
/* A mutex/condvar-protected singly-linked queue shared between the main
 * thread and the worker threads (tailp enables O(1) append). */
 73 typedef struct squidaio_request_queue_t {
 74  pthread_mutex_t mutex;
 75  pthread_cond_t cond;
 77  squidaio_request_t *volatile *volatile tailp;
 78  unsigned long requests;
 79  unsigned long blocked; /* main failed to lock the queue */
 81 
 83 
/* Per-worker bookkeeping (fields partially elided in this listing). */
 86  pthread_t thread;
 88 
 90  unsigned long requests; /* requests handled by this worker */
 91 };
92 
/* Forward declarations of the worker-side handlers and local helpers
 * (several prototype lines are elided by the doc generator). */
 95 void *squidaio_thread_loop(void *);
 100 static void squidaio_do_stat(squidaio_request_t *);
 102 #if AIO_OPENDIR
 103 static void *squidaio_do_opendir(squidaio_request_t *);
 104 #endif
 105 static void squidaio_debug(squidaio_request_t *);
 106 static void squidaio_poll_queues(void);
 107 
 108 static squidaio_thread_t *threads = nullptr; /* linked list of worker threads */
 109 static int squidaio_initialised = 0;
 110 
/* Buffer size classes, each served by its own memory pool below. */
 111 #define AIO_LARGE_BUFS 16384
 112 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
 113 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
 114 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
 115 #define AIO_MICRO_BUFS 128
 116 
 117 static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
 118 static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
 119 static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
 120 static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
 121 static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128 bytes (AIO_MICRO_BUFS), not 128K */
 122 
 123 static size_t request_queue_len = 0;
 127 
/* Overflow queue used by squidaio_queue_request() when request_queue.mutex
 * cannot be taken without blocking; drained back opportunistically. */
 128 static struct {
 130 }
 131 
 132 request_queue2 = {
 133 
 134  nullptr, &request_queue2.head
 135 };
 137 
/* Completed requests already moved off done_queue, awaiting pickup by
 * squidaio_poll_done(). */
 138 static struct {
 140 }
 141 
 142 done_requests = {
 143 
 144  nullptr, &done_requests.head
 145 };
 146 static pthread_attr_t globattr;
 147 #if HAVE_SCHED_H
 148 
 149 static struct sched_param globsched;
 150 #endif
 151 static pthread_t main_thread;
153 static Mem::Allocator *
155 {
156  if (size <= AIO_LARGE_BUFS) {
157  if (size <= AIO_MICRO_BUFS)
158  return squidaio_micro_bufs;
159  else if (size <= AIO_TINY_BUFS)
160  return squidaio_tiny_bufs;
161  else if (size <= AIO_SMALL_BUFS)
162  return squidaio_small_bufs;
163  else if (size <= AIO_MEDIUM_BUFS)
164  return squidaio_medium_bufs;
165  else
166  return squidaio_large_bufs;
167  }
168 
169  return nullptr;
170 }
171 
172 void *
174 {
175  void *p;
176 
177  if (const auto pool = squidaio_get_pool(size)) {
178  p = pool->alloc();
179  } else
180  p = xmalloc(size);
181 
182  return p;
183 }
184 
185 static char *
186 squidaio_xstrdup(const char *str)
187 {
188  char *p;
189  int len = strlen(str) + 1;
190 
191  p = (char *)squidaio_xmalloc(len);
192  strncpy(p, str, len);
193 
194  return p;
195 }
196 
197 void
198 squidaio_xfree(void *p, int size)
199 {
200  if (const auto pool = squidaio_get_pool(size)) {
201  pool->freeOne(p);
202  } else
203  xfree(p);
204 }
205 
206 static void
208 {
209  int len = strlen(str) + 1;
210 
211  if (const auto pool = squidaio_get_pool(len)) {
212  pool->freeOne(str);
213  } else
214  xfree(str);
215 }
216 
 217 void
/*
 * squidaio_init(): one-time setup of the async I/O subsystem — thread
 * attributes/scheduling, the request and done queues, the worker thread
 * pool, and the request/buffer memory pools.
 * NOTE(review): the doc generator elided several lines here, including the
 * name line, the already-initialised guard, the queue tailp/requests
 * initialisers, the CommIO::Initialize() call (see the bug 3189 comment),
 * the per-thread pool alloc() inside the loop, and the final
 * "squidaio_initialised = 1". Code lines are kept byte-identical.
 */
 219 {
 220  squidaio_thread_t *threadp;
 221 
 223  return;
 224 
 225  pthread_attr_init(&globattr);
 226 
 227 #if HAVE_PTHREAD_ATTR_SETSCOPE
 228 
 229  pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
 230 
 231 #endif
 232 #if HAVE_SCHED_H
 233 
 234  globsched.sched_priority = 1;
 235 
 236 #endif
 237 
 238  main_thread = pthread_self();
 239 
 240 #if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
 241 
 242  pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
 243 
 244 #endif
 245 #if HAVE_SCHED_H
 246 
/* Workers get priority 2 vs. the main thread's 1 (when supported). */
 247  globsched.sched_priority = 2;
 248 
 249 #endif
 250 #if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
 251 
 252  pthread_attr_setschedparam(&globattr, &globsched);
 253 
 254 #endif
 255 
 256  /* Give each thread a smaller 256KB stack, should be more than sufficient */
 257  pthread_attr_setstacksize(&globattr, 256 * 1024);
 258 
 259  /* Initialize request queue */
 260  if (pthread_mutex_init(&(request_queue.mutex), nullptr))
 261  fatal("Failed to create mutex");
 262 
 263  if (pthread_cond_init(&(request_queue.cond), nullptr))
 264  fatal("Failed to create condition variable");
 265 
 266  request_queue.head = nullptr;
 267 
 269 
 271 
 273 
 274  /* Initialize done queue */
 275  if (pthread_mutex_init(&(done_queue.mutex), nullptr))
 276  fatal("Failed to create mutex");
 277 
 278  if (pthread_cond_init(&(done_queue.cond), nullptr))
 279  fatal("Failed to create condition variable");
 280 
 281  done_queue.head = nullptr;
 282 
 284 
 285  done_queue.requests = 0;
 286 
 287  done_queue.blocked = 0;
 288 
 289  // Initialize the thread I/O pipes before creating any threads
 290  // see bug 3189 comment 5 about race conditions.
 292 
 293  /* Create threads and get them to sit in their wait loop */
 294  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
 295 
 296  assert(NUMTHREADS != 0);
 297 
 298  for (size_t i = 0; i < NUMTHREADS; ++i) {
/* threadp allocation from squidaio_thread_pool elided from this listing */
 300  threadp->status = _THREAD_STARTING;
 301  threadp->current_req = nullptr;
 302  threadp->requests = 0;
 303  threadp->next = threads;
 304  threads = threadp;
 305 
 306  if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
/* Failure is non-fatal: the thread is left in the list marked FAILED. */
 307  fprintf(stderr, "Thread creation failed\n");
 308  threadp->status = _THREAD_FAILED;
 309  continue;
 310  }
 311  }
 312 
 313  /* Create request pool */
 314  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
 315 
 316  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
 317 
 318  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
 319 
 320  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
 321 
 322  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
 323 
 324  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
 325 
 327 }
328 
 329 void
/*
 * squidaio_shutdown(): drain all outstanding requests, then tear down the
 * thread I/O notification machinery.
 * NOTE(review): the doc generator elided the name line, the
 * "not initialised" guard, the squidaio_poll_queues() call inside the
 * drain loop, and the CommIO::NotifyIOClose() call (names per the
 * cross-reference index). Code lines are kept byte-identical.
 */
 331 {
 333  return;
 334 
 335  /* This is the same as in squidaio_sync */
 336  do {
 338  } while (request_queue_len > 0);
 339 
 341 
 343 }
344 
 345 void *
/*
 * squidaio_thread_loop(): body of each worker thread. Blocks signals that
 * belong to the main Squid thread, then loops forever: wait for a request
 * on request_queue, execute the matching blocking syscall handler, and
 * append the finished request to done_queue.
 * NOTE(review): the doc generator elided the name line, the
 * request_queue.tailp reset after emptying the queue (line 394), and the
 * CommIO::NotifyIOCompleted() call after unlocking done_queue (line 457).
 * Code lines are kept byte-identical.
 */
 347 {
 348  squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
 349  squidaio_request_t *request;
 350  sigset_t newSig;
 351 
 352  /*
 353  * Make sure to ignore signals which may possibly get sent to
 354  * the parent squid thread. Causes havoc with mutex's and
 355  * condition waits otherwise
 356  */
 357 
 358  sigemptyset(&newSig);
 359  sigaddset(&newSig, SIGPIPE);
 360  sigaddset(&newSig, SIGCHLD);
 361 #if defined(_SQUID_LINUX_THREADS_)
 362 
 363  sigaddset(&newSig, SIGQUIT);
 364  sigaddset(&newSig, SIGTRAP);
 365 #else
 366 
 367  sigaddset(&newSig, SIGUSR1);
 368  sigaddset(&newSig, SIGUSR2);
 369 #endif
 370 
 371  sigaddset(&newSig, SIGHUP);
 372  sigaddset(&newSig, SIGTERM);
 373  sigaddset(&newSig, SIGINT);
 374  sigaddset(&newSig, SIGALRM);
 375  pthread_sigmask(SIG_BLOCK, &newSig, nullptr);
 376 
 377  while (1) {
 378  threadp->current_req = request = nullptr;
 379  request = nullptr;
 380  /* Get a request to process */
 381  threadp->status = _THREAD_WAITING;
 382  pthread_mutex_lock(&request_queue.mutex);
 383 
 384  while (!request_queue.head) {
 385  pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
 386  }
 387 
/* Pop the head request while still holding the queue mutex. */
 388  request = request_queue.head;
 389 
 390  if (request)
 391  request_queue.head = request->next;
 392 
 393  if (!request_queue.head)
/* tailp reset to &request_queue.head elided from this listing */
 395 
 396  pthread_mutex_unlock(&request_queue.mutex);
 397 
 398  /* process the request */
 399  threadp->status = _THREAD_BUSY;
 400 
 401  request->next = nullptr;
 402 
 403  threadp->current_req = request;
 404 
 405  errno = 0;
 406 
 407  if (!request->cancelled) {
 408  switch (request->request_type) {
 409 
 410  case _AIO_OP_OPEN:
 411  squidaio_do_open(request);
 412  break;
 413 
 414  case _AIO_OP_READ:
 415  squidaio_do_read(request);
 416  break;
 417 
 418  case _AIO_OP_WRITE:
 419  squidaio_do_write(request);
 420  break;
 421 
 422  case _AIO_OP_CLOSE:
 423  squidaio_do_close(request);
 424  break;
 425 
 426  case _AIO_OP_UNLINK:
 427  squidaio_do_unlink(request);
 428  break;
 429 
 430 #if AIO_OPENDIR /* Opendir not implemented yet */
 431 
 432  case _AIO_OP_OPENDIR:
 433  squidaio_do_opendir(request);
 434  break;
 435 #endif
 436 
 437  case _AIO_OP_STAT:
 438  squidaio_do_stat(request);
 439  break;
 440 
 441  default:
 442  request->ret = -1;
 443  request->err = EINVAL;
 444  break;
 445  }
 446  } else { /* cancelled */
 447  request->ret = -1;
 448  request->err = EINTR;
 449  }
 450 
 451  threadp->status = _THREAD_DONE;
 452  /* put the request in the done queue */
 453  pthread_mutex_lock(&done_queue.mutex);
 454  *done_queue.tailp = request;
 455  done_queue.tailp = &request->next;
 456  pthread_mutex_unlock(&done_queue.mutex);
 458  ++ threadp->requests;
 459  } /* while forever */
 460 
 461  return nullptr;
 462 } /* squidaio_thread_loop */
463 
 464 static void
/*
 * squidaio_queue_request(): hand a prepared request to the worker threads.
 * Tries a non-blocking lock on request_queue; if the workers hold it, the
 * request is parked on the lock-free-for-main request_queue2 overflow list
 * instead. Also emits congestion and overload warnings.
 * NOTE(review): the doc generator elided the name line, the splice of
 * request_queue2 into request_queue (lines 481-482), and the
 * "request_queue_len > RIDICULOUS_LENGTH" guard (line 550) that opens the
 * final emergency-sync branch. Code lines are kept byte-identical.
 */
 466 {
 467  static int high_start = 0;
 468  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
 469  /* Mark it as not executed (failing result, no error) */
 470  request->ret = -1;
 471  request->err = 0;
 472  /* Internal housekeeping */
 473  request_queue_len += 1;
 474  request->resultp->_data = request;
 475  /* Play some tricks with the request_queue2 queue */
 476  request->next = nullptr;
 477 
 478  if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
 479  if (request_queue2.head) {
 480  /* Grab blocked requests */
 483  }
 484 
 485  /* Enqueue request */
 486  *request_queue.tailp = request;
 487 
 488  request_queue.tailp = &request->next;
 489 
 490  pthread_cond_signal(&request_queue.cond);
 491 
 492  pthread_mutex_unlock(&request_queue.mutex);
 493 
 494  if (request_queue2.head) {
 495  /* Clear queue of blocked requests */
 496  request_queue2.head = nullptr;
 497  request_queue2.tailp = &request_queue2.head;
 498  }
 499  } else {
 500  /* Oops, the request queue is blocked, use request_queue2 */
 501  *request_queue2.tailp = request;
 502  request_queue2.tailp = &request->next;
 503  }
 504 
/* Rate-limited warning while requests are stuck on the overflow queue. */
 505  if (request_queue2.head) {
 506  static uint64_t filter = 0;
 507  static uint64_t filter_limit = 8192;
 508 
 509  if (++filter >= filter_limit) {
 510  filter_limit += filter;
 511  filter = 0;
 512  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
 513  }
 514  }
 515 
 516  /* Warn if out of threads */
 517  if (request_queue_len > MAGIC1) {
 518  static int last_warn = 0;
 519  static size_t queue_high, queue_low;
 520 
 521  if (high_start == 0) {
 522  high_start = squid_curtime;
 523  queue_high = request_queue_len;
 524  queue_low = request_queue_len;
 525  }
 526 
 527  if (request_queue_len > queue_high)
 528  queue_high = request_queue_len;
 529 
 530  if (request_queue_len < queue_low)
 531  queue_low = request_queue_len;
 532 
/* Warn at most every 15s, and only after 5s of sustained overload. */
 533  if (squid_curtime >= (last_warn + 15) &&
 534  squid_curtime >= (high_start + 5)) {
 535  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
 536 
 537  if (squid_curtime >= (high_start + 15))
 538  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
 539  request_queue_len << ", high=" << queue_high <<
 540  ", low=" << queue_low << ", duration=" <<
 541  (long int) (squid_curtime - high_start));
 542 
 543  last_warn = squid_curtime;
 544  }
 545  } else {
 546  high_start = 0;
 547  }
 548 
 549  /* Warn if seriously overloaded */
 551  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
 552  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
 553  squidaio_sync();
 554  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
 555  }
 556 } /* squidaio_queue_request */
557 
558 static void
560 {
561  squidaio_result_t *resultp = requestp->resultp;
562  int cancelled = requestp->cancelled;
563 
564  /* Free allocated structures and copy data back to user space if the */
565  /* request hasn't been cancelled */
566 
567  switch (requestp->request_type) {
568 
569  case _AIO_OP_STAT:
570 
571  if (!cancelled && requestp->ret == 0)
572  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
573 
574  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
575 
576  squidaio_xstrfree(requestp->path);
577 
578  break;
579 
580  case _AIO_OP_OPEN:
581  if (cancelled && requestp->ret >= 0)
582  /* The open() was cancelled but completed */
583  xclose(requestp->ret);
584 
585  squidaio_xstrfree(requestp->path);
586 
587  break;
588 
589  case _AIO_OP_CLOSE:
590  if (cancelled && requestp->ret < 0)
591  /* The close() was cancelled and never got executed */
592  xclose(requestp->fd);
593 
594  break;
595 
596  case _AIO_OP_UNLINK:
597 
598  case _AIO_OP_OPENDIR:
599  squidaio_xstrfree(requestp->path);
600 
601  break;
602 
603  case _AIO_OP_READ:
604  break;
605 
606  case _AIO_OP_WRITE:
607  break;
608 
609  default:
610  break;
611  }
612 
613  if (resultp != nullptr && !cancelled) {
614  resultp->aio_return = requestp->ret;
615  resultp->aio_errno = requestp->err;
616  }
617 
618  squidaio_request_pool->freeOne(requestp);
619 } /* squidaio_cleanup_request */
620 
621 int
623 {
624  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
625 
626  if (request && request->resultp == resultp) {
627  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
628  request->cancelled = 1;
629  request->resultp = nullptr;
630  resultp->_data = nullptr;
631  resultp->result_type = _AIO_OP_NONE;
632  return 0;
633  }
634 
635  return 1;
636 } /* squidaio_cancel */
637 
 638 int
/*
 * squidaio_open(): queue an asynchronous open(2). Always returns 0; the
 * actual FD (or error) is delivered later via resultp when
 * squidaio_poll_done() returns it.
 * NOTE(review): the doc generator elided line 644, presumably the
 * request allocation from squidaio_request_pool (cf. the visible alloc in
 * squidaio_opendir) — confirm against the real source. Code lines are
 * kept byte-identical.
 */
 639 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
 640 {
 641  squidaio_init();
 642  squidaio_request_t *requestp;
 643 
 645 
 646  requestp->path = (char *) squidaio_xstrdup(path);
 647 
 648  requestp->oflag = oflag;
 649 
 650  requestp->mode = mode;
 651 
 652  requestp->resultp = resultp;
 653 
 654  requestp->request_type = _AIO_OP_OPEN;
 655 
 656  requestp->cancelled = 0;
 657 
 658  resultp->result_type = _AIO_OP_OPEN;
 659 
 660  squidaio_queue_request(requestp);
 661 
 662  return 0;
 663 }
664 
665 static void
667 {
668  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
669  requestp->err = errno;
670 }
671 
 672 int
/*
 * squidaio_read(): queue an asynchronous positioned read into bufp.
 * Always returns 0; the byte count (or -1) arrives via resultp.
 * NOTE(review): the doc generator elided line 677, presumably the request
 * allocation from squidaio_request_pool — confirm against the real source.
 * Code lines are kept byte-identical.
 */
 673 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
 674 {
 675  squidaio_request_t *requestp;
 676 
 678 
 679  requestp->fd = fd;
 680 
 681  requestp->bufferp = bufp;
 682 
 683  requestp->buflen = bufs;
 684 
 685  requestp->offset = offset;
 686 
 687  requestp->whence = whence;
 688 
 689  requestp->resultp = resultp;
 690 
 691  requestp->request_type = _AIO_OP_READ;
 692 
 693  requestp->cancelled = 0;
 694 
 695  resultp->result_type = _AIO_OP_READ;
 696 
 697  squidaio_queue_request(requestp);
 698 
 699  return 0;
 700 }
701 
702 static void
704 {
705  if (lseek(requestp->fd, requestp->offset, requestp->whence) >= 0)
706  requestp->ret = xread(requestp->fd, requestp->bufferp, requestp->buflen);
707  else
708  requestp->ret = -1;
709  requestp->err = errno;
710 }
711 
 712 int
/*
 * squidaio_write(): queue an asynchronous write of bufp. Always returns 0;
 * the byte count (or -1) arrives via resultp. Note that offset/whence are
 * stored but squidaio_do_write() writes at the current file position.
 * NOTE(review): the doc generator elided line 717, presumably the request
 * allocation from squidaio_request_pool — confirm against the real source.
 * Code lines are kept byte-identical.
 */
 713 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
 714 {
 715  squidaio_request_t *requestp;
 716 
 718 
 719  requestp->fd = fd;
 720 
 721  requestp->bufferp = bufp;
 722 
 723  requestp->buflen = bufs;
 724 
 725  requestp->offset = offset;
 726 
 727  requestp->whence = whence;
 728 
 729  requestp->resultp = resultp;
 730 
 731  requestp->request_type = _AIO_OP_WRITE;
 732 
 733  requestp->cancelled = 0;
 734 
 735  resultp->result_type = _AIO_OP_WRITE;
 736 
 737  squidaio_queue_request(requestp);
 738 
 739  return 0;
 740 }
741 
742 static void
744 {
745  requestp->ret = xwrite(requestp->fd, requestp->bufferp, requestp->buflen);
746  requestp->err = errno;
747 }
748 
 749 int
/*
 * squidaio_close(): queue an asynchronous close(2) of fd. Always returns
 * 0; the close result arrives via resultp.
 * NOTE(review): the doc generator elided the name line
 * (squidaio_close(int fd, squidaio_result_t *resultp) per the index) and
 * line 754, presumably the request allocation from squidaio_request_pool.
 * Code lines are kept byte-identical.
 */
 751 {
 752  squidaio_request_t *requestp;
 753 
 755 
 756  requestp->fd = fd;
 757 
 758  requestp->resultp = resultp;
 759 
 760  requestp->request_type = _AIO_OP_CLOSE;
 761 
 762  requestp->cancelled = 0;
 763 
 764  resultp->result_type = _AIO_OP_CLOSE;
 765 
 766  squidaio_queue_request(requestp);
 767 
 768  return 0;
 769 }
770 
771 static void
773 {
774  requestp->ret = xclose(requestp->fd);
775  requestp->err = errno;
776 }
777 
 778 int
 779 
/*
 * squidaio_stat(): queue an asynchronous stat(2). The worker fills a
 * private tmpstatp buffer; squidaio_cleanup_request() copies it into the
 * caller's sb only on success. Always returns 0.
 * NOTE(review): the doc generator elided line 785, presumably the request
 * allocation from squidaio_request_pool — confirm against the real source.
 * Code lines are kept byte-identical.
 */
 780 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
 781 {
 782  squidaio_init();
 783  squidaio_request_t *requestp;
 784 
 786 
 787  requestp->path = (char *) squidaio_xstrdup(path);
 788 
 789  requestp->statp = sb;
 790 
 791  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
 792 
 793  requestp->resultp = resultp;
 794 
 795  requestp->request_type = _AIO_OP_STAT;
 796 
 797  requestp->cancelled = 0;
 798 
 799  resultp->result_type = _AIO_OP_STAT;
 800 
 801  squidaio_queue_request(requestp);
 802 
 803  return 0;
 804 }
805 
806 static void
808 {
809  requestp->ret = stat(requestp->path, requestp->tmpstatp);
810  requestp->err = errno;
811 }
812 
 813 int
/*
 * squidaio_unlink(): queue an asynchronous unlink(2). Always returns 0;
 * the unlink result arrives via resultp.
 * NOTE(review): the doc generator elided line 819, presumably the request
 * allocation from squidaio_request_pool — confirm against the real source.
 * Code lines are kept byte-identical.
 */
 814 squidaio_unlink(const char *path, squidaio_result_t * resultp)
 815 {
 816  squidaio_init();
 817  squidaio_request_t *requestp;
 818 
 820 
 821  requestp->path = squidaio_xstrdup(path);
 822 
 823  requestp->resultp = resultp;
 824 
 825  requestp->request_type = _AIO_OP_UNLINK;
 826 
 827  requestp->cancelled = 0;
 828 
 829  resultp->result_type = _AIO_OP_UNLINK;
 830 
 831  squidaio_queue_request(requestp);
 832 
 833  return 0;
 834 }
835 
836 static void
838 {
839  requestp->ret = unlink(requestp->path);
840  requestp->err = errno;
841 }
842 
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/// Stub: asynchronous opendir() is not implemented. Marks the result and
/// always fails with -1; no request is ever queued.
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    // The previous stub allocated a request from squidaio_request_pool and
    // then returned -1 without queueing or freeing it, leaking one pool
    // entry per call (it also declared an unused `len`). Since the
    // operation is unimplemented, allocate nothing.
    (void)path;
    resultp->result_type = _AIO_OP_OPENDIR;
    return -1;
}

/// Worker-thread handler stub for _AIO_OP_OPENDIR.
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    (void)requestp; /* NOT IMPLEMENTED */
}

#endif
866 
 867 static void
/*
 * squidaio_poll_queues(): main-thread housekeeping. Splices the overflow
 * request_queue2 into the workers' request_queue when its mutex can be
 * taken without blocking, and drains done_queue into the main-thread-only
 * done_requests list, decrementing request_queue_len per request.
 * NOTE(review): the doc generator elided the name line, the splice of
 * request_queue2 into request_queue (lines 874-875), and the
 * done_queue.tailp reset (line 887). Code lines are kept byte-identical.
 */
 869 {
 870  /* kick "overflow" request queue */
 871 
 872  if (request_queue2.head &&
 873  pthread_mutex_trylock(&request_queue.mutex) == 0) {
 876  pthread_cond_signal(&request_queue.cond);
 877  pthread_mutex_unlock(&request_queue.mutex);
 878  request_queue2.head = nullptr;
 879  request_queue2.tailp = &request_queue2.head;
 880  }
 881 
 882  /* poll done queue */
 883  if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
 884 
 885  struct squidaio_request_t *requests = done_queue.head;
 886  done_queue.head = nullptr;
 888  pthread_mutex_unlock(&done_queue.mutex);
 889  *done_requests.tailp = requests;
 890  request_queue_len -= 1;
 891 
/* Walk to the tail, counting each completed request off the queue. */
 892  while (requests->next) {
 893  requests = requests->next;
 894  request_queue_len -= 1;
 895  }
 896 
 897  done_requests.tailp = &requests->next;
 898  }
 899 }
900 
/*
 * squidaio_poll_done(): return the next completed, non-cancelled result to
 * the caller, or nullptr when none is available. Cancelled requests are
 * cleaned up and skipped (the AIO_REPOLL loop). Polls the queues once if
 * done_requests is initially empty.
 * NOTE(review): the doc generator elided the signature line
 * (squidaio_result_t * squidaio_poll_done(void) per the index), the local
 * resultp declaration (line 905), and the squidaio_poll_queues() call
 * inside the !polled branch. Code lines are kept byte-identical.
 */
 903 {
 904  squidaio_request_t *request;
 906  int cancelled;
 907  int polled = 0;
 908 
 909 AIO_REPOLL:
 910  request = done_requests.head;
 911 
 912  if (request == nullptr && !polled) {
 915  polled = 1;
 916  request = done_requests.head;
 917  }
 918 
 919  if (!request) {
 920  return nullptr;
 921  }
 922 
 923  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
 924  done_requests.head = request->next;
 925 
 926  if (!done_requests.head)
 927  done_requests.tailp = &done_requests.head;
 928 
/* Capture result and cancelled flag before cleanup frees the request. */
 929  resultp = request->resultp;
 930 
 931  cancelled = request->cancelled;
 932 
 933  squidaio_debug(request);
 934 
 935  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
 936 
 937  squidaio_cleanup_request(request);
 938 
 939  if (cancelled)
 940  goto AIO_REPOLL;
 941 
 942  return resultp;
 943 } /* squidaio_poll_done */
944 
945 int
947 {
948  return request_queue_len + (done_requests.head ? 1 : 0);
949 }
950 
 951 int
/*
 * squidaio_sync(): block the caller until every queued request has been
 * processed by the worker threads.
 * NOTE(review): the doc generator elided the name line, the
 * squidaio_poll_queues() call inside the loop, and the final
 * "return squidaio_operations_pending();" (line numbers 952, 957, 960).
 * Code lines are kept byte-identical.
 */
 953 {
 954  /* XXX This might take a while if the queue is large.. */
 955 
 956  do {
 958  } while (request_queue_len > 0);
 959 
 961 }
962 
963 int
965 {
966  return request_queue_len;
967 }
968 
969 static void
971 {
972  switch (request->request_type) {
973 
974  case _AIO_OP_OPEN:
975  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
976  break;
977 
978  case _AIO_OP_READ:
979  debugs(43, 5, "READ on fd: " << request->fd);
980  break;
981 
982  case _AIO_OP_WRITE:
983  debugs(43, 5, "WRITE on fd: " << request->fd);
984  break;
985 
986  case _AIO_OP_CLOSE:
987  debugs(43, 5, "CLOSE of fd: " << request->fd);
988  break;
989 
990  case _AIO_OP_UNLINK:
991  debugs(43, 5, "UNLINK of " << request->path);
992  break;
993 
994  default:
995  break;
996  }
997 }
998 
999 void
1001 {
1002  squidaio_thread_t *threadp;
1003 
1004  if (!squidaio_initialised)
1005  return;
1006 
1007  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1008 
1009  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1010 
1011  threadp = threads;
1012 
1013  for (size_t i = 0; i < NUMTHREADS; ++i) {
1014  storeAppendPrintf(sentry, "%zu\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1015  threadp = threadp->next;
1016  }
1017 }
1018 
struct squidaio_request_queue_t squidaio_request_queue_t
void fatal(const char *message)
Definition: fatal.cc:28
static struct @38 request_queue2
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops.cc:837
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops.cc:703
static pthread_t main_thread
Definition: aiops.cc:151
#define DBG_CRITICAL
Definition: Stream.h:37
#define AIO_SMALL_BUFS
Definition: aiops.cc:113
#define xmalloc
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops.cc:559
squidaio_result_t * resultp
Definition: aiops.cc:70
struct squidaio_request_t * next
Definition: aiops.cc:53
squidaio_result_t * squidaio_poll_done(void)
Definition: aiops.cc:902
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops.cc:807
unsigned long requests
Definition: aiops.cc:90
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
void squidaio_stats(StoreEntry *sentry)
Definition: aiops.cc:1000
static int squidaio_initialised
Definition: aiops.cc:109
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops.cc:666
@ _THREAD_DONE
Definition: aiops.cc:47
#define NUMTHREADS
Definition: DiskThreads.h:30
static void NotifyIOCompleted()
Definition: CommIO.h:36
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops.cc:622
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops.cc:154
static char * squidaio_xstrdup(const char *str)
Definition: aiops.cc:186
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
#define RIDICULOUS_LENGTH
Definition: aiops.cc:40
static squidaio_thread_t * threads
Definition: aiops.cc:108
unsigned long requests
Definition: aiops.cc:78
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
_squidaio_thread_status
Definition: aiops.cc:42
pthread_mutex_t mutex
Definition: aiops.cc:74
int xwrite(int fd, const void *buf, size_t bufSize)
POSIX write(2) equivalent.
Definition: unistd.h:67
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
static void ResetNotifications()
Definition: CommIO.cc:71
void * squidaio_xmalloc(int size)
Definition: aiops.cc:173
static Mem::Allocator * squidaio_small_bufs
Definition: aiops.cc:119
struct stat * statp
Definition: aiops.cc:69
#define AIO_MEDIUM_BUFS
Definition: aiops.cc:112
@ _THREAD_BUSY
Definition: aiops.cc:45
#define AIO_MICRO_BUFS
Definition: aiops.cc:115
squidaio_request_type request_type
Definition: aiops.cc:54
pthread_t thread
Definition: aiops.cc:86
static void squidaio_xstrfree(char *str)
Definition: aiops.cc:207
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops.cc:780
#define AIO_LARGE_BUFS
Definition: aiops.cc:111
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops.cc:639
static size_t request_queue_len
Definition: aiops.cc:123
@ _AIO_OP_READ
Definition: DiskThreads.h:48
struct squidaio_request_t squidaio_request_t
static void NotifyIOClose()
Definition: CommIO.cc:40
void squidaio_init(void)
Definition: aiops.cc:218
void squidaio_xfree(void *p, int size)
Definition: aiops.cc:198
static void squidaio_poll_queues(void)
Definition: aiops.cc:868
int size
Definition: ModDevPoll.cc:70
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
static Mem::Allocator * squidaio_request_pool
Definition: aiops.cc:124
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops.cc:743
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops.cc:120
#define MAGIC1
Definition: DiskThreads.h:34
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops.cc:713
@ _THREAD_FAILED
Definition: aiops.cc:46
squidaio_request_t ** tailp
Definition: aiops.cc:129
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops.cc:673
static squidaio_request_queue_t done_queue
Definition: aiops.cc:136
squidaio_thread_status status
Definition: aiops.cc:87
void * squidaio_thread_loop(void *)
Definition: aiops.cc:346
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:49
#define assert(EX)
Definition: assert.h:17
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops.cc:750
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops.cc:118
@ _THREAD_STARTING
Definition: aiops.cc:43
time_t squid_curtime
Definition: stub_libtime.cc:20
#define xfree
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
static pthread_attr_t globattr
Definition: aiops.cc:146
static squidaio_request_queue_t request_queue
Definition: aiops.cc:126
int squidaio_sync(void)
Definition: aiops.cc:952
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops.cc:465
squidaio_thread_t * next
Definition: aiops.cc:85
squidaio_request_t *volatile head
Definition: aiops.cc:76
size_t buflen
Definition: aiops.cc:61
squidaio_request_t * head
Definition: aiops.cc:129
static void squidaio_debug(squidaio_request_t *)
Definition: aiops.cc:970
static void Initialize()
Definition: CommIO.cc:21
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops.cc:121
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:77
unsigned short mode_t
Definition: types.h:129
int xread(int fd, void *buf, size_t bufSize)
POSIX read(2) equivalent.
Definition: unistd.h:61
#define AIO_TINY_BUFS
Definition: aiops.cc:114
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops.cc:814
#define DBG_IMPORTANT
Definition: Stream.h:38
int squidaio_opendir(const char *, squidaio_result_t *)
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
static Mem::Allocator * squidaio_large_bufs
Definition: aiops.cc:117
struct squidaio_request_t * current_req
Definition: aiops.cc:89
pthread_cond_t cond
Definition: aiops.cc:75
int xclose(int fd)
POSIX close(2) equivalent.
Definition: unistd.h:43
unsigned long blocked
Definition: aiops.cc:79
static struct @39 done_requests
int squidaio_operations_pending(void)
Definition: aiops.cc:946
char * bufferp
Definition: aiops.cc:60
static Mem::Allocator * squidaio_thread_pool
Definition: aiops.cc:125
int squidaio_get_queue_len(void)
Definition: aiops.cc:964
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void squidaio_shutdown(void)
Definition: aiops.cc:330
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops.cc:772
@ _THREAD_WAITING
Definition: aiops.cc:44
struct stat * tmpstatp
Definition: aiops.cc:67

 

Introduction

Documentation

Support

Miscellaneous