aiops.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 AIOPS */
10 
11 #ifndef _REENTRANT
12 #error "_REENTRANT MUST be defined to build squid async io support."
13 #endif
14 
15 #include "squid.h"
17 #include "DiskThreads.h"
18 #include "SquidConfig.h"
19 #include "Store.h"
20 
21 /*
22  * struct stat and squidaio_xstrdup use explicit pool alloc()/freeOne().
23  * XXX: convert to MEMPROXY_CLASS() API
24  */
25 #include "mem/Allocator.h"
26 #include "mem/Pool.h"
27 
#include <cerrno>
#include <csignal>
#include <cstring>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <dirent.h>
#if HAVE_SCHED_H
#include <sched.h>
#endif
37 
38 #define RIDICULOUS_LENGTH 4096
39 
46 };
48 
49 typedef struct squidaio_request_t {
50 
53  int cancelled;
54  char *path;
55  int oflag;
57  int fd;
58  char *bufferp;
59  size_t buflen;
60  off_t offset;
61  int whence;
62  int ret;
63  int err;
64 
65  struct stat *tmpstatp;
66 
67  struct stat *statp;
70 
71 typedef struct squidaio_request_queue_t {
72  pthread_mutex_t mutex;
73  pthread_cond_t cond;
75  squidaio_request_t *volatile *volatile tailp;
76  unsigned long requests;
77  unsigned long blocked; /* main failed to lock the queue */
79 
81 
84  pthread_t thread;
86 
88  unsigned long requests;
89 };
90 
93 void *squidaio_thread_loop(void *);
100 #if AIO_OPENDIR
101 static void *squidaio_do_opendir(squidaio_request_t *);
102 #endif
103 static void squidaio_debug(squidaio_request_t *);
104 static void squidaio_poll_queues(void);
105 
106 static squidaio_thread_t *threads = nullptr;
107 static int squidaio_initialised = 0;
108 
/* Size classes (bytes) for the squidaio buffer pools. Each derived size is
 * fully parenthesized so the macro expands safely inside larger expressions
 * (e.g. `AIO_SMALL_BUFS * 2` must not become `16384 >> (2 * 2)`). */
#define AIO_LARGE_BUFS 16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS 128
114 
/* Size-classed buffer pools backing squidaio_xmalloc()/squidaio_xfree();
 * created in squidaio_init() via memPoolCreate(). */
static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128 bytes (AIO_MICRO_BUFS), not 128K */
120 
121 static int request_queue_len = 0;
125 
126 static struct {
128 }
129 
130 request_queue2 = {
131 
132  nullptr, &request_queue2.head
133 };
135 
136 static struct {
138 }
139 
140 done_requests = {
141 
142  nullptr, &done_requests.head
143 };
144 static pthread_attr_t globattr;
145 #if HAVE_SCHED_H
146 
147 static struct sched_param globsched;
148 #endif
149 static pthread_t main_thread;
150 
151 static Mem::Allocator *
153 {
154  if (size <= AIO_LARGE_BUFS) {
155  if (size <= AIO_MICRO_BUFS)
156  return squidaio_micro_bufs;
157  else if (size <= AIO_TINY_BUFS)
158  return squidaio_tiny_bufs;
159  else if (size <= AIO_SMALL_BUFS)
160  return squidaio_small_bufs;
161  else if (size <= AIO_MEDIUM_BUFS)
162  return squidaio_medium_bufs;
163  else
164  return squidaio_large_bufs;
165  }
166 
167  return nullptr;
168 }
169 
170 void *
172 {
173  void *p;
174 
175  if (const auto pool = squidaio_get_pool(size)) {
176  p = pool->alloc();
177  } else
178  p = xmalloc(size);
179 
180  return p;
181 }
182 
183 static char *
184 squidaio_xstrdup(const char *str)
185 {
186  char *p;
187  int len = strlen(str) + 1;
188 
189  p = (char *)squidaio_xmalloc(len);
190  strncpy(p, str, len);
191 
192  return p;
193 }
194 
195 void
196 squidaio_xfree(void *p, int size)
197 {
198  if (const auto pool = squidaio_get_pool(size)) {
199  pool->freeOne(p);
200  } else
201  xfree(p);
202 }
203 
204 static void
206 {
207  int len = strlen(str) + 1;
208 
209  if (const auto pool = squidaio_get_pool(len)) {
210  pool->freeOne(str);
211  } else
212  xfree(str);
213 }
214 
215 void
217 {
218  int i;
219  squidaio_thread_t *threadp;
220 
222  return;
223 
224  pthread_attr_init(&globattr);
225 
226 #if HAVE_PTHREAD_ATTR_SETSCOPE
227 
228  pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
229 
230 #endif
231 #if HAVE_SCHED_H
232 
233  globsched.sched_priority = 1;
234 
235 #endif
236 
237  main_thread = pthread_self();
238 
239 #if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
240 
241  pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
242 
243 #endif
244 #if HAVE_SCHED_H
245 
246  globsched.sched_priority = 2;
247 
248 #endif
249 #if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
250 
251  pthread_attr_setschedparam(&globattr, &globsched);
252 
253 #endif
254 
255  /* Give each thread a smaller 256KB stack, should be more than sufficient */
256  pthread_attr_setstacksize(&globattr, 256 * 1024);
257 
258  /* Initialize request queue */
259  if (pthread_mutex_init(&(request_queue.mutex), nullptr))
260  fatal("Failed to create mutex");
261 
262  if (pthread_cond_init(&(request_queue.cond), nullptr))
263  fatal("Failed to create condition variable");
264 
265  request_queue.head = nullptr;
266 
268 
270 
272 
273  /* Initialize done queue */
274  if (pthread_mutex_init(&(done_queue.mutex), nullptr))
275  fatal("Failed to create mutex");
276 
277  if (pthread_cond_init(&(done_queue.cond), nullptr))
278  fatal("Failed to create condition variable");
279 
280  done_queue.head = nullptr;
281 
283 
284  done_queue.requests = 0;
285 
286  done_queue.blocked = 0;
287 
288  // Initialize the thread I/O pipes before creating any threads
289  // see bug 3189 comment 5 about race conditions.
291 
292  /* Create threads and get them to sit in their wait loop */
293  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
294 
295  assert(NUMTHREADS != 0);
296 
297  for (i = 0; i < NUMTHREADS; ++i) {
299  threadp->status = _THREAD_STARTING;
300  threadp->current_req = nullptr;
301  threadp->requests = 0;
302  threadp->next = threads;
303  threads = threadp;
304 
305  if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
306  fprintf(stderr, "Thread creation failed\n");
307  threadp->status = _THREAD_FAILED;
308  continue;
309  }
310  }
311 
312  /* Create request pool */
313  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
314 
315  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
316 
317  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
318 
319  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
320 
321  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
322 
323  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
324 
326 }
327 
328 void
330 {
332  return;
333 
334  /* This is the same as in squidaio_sync */
335  do {
337  } while (request_queue_len > 0);
338 
340 
342 }
343 
344 void *
346 {
347  squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
348  squidaio_request_t *request;
349  sigset_t newSig;
350 
351  /*
352  * Make sure to ignore signals which may possibly get sent to
353  * the parent squid thread. Causes havoc with mutex's and
354  * condition waits otherwise
355  */
356 
357  sigemptyset(&newSig);
358  sigaddset(&newSig, SIGPIPE);
359  sigaddset(&newSig, SIGCHLD);
360 #if defined(_SQUID_LINUX_THREADS_)
361 
362  sigaddset(&newSig, SIGQUIT);
363  sigaddset(&newSig, SIGTRAP);
364 #else
365 
366  sigaddset(&newSig, SIGUSR1);
367  sigaddset(&newSig, SIGUSR2);
368 #endif
369 
370  sigaddset(&newSig, SIGHUP);
371  sigaddset(&newSig, SIGTERM);
372  sigaddset(&newSig, SIGINT);
373  sigaddset(&newSig, SIGALRM);
374  pthread_sigmask(SIG_BLOCK, &newSig, nullptr);
375 
376  while (1) {
377  threadp->current_req = request = nullptr;
378  request = nullptr;
379  /* Get a request to process */
380  threadp->status = _THREAD_WAITING;
381  pthread_mutex_lock(&request_queue.mutex);
382 
383  while (!request_queue.head) {
384  pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
385  }
386 
387  request = request_queue.head;
388 
389  if (request)
390  request_queue.head = request->next;
391 
392  if (!request_queue.head)
394 
395  pthread_mutex_unlock(&request_queue.mutex);
396 
397  /* process the request */
398  threadp->status = _THREAD_BUSY;
399 
400  request->next = nullptr;
401 
402  threadp->current_req = request;
403 
404  errno = 0;
405 
406  if (!request->cancelled) {
407  switch (request->request_type) {
408 
409  case _AIO_OP_OPEN:
410  squidaio_do_open(request);
411  break;
412 
413  case _AIO_OP_READ:
414  squidaio_do_read(request);
415  break;
416 
417  case _AIO_OP_WRITE:
418  squidaio_do_write(request);
419  break;
420 
421  case _AIO_OP_CLOSE:
422  squidaio_do_close(request);
423  break;
424 
425  case _AIO_OP_UNLINK:
426  squidaio_do_unlink(request);
427  break;
428 
429 #if AIO_OPENDIR /* Opendir not implemented yet */
430 
431  case _AIO_OP_OPENDIR:
432  squidaio_do_opendir(request);
433  break;
434 #endif
435 
436  case _AIO_OP_STAT:
437  squidaio_do_stat(request);
438  break;
439 
440  default:
441  request->ret = -1;
442  request->err = EINVAL;
443  break;
444  }
445  } else { /* cancelled */
446  request->ret = -1;
447  request->err = EINTR;
448  }
449 
450  threadp->status = _THREAD_DONE;
451  /* put the request in the done queue */
452  pthread_mutex_lock(&done_queue.mutex);
453  *done_queue.tailp = request;
454  done_queue.tailp = &request->next;
455  pthread_mutex_unlock(&done_queue.mutex);
457  ++ threadp->requests;
458  } /* while forever */
459 
460  return nullptr;
461 } /* squidaio_thread_loop */
462 
463 static void
465 {
466  static int high_start = 0;
467  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
468  /* Mark it as not executed (failing result, no error) */
469  request->ret = -1;
470  request->err = 0;
471  /* Internal housekeeping */
472  request_queue_len += 1;
473  request->resultp->_data = request;
474  /* Play some tricks with the request_queue2 queue */
475  request->next = nullptr;
476 
477  if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
478  if (request_queue2.head) {
479  /* Grab blocked requests */
482  }
483 
484  /* Enqueue request */
485  *request_queue.tailp = request;
486 
487  request_queue.tailp = &request->next;
488 
489  pthread_cond_signal(&request_queue.cond);
490 
491  pthread_mutex_unlock(&request_queue.mutex);
492 
493  if (request_queue2.head) {
494  /* Clear queue of blocked requests */
495  request_queue2.head = nullptr;
496  request_queue2.tailp = &request_queue2.head;
497  }
498  } else {
499  /* Oops, the request queue is blocked, use request_queue2 */
500  *request_queue2.tailp = request;
501  request_queue2.tailp = &request->next;
502  }
503 
504  if (request_queue2.head) {
505  static uint64_t filter = 0;
506  static uint64_t filter_limit = 8192;
507 
508  if (++filter >= filter_limit) {
509  filter_limit += filter;
510  filter = 0;
511  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
512  }
513  }
514 
515  /* Warn if out of threads */
516  if (request_queue_len > MAGIC1) {
517  static int last_warn = 0;
518  static int queue_high, queue_low;
519 
520  if (high_start == 0) {
521  high_start = squid_curtime;
522  queue_high = request_queue_len;
523  queue_low = request_queue_len;
524  }
525 
526  if (request_queue_len > queue_high)
527  queue_high = request_queue_len;
528 
529  if (request_queue_len < queue_low)
530  queue_low = request_queue_len;
531 
532  if (squid_curtime >= (last_warn + 15) &&
533  squid_curtime >= (high_start + 5)) {
534  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
535 
536  if (squid_curtime >= (high_start + 15))
537  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
538  request_queue_len << ", high=" << queue_high <<
539  ", low=" << queue_low << ", duration=" <<
540  (long int) (squid_curtime - high_start));
541 
542  last_warn = squid_curtime;
543  }
544  } else {
545  high_start = 0;
546  }
547 
548  /* Warn if seriously overloaded */
550  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
551  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
552  squidaio_sync();
553  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
554  }
555 } /* squidaio_queue_request */
556 
557 static void
559 {
560  squidaio_result_t *resultp = requestp->resultp;
561  int cancelled = requestp->cancelled;
562 
563  /* Free allocated structures and copy data back to user space if the */
564  /* request hasn't been cancelled */
565 
566  switch (requestp->request_type) {
567 
568  case _AIO_OP_STAT:
569 
570  if (!cancelled && requestp->ret == 0)
571  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
572 
573  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
574 
575  squidaio_xstrfree(requestp->path);
576 
577  break;
578 
579  case _AIO_OP_OPEN:
580  if (cancelled && requestp->ret >= 0)
581  /* The open() was cancelled but completed */
582  close(requestp->ret);
583 
584  squidaio_xstrfree(requestp->path);
585 
586  break;
587 
588  case _AIO_OP_CLOSE:
589  if (cancelled && requestp->ret < 0)
590  /* The close() was cancelled and never got executed */
591  close(requestp->fd);
592 
593  break;
594 
595  case _AIO_OP_UNLINK:
596 
597  case _AIO_OP_OPENDIR:
598  squidaio_xstrfree(requestp->path);
599 
600  break;
601 
602  case _AIO_OP_READ:
603  break;
604 
605  case _AIO_OP_WRITE:
606  break;
607 
608  default:
609  break;
610  }
611 
612  if (resultp != nullptr && !cancelled) {
613  resultp->aio_return = requestp->ret;
614  resultp->aio_errno = requestp->err;
615  }
616 
617  squidaio_request_pool->freeOne(requestp);
618 } /* squidaio_cleanup_request */
619 
620 int
622 {
623  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
624 
625  if (request && request->resultp == resultp) {
626  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
627  request->cancelled = 1;
628  request->resultp = nullptr;
629  resultp->_data = nullptr;
630  resultp->result_type = _AIO_OP_NONE;
631  return 0;
632  }
633 
634  return 1;
635 } /* squidaio_cancel */
636 
/* Queue an asynchronous open(2) of `path` with the given flags and mode.
 * Duplicates `path` into pool memory, fills in a request record, tags both
 * request and result as _AIO_OP_OPEN, and hands the request to the worker
 * threads via squidaio_queue_request(). Always returns 0 (queued); the
 * open's own result is delivered later through *resultp. */
