aiops_win32.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 Windows AIOPS */
10 
11 #include "squid.h"
12 #include "compat/unistd.h"
13 #include "compat/win32_maperror.h"
15 #include "DiskThreads.h"
16 #include "fd.h"
17 #include "mem/Allocator.h"
18 #include "mem/Pool.h"
19 #include "SquidConfig.h"
20 #include "Store.h"
21 
22 #include <cerrno>
23 #include <csignal>
24 #include <sys/stat.h>
25 #include <fcntl.h>
26 #include <dirent.h>
27 
28 #define RIDICULOUS_LENGTH 4096
29 
36 };
38 
39 typedef struct squidaio_request_t {
40 
41  struct squidaio_request_t *next;
43  int cancelled;
44  char *path;
45  int oflag;
46  mode_t mode;
47  int fd;
48  char *bufferp;
49  char *tmpbufp;
50  size_t buflen;
51  off_t offset;
52  int whence;
53  int ret;
54  int err;
55 
56  struct stat *tmpstatp;
57 
58  struct stat *statp;
61 
62 typedef struct squidaio_request_queue_t {
63  HANDLE mutex;
64  HANDLE cond; /* See Event objects */
65  squidaio_request_t *volatile head;
66  squidaio_request_t *volatile *volatile tailp;
67  unsigned long requests;
68  unsigned long blocked; /* main failed to lock the queue */
70 
72 
73 struct squidaio_thread_t {
75  HANDLE thread;
76  DWORD dwThreadId; /* thread ID */
78 
80  unsigned long requests;
81  int volatile exit;
82 };
83 
86 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
#if AIO_OPENDIR
/* Worker for _AIO_OP_OPENDIR requests. Declared to return void so that this
 * prototype agrees with the definition of squidaio_do_opendir() in this file;
 * the thread loop calls it as a plain statement and uses no return value. */
static void squidaio_do_opendir(squidaio_request_t *);
#endif
96 static void squidaio_debug(squidaio_request_t *);
97 static void squidaio_poll_queues(void);
98 
99 static squidaio_thread_t *threads = nullptr;
100 static int squidaio_initialised = 0;
101 
/* Element sizes, in bytes, of the squidaio buffer pool size classes.
 * Every derived size is parenthesized so that the macro can be embedded in a
 * larger expression (e.g. AIO_TINY_BUFS + 1) without the shift picking up
 * neighbouring operands. */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
107 
/* Buffer pools, one per size class. Each is created in squidaio_init() with
 * the matching AIO_*_BUFS element size noted on its line. */
108 static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
109 static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
110 static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
111 static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
112 static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128 bytes */
113 
114 static size_t request_queue_len = 0;
118 
119 static struct {
121 }
122 
123 request_queue2 = {
124 
125  nullptr, &request_queue2.head
126 };
128 
129 static struct {
131 }
132 
133 done_requests = {
134 
135  nullptr, &done_requests.head
136 };
137 
138 static HANDLE main_thread;
139 
140 static Mem::Allocator *
142 {
143  if (size <= AIO_LARGE_BUFS) {
144  if (size <= AIO_MICRO_BUFS)
145  return squidaio_micro_bufs;
146  else if (size <= AIO_TINY_BUFS)
147  return squidaio_tiny_bufs;
148  else if (size <= AIO_SMALL_BUFS)
149  return squidaio_small_bufs;
150  else if (size <= AIO_MEDIUM_BUFS)
151  return squidaio_medium_bufs;
152  else
153  return squidaio_large_bufs;
154  }
155 
156  return nullptr;
157 }
158 
159 void *
161 {
162  void *p;
163  if (const auto pool = squidaio_get_pool(size)) {
164  p = pool->alloc();
165  } else
166  p = xmalloc(size);
167 
168  return p;
169 }
170 
171 static char *
172 squidaio_xstrdup(const char *str)
173 {
174  char *p;
175  int len = strlen(str) + 1;
176 
177  p = (char *)squidaio_xmalloc(len);
178  strncpy(p, str, len);
179 
180  return p;
181 }
182 
183 void
184 squidaio_xfree(void *p, int size)
185 {
186  if (const auto pool = squidaio_get_pool(size)) {
187  pool->freeOne(p);
188  } else
189  xfree(p);
190 }
191 
192 static void
194 {
195  int len = strlen(str) + 1;
196 
197  if (const auto pool = squidaio_get_pool(len)) {
198  pool->freeOne(str);
199  } else
200  xfree(str);
201 }
202 
203 void
205 {
206  squidaio_thread_t *threadp;
207 
209  return;
210 
211  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
212  GetCurrentThread(), /* pseudo handle to copy */
213  GetCurrentProcess(), /* pseudo handle, don't close */
214  &main_thread,
215  0, /* required access */
216  FALSE, /* child process's don't inherit the handle */
217  DUPLICATE_SAME_ACCESS)) {
218  /* spit errors */
219  fatal("Couldn't get current thread handle");
220  }
221 
222  /* Initialize request queue */
223  if ((request_queue.mutex = CreateMutex(nullptr, /* no inheritance */
224  FALSE, /* start unowned (as per mutex_init) */
225  nullptr) /* no name */
226  ) == NULL) {
227  fatal("Failed to create mutex");
228  }
229 
230  if ((request_queue.cond = CreateEvent(nullptr, /* no inheritance */
231  FALSE, /* auto signal reset - which I think is pthreads like ? */
232  FALSE, /* start non signaled */
233  nullptr) /* no name */
234  ) == NULL) {
235  fatal("Failed to create condition variable");
236  }
237 
238  request_queue.head = nullptr;
239 
241 
243 
245 
246  /* Initialize done queue */
247 
248  if ((done_queue.mutex = CreateMutex(nullptr, /* no inheritance */
249  FALSE, /* start unowned (as per mutex_init) */
250  nullptr) /* no name */
251  ) == NULL) {
252  fatal("Failed to create mutex");
253  }
254 
255  if ((done_queue.cond = CreateEvent(nullptr, /* no inheritance */
256  TRUE, /* manually signaled - which I think is pthreads like ? */
257  FALSE, /* start non signaled */
258  nullptr) /* no name */
259  ) == NULL) {
260  fatal("Failed to create condition variable");
261  }
262 
263  done_queue.head = nullptr;
264 
266 
267  done_queue.requests = 0;
268 
269  done_queue.blocked = 0;
270 
271  // Initialize the thread I/O pipes before creating any threads
272  // see bug 3189 comment 5 about race conditions.
274 
275  /* Create threads and get them to sit in their wait loop */
276  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
277 
278  assert(NUMTHREADS > 0);
279 
280  for (size_t i = 0; i < NUMTHREADS; ++i) {
282  threadp->status = _THREAD_STARTING;
283  threadp->current_req = nullptr;
284  threadp->requests = 0;
285  threadp->next = threads;
286  threads = threadp;
287 
288  if ((threadp->thread = CreateThread(nullptr, /* no security attributes */
289  0, /* use default stack size */
290  squidaio_thread_loop, /* thread function */
291  threadp, /* argument to thread function */
292  0, /* use default creation flags */
293  &(threadp->dwThreadId)) /* returns the thread identifier */
294  ) == NULL) {
295  fprintf(stderr, "Thread creation failed\n");
296  threadp->status = _THREAD_FAILED;
297  continue;
298  }
299 
300  /* Set the new thread priority above parent process */
301  SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
302  }
303 
304  /* Create request pool */
305  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
306 
307  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
308 
309  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
310 
311  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
312 
313  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
314 
315  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
316 
318 }
319 
320 void
322 {
323  squidaio_thread_t *threadp;
324  HANDLE * hthreads;
325 
327  return;
328 
329  /* This is the same as in squidaio_sync */
330  do {
332  } while (request_queue_len > 0);
333 
334  hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
335 
336  threadp = threads;
337 
338  for (size_t i = 0; i < NUMTHREADS; ++i) {
339  threadp->exit = 1;
340  hthreads[i] = threadp->thread;
341  threadp = threadp->next;
342  }
343 
344  ReleaseMutex(request_queue.mutex);
345  ResetEvent(request_queue.cond);
346  ReleaseMutex(done_queue.mutex);
347  ResetEvent(done_queue.cond);
348  Sleep(0);
349 
350  WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
351 
352  for (size_t i = 0; i < NUMTHREADS; ++i) {
353  CloseHandle(hthreads[i]);
354  }
355 
356  CloseHandle(main_thread);
358 
360  xfree(hthreads);
361 }
362 
363 static DWORD WINAPI
364 squidaio_thread_loop(LPVOID lpParam)
365 {
366  squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
367  squidaio_request_t *request;
368  HANDLE cond; /* local copy of the event queue because win32 event handles
369  * don't atomically release the mutex as cond variables do. */
370 
371  /* lock the thread info */
372 
373  if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
374  fatal("Can't get ownership of mutex\n");
375  }
376 
377  /* duplicate the handle */
378  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
379  request_queue.cond, /* handle to copy */
380  GetCurrentProcess(), /* pseudo handle, don't close */
381  &cond,
382  0, /* required access */
383  FALSE, /* child process's don't inherit the handle */
384  DUPLICATE_SAME_ACCESS))
385  fatal("Can't duplicate mutex handle\n");
386 
387  if (!ReleaseMutex(request_queue.mutex)) {
388  CloseHandle(cond);
389  fatal("Can't release mutex\n");
390  }
391 
392  Sleep(0);
393 
394  while (1) {
395  DWORD rv;
396  threadp->current_req = request = nullptr;
397  request = nullptr;
398  /* Get a request to process */
399  threadp->status = _THREAD_WAITING;
400 
401  if (threadp->exit) {
402  CloseHandle(request_queue.mutex);
403  CloseHandle(cond);
404  return 0;
405  }
406 
407  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
408 
409  if (rv == WAIT_FAILED) {
410  CloseHandle(cond);
411  return 1;
412  }
413 
414  while (!request_queue.head) {
415  if (!ReleaseMutex(request_queue.mutex)) {
416  CloseHandle(cond);
417  threadp->status = _THREAD_FAILED;
418  return 1;
419  }
420 
421  Sleep(0);
422  rv = WaitForSingleObject(cond, INFINITE);
423 
424  if (rv == WAIT_FAILED) {
425  CloseHandle(cond);
426  return 1;
427  }
428 
429  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
430 
431  if (rv == WAIT_FAILED) {
432  CloseHandle(cond);
433  return 1;
434  }
435  }
436 
437  request = request_queue.head;
438 
439  if (request)
440  request_queue.head = request->next;
441 
442  if (!request_queue.head)
444 
445  if (!ReleaseMutex(request_queue.mutex)) {
446  CloseHandle(cond);
447  return 1;
448  }
449 
450  Sleep(0);
451 
452  /* process the request */
453  threadp->status = _THREAD_BUSY;
454 
455  request->next = nullptr;
456 
457  threadp->current_req = request;
458 
459  errno = 0;
460 
461  if (!request->cancelled) {
462  switch (request->request_type) {
463 
464  case _AIO_OP_OPEN:
465  squidaio_do_open(request);
466  break;
467 
468  case _AIO_OP_READ:
469  squidaio_do_read(request);
470  break;
471 
472  case _AIO_OP_WRITE:
473  squidaio_do_write(request);
474  break;
475 
476  case _AIO_OP_CLOSE:
477  squidaio_do_close(request);
478  break;
479 
480  case _AIO_OP_UNLINK:
481  squidaio_do_unlink(request);
482  break;
483 
484 #if AIO_OPENDIR /* Opendir not implemented yet */
485 
486  case _AIO_OP_OPENDIR:
487  squidaio_do_opendir(request);
488  break;
489 #endif
490 
491  case _AIO_OP_STAT:
492  squidaio_do_stat(request);
493  break;
494 
495  default:
496  request->ret = -1;
497  request->err = EINVAL;
498  break;
499  }
500  } else { /* cancelled */
501  request->ret = -1;
502  request->err = EINTR;
503  }
504 
505  threadp->status = _THREAD_DONE;
506  /* put the request in the done queue */
507  rv = WaitForSingleObject(done_queue.mutex, INFINITE);
508 
509  if (rv == WAIT_FAILED) {
510  CloseHandle(cond);
511  return 1;
512  }
513 
514  *done_queue.tailp = request;
515  done_queue.tailp = &request->next;
516 
517  if (!ReleaseMutex(done_queue.mutex)) {
518  CloseHandle(cond);
519  return 1;
520  }
521 
523  Sleep(0);
524  ++ threadp->requests;
525  } /* while forever */
526 
527  CloseHandle(cond);
528 
529  return 0;
530 } /* squidaio_thread_loop */
531 
532 static void
534 {
535  static int high_start = 0;
536  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
537  /* Mark it as not executed (failing result, no error) */
538  request->ret = -1;
539  request->err = 0;
540  /* Internal housekeeping */
541  request_queue_len += 1;
542  request->resultp->_data = request;
543  /* Play some tricks with the request_queue2 queue */
544  request->next = nullptr;
545 
546  if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
547  if (request_queue2.head) {
548  /* Grab blocked requests */
551  }
552 
553  /* Enqueue request */
554  *request_queue.tailp = request;
555 
556  request_queue.tailp = &request->next;
557 
558  if (!SetEvent(request_queue.cond))
559  fatal("Couldn't push queue");
560 
561  if (!ReleaseMutex(request_queue.mutex)) {
562  /* unexpected error */
563  fatal("Couldn't push queue");
564  }
565 
566  Sleep(0);
567 
568  if (request_queue2.head) {
569  /* Clear queue of blocked requests */
570  request_queue2.head = nullptr;
571  request_queue2.tailp = &request_queue2.head;
572  }
573  } else {
574  /* Oops, the request queue is blocked, use request_queue2 */
575  *request_queue2.tailp = request;
576  request_queue2.tailp = &request->next;
577  }
578 
579  if (request_queue2.head) {
580  static uint64_t filter = 0;
581  static uint64_t filter_limit = 8196;
582 
583  if (++filter >= filter_limit) {
584  filter_limit += filter;
585  filter = 0;
586  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
587  }
588  }
589 
590  /* Warn if out of threads */
591  if (request_queue_len > MAGIC1) {
592  static int last_warn = 0;
593  static size_t queue_high, queue_low;
594 
595  if (high_start == 0) {
596  high_start = (int)squid_curtime;
597  queue_high = request_queue_len;
598  queue_low = request_queue_len;
599  }
600 
601  if (request_queue_len > queue_high)
602  queue_high = request_queue_len;
603 
604  if (request_queue_len < queue_low)
605  queue_low = request_queue_len;
606 
607  if (squid_curtime >= (last_warn + 15) &&
608  squid_curtime >= (high_start + 5)) {
609  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
610 
611  if (squid_curtime >= (high_start + 15))
612  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
613  request_queue_len << ", high=" << queue_high <<
614  ", low=" << queue_low << ", duration=" <<
615  (long int) (squid_curtime - high_start));
616 
617  last_warn = (int)squid_curtime;
618  }
619  } else {
620  high_start = 0;
621  }
622 
623  /* Warn if seriously overloaded */
625  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
626  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
627  squidaio_sync();
628  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
629  }
630 } /* squidaio_queue_request */
631 
632 static void
634 {
635  squidaio_result_t *resultp = requestp->resultp;
636  int cancelled = requestp->cancelled;
637 
638  /* Free allocated structures and copy data back to user space if the */
639  /* request hasn't been cancelled */
640 
641  switch (requestp->request_type) {
642 
643  case _AIO_OP_STAT:
644 
645  if (!cancelled && requestp->ret == 0)
646  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
647 
648  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
649 
650  squidaio_xstrfree(requestp->path);
651 
652  break;
653 
654  case _AIO_OP_OPEN:
655  if (cancelled && requestp->ret >= 0)
656  /* The open() was cancelled but completed */
657  xclose(requestp->ret);
658 
659  squidaio_xstrfree(requestp->path);
660 
661  break;
662 
663  case _AIO_OP_CLOSE:
664  if (cancelled && requestp->ret < 0)
665  /* The close() was cancelled and never got executed */
666  xclose(requestp->fd);
667 
668  break;
669 
670  case _AIO_OP_UNLINK:
671 
672  case _AIO_OP_OPENDIR:
673  squidaio_xstrfree(requestp->path);
674 
675  break;
676 
677  case _AIO_OP_READ:
678  break;
679 
680  case _AIO_OP_WRITE:
681  break;
682 
683  default:
684  break;
685  }
686 
687  if (resultp != NULL && !cancelled) {
688  resultp->aio_return = requestp->ret;
689  resultp->aio_errno = requestp->err;
690  }
691 
692  squidaio_request_pool->freeOne(requestp);
693 } /* squidaio_cleanup_request */
694 
695 int
697 {
698  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
699 
700  if (request && request->resultp == resultp) {
701  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
702  request->cancelled = 1;
703  request->resultp = nullptr;
704  resultp->_data = nullptr;
705  resultp->result_type = _AIO_OP_NONE;
706  return 0;
707  }
708 
709  return 1;
710 } /* squidaio_cancel */
711 
712 int
713 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
714 {
715  squidaio_init();
716  squidaio_request_t *requestp;
717 
719 
720  requestp->path = (char *) squidaio_xstrdup(path);
721 
722  requestp->oflag = oflag;
723 
724  requestp->mode = mode;
725 
726  requestp->resultp = resultp;
727 
728  requestp->request_type = _AIO_OP_OPEN;
729 
730  requestp->cancelled = 0;
731 
732  resultp->result_type = _AIO_OP_OPEN;
733 
734  squidaio_queue_request(requestp);
735 
736  return 0;
737 }
738 
739 static void
741 {
742  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
743  requestp->err = errno;
744 }
745 
746 int
747 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
748 {
749  squidaio_request_t *requestp;
750 
752 
753  requestp->fd = fd;
754 
755  requestp->bufferp = bufp;
756 
757  requestp->buflen = bufs;
758 
759  requestp->offset = offset;
760 
761  requestp->whence = whence;
762 
763  requestp->resultp = resultp;
764 
765  requestp->request_type = _AIO_OP_READ;
766 
767  requestp->cancelled = 0;
768 
769  resultp->result_type = _AIO_OP_READ;
770 
771  squidaio_queue_request(requestp);
772 
773  return 0;
774 }
775 
776 static void
778 {
779  lseek(requestp->fd, requestp->offset, requestp->whence);
780 
781  if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
782  requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
783  WIN32_maperror(GetLastError());
784  requestp->ret = -1;
785  }
786 
787  requestp->err = errno;
788 }
789 
790 int
791 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
792 {
793  squidaio_request_t *requestp;
794 
796 
797  requestp->fd = fd;
798 
799  requestp->bufferp = bufp;
800 
801  requestp->buflen = bufs;
802 
803  requestp->offset = offset;
804 
805  requestp->whence = whence;
806 
807  requestp->resultp = resultp;
808 
809  requestp->request_type = _AIO_OP_WRITE;
810 
811  requestp->cancelled = 0;
812 
813  resultp->result_type = _AIO_OP_WRITE;
814 
815  squidaio_queue_request(requestp);
816 
817  return 0;
818 }
819 
820 static void
822 {
823  if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
824  requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
825  WIN32_maperror(GetLastError());
826  requestp->ret = -1;
827  }
828 
829  requestp->err = errno;
830 }
831 
832 int
834 {
835  squidaio_request_t *requestp;
836 
838 
839  requestp->fd = fd;
840 
841  requestp->resultp = resultp;
842 
843  requestp->request_type = _AIO_OP_CLOSE;
844 
845  requestp->cancelled = 0;
846 
847  resultp->result_type = _AIO_OP_CLOSE;
848 
849  squidaio_queue_request(requestp);
850 
851  return 0;
852 }
853 
854 static void
856 {
857  if ((requestp->ret = xclose(requestp->fd)) < 0) {
858  debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
859  xclose(requestp->fd);
860  }
861 
862  requestp->err = errno;
863 }
864 
865 int
866 
867 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
868 {
869  squidaio_init();
870  squidaio_request_t *requestp;
871 
873 
874  requestp->path = (char *) squidaio_xstrdup(path);
875 
876  requestp->statp = sb;
877 
878  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
879 
880  requestp->resultp = resultp;
881 
882  requestp->request_type = _AIO_OP_STAT;
883 
884  requestp->cancelled = 0;
885 
886  resultp->result_type = _AIO_OP_STAT;
887 
888  squidaio_queue_request(requestp);
889 
890  return 0;
891 }
892 
893 static void
895 {
896  requestp->ret = stat(requestp->path, requestp->tmpstatp);
897  requestp->err = errno;
898 }
899 
900 int
901 squidaio_unlink(const char *path, squidaio_result_t * resultp)
902 {
903  squidaio_init();
904  squidaio_request_t *requestp;
905 
907 
908  requestp->path = squidaio_xstrdup(path);
909 
910  requestp->resultp = resultp;
911 
912  requestp->request_type = _AIO_OP_UNLINK;
913 
914  requestp->cancelled = 0;
915 
916  resultp->result_type = _AIO_OP_UNLINK;
917 
918  squidaio_queue_request(requestp);
919 
920  return 0;
921 }
922 
923 static void
925 {
926  requestp->ret = unlink(requestp->path);
927  requestp->err = errno;
928 }
929 
930 #if AIO_OPENDIR
931 /* XXX squidaio_opendir NOT implemented yet.. */
932 
933 int
934 squidaio_opendir(const char *path, squidaio_result_t * resultp)
935 {
936  squidaio_request_t *requestp;
937  int len;
938 
939  requestp = squidaio_request_pool->alloc();
940 
941  resultp->result_type = _AIO_OP_OPENDIR;
942 
943  return -1;
944 }
945 
946 static void
947 squidaio_do_opendir(squidaio_request_t * requestp)
948 {
949  /* NOT IMPLEMENTED */
950 }
951 
952 #endif
953 
954 static void
956 {
957  /* kick "overflow" request queue */
958 
959  if (request_queue2.head &&
960  (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
963 
964  if (!SetEvent(request_queue.cond))
965  fatal("couldn't push queue\n");
966 
967  if (!ReleaseMutex(request_queue.mutex)) {
968  /* unexpected error */
969  }
970 
971  Sleep(0);
972  request_queue2.head = nullptr;
973  request_queue2.tailp = &request_queue2.head;
974  }
975 
976  /* poll done queue */
977  if (done_queue.head &&
978  (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
979 
980  struct squidaio_request_t *requests = done_queue.head;
981  done_queue.head = nullptr;
983 
984  if (!ReleaseMutex(done_queue.mutex)) {
985  /* unexpected error */
986  }
987 
988  Sleep(0);
989  *done_requests.tailp = requests;
990  request_queue_len -= 1;
991 
992  while (requests->next) {
993  requests = requests->next;
994  request_queue_len -= 1;
995  }
996 
997  done_requests.tailp = &requests->next;
998  }
999 }
1000 
1003 {
1004  squidaio_request_t *request;
1006  int cancelled;
1007  int polled = 0;
1008 
1009 AIO_REPOLL:
1010  request = done_requests.head;
1011 
1012  if (request == NULL && !polled) {
1015  polled = 1;
1016  request = done_requests.head;
1017  }
1018 
1019  if (!request) {
1020  return nullptr;
1021  }
1022 
1023  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1024  done_requests.head = request->next;
1025 
1026  if (!done_requests.head)
1027  done_requests.tailp = &done_requests.head;
1028 
1029  resultp = request->resultp;
1030 
1031  cancelled = request->cancelled;
1032 
1033  squidaio_debug(request);
1034 
1035  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1036 
1037  squidaio_cleanup_request(request);
1038 
1039  if (cancelled)
1040  goto AIO_REPOLL;
1041 
1042  return resultp;
1043 } /* squidaio_poll_done */
1044 
1045 int
1047 {
1048  return request_queue_len + (done_requests.head ? 1 : 0);
1049 }
1050 
1051 int
1053 {
1054  /* XXX This might take a while if the queue is large.. */
1055 
1056  do {
1058  } while (request_queue_len > 0);
1059 
1060  return squidaio_operations_pending();
1061 }
1062 
1063 int
1065 {
1066  return request_queue_len;
1067 }
1068 
1069 static void
1071 {
1072  switch (request->request_type) {
1073 
1074  case _AIO_OP_OPEN:
1075  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1076  break;
1077 
1078  case _AIO_OP_READ:
1079  debugs(43, 5, "READ on fd: " << request->fd);
1080  break;
1081 
1082  case _AIO_OP_WRITE:
1083  debugs(43, 5, "WRITE on fd: " << request->fd);
1084  break;
1085 
1086  case _AIO_OP_CLOSE:
1087  debugs(43, 5, "CLOSE of fd: " << request->fd);
1088  break;
1089 
1090  case _AIO_OP_UNLINK:
1091  debugs(43, 5, "UNLINK of " << request->path);
1092  break;
1093 
1094  default:
1095  break;
1096  }
1097 }
1098 
1099 void
1101 {
1102  squidaio_thread_t *threadp;
1103 
1104  if (!squidaio_initialised)
1105  return;
1106 
1107  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1108 
1109  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1110 
1111  threadp = threads;
1112 
1113  for (size_t i = 0; i < NUMTHREADS; ++i) {
1114  storeAppendPrintf(sentry, "%zu\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1115  threadp = threadp->next;
1116  }
1117 }
1118 
static HANDLE main_thread
Definition: aiops_win32.cc:138
void fatal(const char *message)
Definition: fatal.cc:28
static void squidaio_poll_queues(void)
Definition: aiops_win32.cc:955
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops_win32.cc:855
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops_win32.cc:821
static squidaio_request_queue_t request_queue
Definition: aiops_win32.cc:117
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
int squidaio_operations_pending(void)
#define AIO_TINY_BUFS
Definition: aiops_win32.cc:105
#define DBG_CRITICAL
Definition: Stream.h:37
#define xmalloc
static Mem::Allocator * squidaio_large_bufs
Definition: aiops_win32.cc:108
squidaio_result_t * resultp
Definition: aiops.cc:70
struct squidaio_request_t * next
Definition: aiops.cc:53
void squidaio_stats(StoreEntry *sentry)
struct squidaio_request_t squidaio_request_t
unsigned long requests
Definition: aiops.cc:90
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
#define FALSE
Definition: std-includes.h:56
#define AIO_SMALL_BUFS
Definition: aiops_win32.cc:104
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops_win32.cc:533
#define NUMTHREADS
Definition: DiskThreads.h:30
static void NotifyIOCompleted()
Definition: CommIO.h:36
static int squidaio_initialised
Definition: aiops_win32.cc:100
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
#define AIO_MEDIUM_BUFS
Definition: aiops_win32.cc:103
static void squidaio_debug(squidaio_request_t *)
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops_win32.cc:633
unsigned long requests
Definition: aiops.cc:78
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
_squidaio_thread_status
Definition: aiops.cc:42
pthread_mutex_t mutex
Definition: aiops.cc:74
void squidaio_shutdown(void)
Definition: aiops_win32.cc:321
static char * squidaio_xstrdup(const char *str)
Definition: aiops_win32.cc:172
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
static Mem::Allocator * squidaio_small_bufs
Definition: aiops_win32.cc:110
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops_win32.cc:696
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops_win32.cc:894
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
static void ResetNotifications()
Definition: CommIO.cc:71
squidaio_request_t ** tailp
Definition: aiops_win32.cc:120
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops_win32.cc:901
struct stat * statp
Definition: aiops.cc:69
@ _THREAD_DONE
Definition: aiops_win32.cc:35
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops_win32.cc:833
#define AIO_LARGE_BUFS
Definition: aiops_win32.cc:102
squidaio_request_type request_type
Definition: aiops.cc:54
pthread_t thread
Definition: aiops.cc:86
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops_win32.cc:740
static void squidaio_xstrfree(char *str)
Definition: aiops_win32.cc:193
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops_win32.cc:867
void squidaio_init(void)
Definition: aiops_win32.cc:204
@ _AIO_OP_READ
Definition: DiskThreads.h:48
static void NotifyIOClose()
Definition: CommIO.cc:40
int size
Definition: ModDevPoll.cc:70
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
#define NULL
Definition: types.h:145
@ _THREAD_WAITING
Definition: aiops_win32.cc:32
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops_win32.cc:109
int squidaio_sync(void)
#define MAGIC1
Definition: DiskThreads.h:34
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops_win32.cc:777
static Mem::Allocator * squidaio_thread_pool
Definition: aiops_win32.cc:116
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:747
static squidaio_request_queue_t done_queue
Definition: aiops_win32.cc:127
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
squidaio_thread_status status
Definition: aiops.cc:87
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam)
Definition: aiops_win32.cc:364
@ _THREAD_STARTING
Definition: aiops_win32.cc:31
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:49
#define assert(EX)
Definition: assert.h:17
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
squidaio_request_t * head
Definition: aiops_win32.cc:120
static size_t request_queue_len
Definition: aiops_win32.cc:114
time_t squid_curtime
Definition: stub_libtime.cc:20
@ _THREAD_FAILED
Definition: aiops_win32.cc:34
#define xfree
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
static struct @41 done_requests
int squidaio_get_queue_len(void)
#define AIO_MICRO_BUFS
Definition: aiops_win32.cc:106
#define TRUE
Definition: std-includes.h:55
static Mem::Allocator * squidaio_request_pool
Definition: aiops_win32.cc:115
squidaio_thread_t * next
Definition: aiops.cc:85
squidaio_request_t *volatile head
Definition: aiops.cc:76
size_t buflen
Definition: aiops.cc:61
int volatile exit
Definition: aiops_win32.cc:81
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops_win32.cc:111
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:791
static void Initialize()
Definition: CommIO.cc:21
void squidaio_xfree(void *p, int size)
Definition: aiops_win32.cc:184
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:77
unsigned short mode_t
Definition: types.h:129
static struct @40 request_queue2
#define DBG_IMPORTANT
Definition: Stream.h:38
int squidaio_opendir(const char *, squidaio_result_t *)
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
@ _THREAD_BUSY
Definition: aiops_win32.cc:33
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops_win32.cc:141
struct squidaio_request_t * current_req
Definition: aiops.cc:89
pthread_cond_t cond
Definition: aiops.cc:75
int xclose(int fd)
POSIX close(2) equivalent.
Definition: unistd.h:43
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops_win32.cc:924
unsigned long blocked
Definition: aiops.cc:79
struct squidaio_request_queue_t squidaio_request_queue_t
#define RIDICULOUS_LENGTH
Definition: aiops_win32.cc:28
char * bufferp
Definition: aiops.cc:60
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
void * squidaio_xmalloc(int size)
Definition: aiops_win32.cc:160
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
static squidaio_thread_t * threads
Definition: aiops_win32.cc:99
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops_win32.cc:713
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops_win32.cc:37
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops_win32.cc:112
int unsigned int
Definition: stub_fd.cc:19
squidaio_result_t * squidaio_poll_done(void)
struct stat * tmpstatp
Definition: aiops.cc:67

 

Introduction

Documentation

Support

Miscellaneous