int
squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    /* NOTE(review): the allocation of requestp from squidaio_request_pool
     * appears to have been lost in this extraction -- as shown, requestp is
     * used uninitialized below. Verify against the upstream file. */

    requestp->path = (char *) squidaio_xstrdup(path);

    requestp->oflag = oflag;

    requestp->mode = mode;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_OPEN;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_OPEN;

    squidaio_queue_request(requestp);

    return 0;
}
663 
664 static void
666 {
667  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
668  requestp->err = errno;
669 }
670 
/* Queue an asynchronous positioned read: seek fd to offset/whence, then
 * read up to bufs bytes into bufp (see squidaio_do_read). The caller's
 * buffer must stay valid until the result is collected via *resultp.
 * Always returns 0 (request queued). */
int
squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    /* NOTE(review): the allocation of requestp from squidaio_request_pool
     * appears to have been lost in this extraction -- as shown, requestp is
     * used uninitialized below. Verify against the upstream file. */

    requestp->fd = fd;

    requestp->bufferp = bufp;

    requestp->buflen = bufs;

    requestp->offset = offset;

    requestp->whence = whence;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_READ;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_READ;

    squidaio_queue_request(requestp);

    return 0;
}
700 
701 static void
703 {
704  if (lseek(requestp->fd, requestp->offset, requestp->whence) >= 0)
705  requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
706  else
707  requestp->ret = -1;
708  requestp->err = errno;
709 }
710 
/* Queue an asynchronous write of bufs bytes from bufp to fd. Note that
 * unlike squidaio_read, the worker (squidaio_do_write) writes at the
 * current file position; offset/whence are stored but not used by it.
 * The caller's buffer must stay valid until the result is collected.
 * Always returns 0 (request queued). */
int
squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    /* NOTE(review): the allocation of requestp from squidaio_request_pool
     * appears to have been lost in this extraction -- as shown, requestp is
     * used uninitialized below. Verify against the upstream file. */

    requestp->fd = fd;

    requestp->bufferp = bufp;

    requestp->buflen = bufs;

    requestp->offset = offset;

    requestp->whence = whence;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_WRITE;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_WRITE;

    squidaio_queue_request(requestp);

    return 0;
}
740 
741 static void
743 {
744  requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
745  requestp->err = errno;
746 }
747 
748 int
750 {
751  squidaio_request_t *requestp;
752 
754 
755  requestp->fd = fd;
756 
757  requestp->resultp = resultp;
758 
759  requestp->request_type = _AIO_OP_CLOSE;
760 
761  requestp->cancelled = 0;
762 
763  resultp->result_type = _AIO_OP_CLOSE;
764 
765  squidaio_queue_request(requestp);
766 
767  return 0;
768 }
769 
770 static void
772 {
773  requestp->ret = close(requestp->fd);
774  requestp->err = errno;
775 }
776 
/* Queue an asynchronous stat(2) of `path`. The worker fills a pool-allocated
 * temporary (tmpstatp); squidaio_cleanup_request() copies it into the
 * caller's *sb only if the request completed successfully and was not
 * cancelled, so sb need not be touched on failure. Always returns 0. */
int

squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    /* NOTE(review): the allocation of requestp from squidaio_request_pool
     * appears to have been lost in this extraction -- as shown, requestp is
     * used uninitialized below. Verify against the upstream file. */

    requestp->path = (char *) squidaio_xstrdup(path);

    requestp->statp = sb;

    requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_STAT;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_STAT;

    squidaio_queue_request(requestp);

    return 0;
}
804 
805 static void
807 {
808  requestp->ret = stat(requestp->path, requestp->tmpstatp);
809  requestp->err = errno;
810 }
811 
/* Queue an asynchronous unlink(2) of `path`. The path is duplicated into
 * pool memory (freed by squidaio_cleanup_request). Always returns 0;
 * the unlink's result is delivered later through *resultp. */
int
squidaio_unlink(const char *path, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    /* NOTE(review): the allocation of requestp from squidaio_request_pool
     * appears to have been lost in this extraction -- as shown, requestp is
     * used uninitialized below. Verify against the upstream file. */

    requestp->path = squidaio_xstrdup(path);

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_UNLINK;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_UNLINK;

    squidaio_queue_request(requestp);

    return 0;
}
834 
835 static void
837 {
838  requestp->ret = unlink(requestp->path);
839  requestp->err = errno;
840 }
841 
842 #if AIO_OPENDIR
843 /* XXX squidaio_opendir NOT implemented yet.. */
844 
845 int
846 squidaio_opendir(const char *path, squidaio_result_t * resultp)
847 {
848  squidaio_request_t *requestp;
849  int len;
850 
851  requestp = squidaio_request_pool->alloc();
852 
853  resultp->result_type = _AIO_OP_OPENDIR;
854 
855  return -1;
856 }
857 
/* Worker-thread handler for _AIO_OP_OPENDIR. Intentionally empty: opendir
 * support is unfinished (see the AIO_OPENDIR guard and squidaio_opendir,
 * which always fails), but the dispatch case in squidaio_thread_loop()
 * still needs a callable function when AIO_OPENDIR is enabled. */
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}
863 
864 #endif
865 
866 static void
868 {
869  /* kick "overflow" request queue */
870 
871  if (request_queue2.head &&
872  pthread_mutex_trylock(&request_queue.mutex) == 0) {
875  pthread_cond_signal(&request_queue.cond);
876  pthread_mutex_unlock(&request_queue.mutex);
877  request_queue2.head = nullptr;
878  request_queue2.tailp = &request_queue2.head;
879  }
880 
881  /* poll done queue */
882  if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
883 
884  struct squidaio_request_t *requests = done_queue.head;
885  done_queue.head = nullptr;
887  pthread_mutex_unlock(&done_queue.mutex);
888  *done_requests.tailp = requests;
889  request_queue_len -= 1;
890 
891  while (requests->next) {
892  requests = requests->next;
893  request_queue_len -= 1;
894  }
895 
896  done_requests.tailp = &requests->next;
897  }
898 }
899 
902 {
903  squidaio_request_t *request;
905  int cancelled;
906  int polled = 0;
907 
908 AIO_REPOLL:
909  request = done_requests.head;
910 
911  if (request == nullptr && !polled) {
914  polled = 1;
915  request = done_requests.head;
916  }
917 
918  if (!request) {
919  return nullptr;
920  }
921 
922  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
923  done_requests.head = request->next;
924 
925  if (!done_requests.head)
926  done_requests.tailp = &done_requests.head;
927 
928  resultp = request->resultp;
929 
930  cancelled = request->cancelled;
931 
932  squidaio_debug(request);
933 
934  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
935 
936  squidaio_cleanup_request(request);
937 
938  if (cancelled)
939  goto AIO_REPOLL;
940 
941  return resultp;
942 } /* squidaio_poll_done */
943 
944 int
946 {
947  return request_queue_len + (done_requests.head ? 1 : 0);
948 }
949 
950 int
952 {
953  /* XXX This might take a while if the queue is large.. */
954 
955  do {
957  } while (request_queue_len > 0);
958 
960 }
961 
962 int
964 {
965  return request_queue_len;
966 }
967 
968 static void
970 {
971  switch (request->request_type) {
972 
973  case _AIO_OP_OPEN:
974  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
975  break;
976 
977  case _AIO_OP_READ:
978  debugs(43, 5, "READ on fd: " << request->fd);
979  break;
980 
981  case _AIO_OP_WRITE:
982  debugs(43, 5, "WRITE on fd: " << request->fd);
983  break;
984 
985  case _AIO_OP_CLOSE:
986  debugs(43, 5, "CLOSE of fd: " << request->fd);
987  break;
988 
989  case _AIO_OP_UNLINK:
990  debugs(43, 5, "UNLINK of " << request->path);
991  break;
992 
993  default:
994  break;
995  }
996 }
997 
998 void
1000 {
1001  squidaio_thread_t *threadp;
1002  int i;
1003 
1004  if (!squidaio_initialised)
1005  return;
1006 
1007  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1008 
1009  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1010 
1011  threadp = threads;
1012 
1013  for (i = 0; i < NUMTHREADS; ++i) {
1014  storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1015  threadp = threadp->next;
1016  }
1017 }
1018 
struct squidaio_request_queue_t squidaio_request_queue_t
void fatal(const char *message)
Definition: fatal.cc:28
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops.cc:836
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops.cc:702
static pthread_t main_thread
Definition: aiops.cc:149
#define DBG_CRITICAL
Definition: Stream.h:37
#define AIO_SMALL_BUFS
Definition: aiops.cc:111
#define xmalloc
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops.cc:558
squidaio_result_t * resultp
Definition: aiops.cc:68
struct squidaio_request_t * next
Definition: aiops.cc:51
squidaio_result_t * squidaio_poll_done(void)
Definition: aiops.cc:901
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops.cc:806
unsigned long requests
Definition: aiops.cc:88
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
void squidaio_stats(StoreEntry *sentry)
Definition: aiops.cc:999
static int squidaio_initialised
Definition: aiops.cc:107
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops.cc:665
@ _THREAD_DONE
Definition: aiops.cc:45
#define NUMTHREADS
Definition: DiskThreads.h:30
static void NotifyIOCompleted()
Definition: CommIO.h:36
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops.cc:621
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops.cc:152
static char * squidaio_xstrdup(const char *str)
Definition: aiops.cc:184
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
#define RIDICULOUS_LENGTH
Definition: aiops.cc:38
static squidaio_thread_t * threads
Definition: aiops.cc:106
unsigned long requests
Definition: aiops.cc:76
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
_squidaio_thread_status
Definition: aiops.cc:40
pthread_mutex_t mutex
Definition: aiops.cc:72
static struct @43 done_requests
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
static void ResetNotifications()
Definition: CommIO.cc:69
void * squidaio_xmalloc(int size)
Definition: aiops.cc:171
static Mem::Allocator * squidaio_small_bufs
Definition: aiops.cc:117
struct stat * statp
Definition: aiops.cc:67
#define AIO_MEDIUM_BUFS
Definition: aiops.cc:110
@ _THREAD_BUSY
Definition: aiops.cc:43
#define AIO_MICRO_BUFS
Definition: aiops.cc:113
squidaio_request_type request_type
Definition: aiops.cc:52
static int request_queue_len
Definition: aiops.cc:121
pthread_t thread
Definition: aiops.cc:84
static void squidaio_xstrfree(char *str)
Definition: aiops.cc:205
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops.cc:779
#define AIO_LARGE_BUFS
Definition: aiops.cc:109
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops.cc:638
@ _AIO_OP_READ
Definition: DiskThreads.h:48
struct squidaio_request_t squidaio_request_t
static void NotifyIOClose()
Definition: CommIO.cc:38
void squidaio_init(void)
Definition: aiops.cc:216
void squidaio_xfree(void *p, int size)
Definition: aiops.cc:196
static void squidaio_poll_queues(void)
Definition: aiops.cc:867
int size
Definition: ModDevPoll.cc:69
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
static Mem::Allocator * squidaio_request_pool
Definition: aiops.cc:122
static struct @42 request_queue2
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops.cc:742
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops.cc:118
#define MAGIC1
Definition: DiskThreads.h:34
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops.cc:712
@ _THREAD_FAILED
Definition: aiops.cc:44
squidaio_request_t ** tailp
Definition: aiops.cc:127
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops.cc:672
static squidaio_request_queue_t done_queue
Definition: aiops.cc:134
squidaio_thread_status status
Definition: aiops.cc:85
void * squidaio_thread_loop(void *)
Definition: aiops.cc:345
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:47
#define assert(EX)
Definition: assert.h:17
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops.cc:749
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops.cc:116
@ _THREAD_STARTING
Definition: aiops.cc:41
time_t squid_curtime
Definition: stub_libtime.cc:20
#define xfree
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
static pthread_attr_t globattr
Definition: aiops.cc:144
static squidaio_request_queue_t request_queue
Definition: aiops.cc:124
int squidaio_sync(void)
Definition: aiops.cc:951
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops.cc:464
squidaio_thread_t * next
Definition: aiops.cc:83
squidaio_request_t *volatile head
Definition: aiops.cc:74
size_t buflen
Definition: aiops.cc:59
squidaio_request_t * head
Definition: aiops.cc:127
static void squidaio_debug(squidaio_request_t *)
Definition: aiops.cc:969
static void Initialize()
Definition: CommIO.cc:19
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops.cc:119
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:75
unsigned short mode_t
Definition: types.h:129
#define AIO_TINY_BUFS
Definition: aiops.cc:112
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops.cc:813
#define DBG_IMPORTANT
Definition: Stream.h:38
int squidaio_opendir(const char *, squidaio_result_t *)
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
static Mem::Allocator * squidaio_large_bufs
Definition: aiops.cc:115
struct squidaio_request_t * current_req
Definition: aiops.cc:87
pthread_cond_t cond
Definition: aiops.cc:73
unsigned long blocked
Definition: aiops.cc:77
int squidaio_operations_pending(void)
Definition: aiops.cc:945
char * bufferp
Definition: aiops.cc:58
static Mem::Allocator * squidaio_thread_pool
Definition: aiops.cc:123
int squidaio_get_queue_len(void)
Definition: aiops.cc:963
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void squidaio_shutdown(void)
Definition: aiops.cc:329
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops.cc:771
@ _THREAD_WAITING
Definition: aiops.cc:42
struct stat * tmpstatp
Definition: aiops.cc:65

 

Introduction

Documentation

Support

Miscellaneous