aiops_win32.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 43 Windows AIOPS */
10 
11 #include "squid.h"
13 #include "DiskThreads.h"
14 #include "fd.h"
15 #include "mem/Pool.h"
16 #include "SquidConfig.h"
17 #include "Store.h"
18 
19 #include <cerrno>
20 #include <csignal>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <dirent.h>
24 
25 #define RIDICULOUS_LENGTH 4096
26 
33 };
35 
36 typedef struct squidaio_request_t {
37 
38  struct squidaio_request_t *next;
40  int cancelled;
41  char *path;
42  int oflag;
43  mode_t mode;
44  int fd;
45  char *bufferp;
46  char *tmpbufp;
47  size_t buflen;
48  off_t offset;
49  int whence;
50  int ret;
51  int err;
52 
53  struct stat *tmpstatp;
54 
55  struct stat *statp;
58 
59 typedef struct squidaio_request_queue_t {
60  HANDLE mutex;
61  HANDLE cond; /* See Event objects */
62  squidaio_request_t *volatile head;
63  squidaio_request_t *volatile *volatile tailp;
64  unsigned long requests;
65  unsigned long blocked; /* main failed to lock the queue */
67 
69 
70 struct squidaio_thread_t {
72  HANDLE thread;
73  DWORD dwThreadId; /* thread ID */
75 
77  unsigned long requests;
78  int volatile exit;
79 };
80 
83 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
90 #if AIO_OPENDIR
91 static void *squidaio_do_opendir(squidaio_request_t *);
92 #endif
93 static void squidaio_debug(squidaio_request_t *);
94 static void squidaio_poll_queues(void);
95 
96 static squidaio_thread_t *threads = nullptr;
97 static int squidaio_initialised = 0;
98 
99 #define AIO_LARGE_BUFS 16384
100 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
101 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
102 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
103 #define AIO_MICRO_BUFS 128
104 
105 static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
106 static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
107 static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
108 static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
109 static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
110 
111 static int request_queue_len = 0;
115 
116 static struct {
118 }
119 
120 request_queue2 = {
121 
122  nullptr, &request_queue2.head
123 };
125 
126 static struct {
128 }
129 
130 done_requests = {
131 
132  nullptr, &done_requests.head
133 };
134 
135 static HANDLE main_thread;
136 
137 static Mem::Allocator *
139 {
140  if (size <= AIO_LARGE_BUFS) {
141  if (size <= AIO_MICRO_BUFS)
142  return squidaio_micro_bufs;
143  else if (size <= AIO_TINY_BUFS)
144  return squidaio_tiny_bufs;
145  else if (size <= AIO_SMALL_BUFS)
146  return squidaio_small_bufs;
147  else if (size <= AIO_MEDIUM_BUFS)
148  return squidaio_medium_bufs;
149  else
150  return squidaio_large_bufs;
151  }
152 
153  return nullptr;
154 }
155 
156 void *
158 {
159  void *p;
160  if (const auto pool = squidaio_get_pool(size)) {
161  p = pool->alloc();
162  } else
163  p = xmalloc(size);
164 
165  return p;
166 }
167 
168 static char *
169 squidaio_xstrdup(const char *str)
170 {
171  char *p;
172  int len = strlen(str) + 1;
173 
174  p = (char *)squidaio_xmalloc(len);
175  strncpy(p, str, len);
176 
177  return p;
178 }
179 
180 void
181 squidaio_xfree(void *p, int size)
182 {
183  if (const auto pool = squidaio_get_pool(size)) {
184  pool->freeOne(p);
185  } else
186  xfree(p);
187 }
188 
189 static void
191 {
192  int len = strlen(str) + 1;
193 
194  if (const auto pool = squidaio_get_pool(len)) {
195  pool->freeOne(str);
196  } else
197  xfree(str);
198 }
199 
200 void
202 {
203  int i;
204  squidaio_thread_t *threadp;
205 
207  return;
208 
209  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
210  GetCurrentThread(), /* pseudo handle to copy */
211  GetCurrentProcess(), /* pseudo handle, don't close */
212  &main_thread,
213  0, /* required access */
214  FALSE, /* child process's don't inherit the handle */
215  DUPLICATE_SAME_ACCESS)) {
216  /* spit errors */
217  fatal("Couldn't get current thread handle");
218  }
219 
220  /* Initialize request queue */
221  if ((request_queue.mutex = CreateMutex(nullptr, /* no inheritance */
222  FALSE, /* start unowned (as per mutex_init) */
223  nullptr) /* no name */
224  ) == NULL) {
225  fatal("Failed to create mutex");
226  }
227 
228  if ((request_queue.cond = CreateEvent(nullptr, /* no inheritance */
229  FALSE, /* auto signal reset - which I think is pthreads like ? */
230  FALSE, /* start non signaled */
231  nullptr) /* no name */
232  ) == NULL) {
233  fatal("Failed to create condition variable");
234  }
235 
236  request_queue.head = nullptr;
237 
239 
241 
243 
244  /* Initialize done queue */
245 
246  if ((done_queue.mutex = CreateMutex(nullptr, /* no inheritance */
247  FALSE, /* start unowned (as per mutex_init) */
248  nullptr) /* no name */
249  ) == NULL) {
250  fatal("Failed to create mutex");
251  }
252 
253  if ((done_queue.cond = CreateEvent(nullptr, /* no inheritance */
254  TRUE, /* manually signaled - which I think is pthreads like ? */
255  FALSE, /* start non signaled */
256  nullptr) /* no name */
257  ) == NULL) {
258  fatal("Failed to create condition variable");
259  }
260 
261  done_queue.head = nullptr;
262 
264 
265  done_queue.requests = 0;
266 
267  done_queue.blocked = 0;
268 
269  // Initialize the thread I/O pipes before creating any threads
270  // see bug 3189 comment 5 about race conditions.
272 
273  /* Create threads and get them to sit in their wait loop */
274  squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
275 
276  assert(NUMTHREADS > 0);
277 
278  for (i = 0; i < NUMTHREADS; ++i) {
280  threadp->status = _THREAD_STARTING;
281  threadp->current_req = nullptr;
282  threadp->requests = 0;
283  threadp->next = threads;
284  threads = threadp;
285 
286  if ((threadp->thread = CreateThread(nullptr, /* no security attributes */
287  0, /* use default stack size */
288  squidaio_thread_loop, /* thread function */
289  threadp, /* argument to thread function */
290  0, /* use default creation flags */
291  &(threadp->dwThreadId)) /* returns the thread identifier */
292  ) == NULL) {
293  fprintf(stderr, "Thread creation failed\n");
294  threadp->status = _THREAD_FAILED;
295  continue;
296  }
297 
298  /* Set the new thread priority above parent process */
299  SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
300  }
301 
302  /* Create request pool */
303  squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
304 
305  squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
306 
307  squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
308 
309  squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
310 
311  squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
312 
313  squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
314 
316 }
317 
318 void
320 {
321  squidaio_thread_t *threadp;
322  int i;
323  HANDLE * hthreads;
324 
326  return;
327 
328  /* This is the same as in squidaio_sync */
329  do {
331  } while (request_queue_len > 0);
332 
333  hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
334 
335  threadp = threads;
336 
337  for (i = 0; i < NUMTHREADS; ++i) {
338  threadp->exit = 1;
339  hthreads[i] = threadp->thread;
340  threadp = threadp->next;
341  }
342 
343  ReleaseMutex(request_queue.mutex);
344  ResetEvent(request_queue.cond);
345  ReleaseMutex(done_queue.mutex);
346  ResetEvent(done_queue.cond);
347  Sleep(0);
348 
349  WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
350 
351  for (i = 0; i < NUMTHREADS; ++i) {
352  CloseHandle(hthreads[i]);
353  }
354 
355  CloseHandle(main_thread);
357 
359  xfree(hthreads);
360 }
361 
362 static DWORD WINAPI
363 squidaio_thread_loop(LPVOID lpParam)
364 {
365  squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
366  squidaio_request_t *request;
367  HANDLE cond; /* local copy of the event queue because win32 event handles
368  * don't atomically release the mutex as cond variables do. */
369 
370  /* lock the thread info */
371 
372  if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
373  fatal("Can't get ownership of mutex\n");
374  }
375 
376  /* duplicate the handle */
377  if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
378  request_queue.cond, /* handle to copy */
379  GetCurrentProcess(), /* pseudo handle, don't close */
380  &cond,
381  0, /* required access */
382  FALSE, /* child process's don't inherit the handle */
383  DUPLICATE_SAME_ACCESS))
384  fatal("Can't duplicate mutex handle\n");
385 
386  if (!ReleaseMutex(request_queue.mutex)) {
387  CloseHandle(cond);
388  fatal("Can't release mutex\n");
389  }
390 
391  Sleep(0);
392 
393  while (1) {
394  DWORD rv;
395  threadp->current_req = request = nullptr;
396  request = nullptr;
397  /* Get a request to process */
398  threadp->status = _THREAD_WAITING;
399 
400  if (threadp->exit) {
401  CloseHandle(request_queue.mutex);
402  CloseHandle(cond);
403  return 0;
404  }
405 
406  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
407 
408  if (rv == WAIT_FAILED) {
409  CloseHandle(cond);
410  return 1;
411  }
412 
413  while (!request_queue.head) {
414  if (!ReleaseMutex(request_queue.mutex)) {
415  CloseHandle(cond);
416  threadp->status = _THREAD_FAILED;
417  return 1;
418  }
419 
420  Sleep(0);
421  rv = WaitForSingleObject(cond, INFINITE);
422 
423  if (rv == WAIT_FAILED) {
424  CloseHandle(cond);
425  return 1;
426  }
427 
428  rv = WaitForSingleObject(request_queue.mutex, INFINITE);
429 
430  if (rv == WAIT_FAILED) {
431  CloseHandle(cond);
432  return 1;
433  }
434  }
435 
436  request = request_queue.head;
437 
438  if (request)
439  request_queue.head = request->next;
440 
441  if (!request_queue.head)
443 
444  if (!ReleaseMutex(request_queue.mutex)) {
445  CloseHandle(cond);
446  return 1;
447  }
448 
449  Sleep(0);
450 
451  /* process the request */
452  threadp->status = _THREAD_BUSY;
453 
454  request->next = nullptr;
455 
456  threadp->current_req = request;
457 
458  errno = 0;
459 
460  if (!request->cancelled) {
461  switch (request->request_type) {
462 
463  case _AIO_OP_OPEN:
464  squidaio_do_open(request);
465  break;
466 
467  case _AIO_OP_READ:
468  squidaio_do_read(request);
469  break;
470 
471  case _AIO_OP_WRITE:
472  squidaio_do_write(request);
473  break;
474 
475  case _AIO_OP_CLOSE:
476  squidaio_do_close(request);
477  break;
478 
479  case _AIO_OP_UNLINK:
480  squidaio_do_unlink(request);
481  break;
482 
483 #if AIO_OPENDIR /* Opendir not implemented yet */
484 
485  case _AIO_OP_OPENDIR:
486  squidaio_do_opendir(request);
487  break;
488 #endif
489 
490  case _AIO_OP_STAT:
491  squidaio_do_stat(request);
492  break;
493 
494  default:
495  request->ret = -1;
496  request->err = EINVAL;
497  break;
498  }
499  } else { /* cancelled */
500  request->ret = -1;
501  request->err = EINTR;
502  }
503 
504  threadp->status = _THREAD_DONE;
505  /* put the request in the done queue */
506  rv = WaitForSingleObject(done_queue.mutex, INFINITE);
507 
508  if (rv == WAIT_FAILED) {
509  CloseHandle(cond);
510  return 1;
511  }
512 
513  *done_queue.tailp = request;
514  done_queue.tailp = &request->next;
515 
516  if (!ReleaseMutex(done_queue.mutex)) {
517  CloseHandle(cond);
518  return 1;
519  }
520 
522  Sleep(0);
523  ++ threadp->requests;
524  } /* while forever */
525 
526  CloseHandle(cond);
527 
528  return 0;
529 } /* squidaio_thread_loop */
530 
531 static void
533 {
534  static int high_start = 0;
535  debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
536  /* Mark it as not executed (failing result, no error) */
537  request->ret = -1;
538  request->err = 0;
539  /* Internal housekeeping */
540  request_queue_len += 1;
541  request->resultp->_data = request;
542  /* Play some tricks with the request_queue2 queue */
543  request->next = nullptr;
544 
545  if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
546  if (request_queue2.head) {
547  /* Grab blocked requests */
550  }
551 
552  /* Enqueue request */
553  *request_queue.tailp = request;
554 
555  request_queue.tailp = &request->next;
556 
557  if (!SetEvent(request_queue.cond))
558  fatal("Couldn't push queue");
559 
560  if (!ReleaseMutex(request_queue.mutex)) {
561  /* unexpected error */
562  fatal("Couldn't push queue");
563  }
564 
565  Sleep(0);
566 
567  if (request_queue2.head) {
568  /* Clear queue of blocked requests */
569  request_queue2.head = nullptr;
570  request_queue2.tailp = &request_queue2.head;
571  }
572  } else {
573  /* Oops, the request queue is blocked, use request_queue2 */
574  *request_queue2.tailp = request;
575  request_queue2.tailp = &request->next;
576  }
577 
578  if (request_queue2.head) {
579  static uint64_t filter = 0;
580  static uint64_t filter_limit = 8196;
581 
582  if (++filter >= filter_limit) {
583  filter_limit += filter;
584  filter = 0;
585  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
586  }
587  }
588 
589  /* Warn if out of threads */
590  if (request_queue_len > MAGIC1) {
591  static int last_warn = 0;
592  static int queue_high, queue_low;
593 
594  if (high_start == 0) {
595  high_start = (int)squid_curtime;
596  queue_high = request_queue_len;
597  queue_low = request_queue_len;
598  }
599 
600  if (request_queue_len > queue_high)
601  queue_high = request_queue_len;
602 
603  if (request_queue_len < queue_low)
604  queue_low = request_queue_len;
605 
606  if (squid_curtime >= (last_warn + 15) &&
607  squid_curtime >= (high_start + 5)) {
608  debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
609 
610  if (squid_curtime >= (high_start + 15))
611  debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
612  request_queue_len << ", high=" << queue_high <<
613  ", low=" << queue_low << ", duration=" <<
614  (long int) (squid_curtime - high_start));
615 
616  last_warn = (int)squid_curtime;
617  }
618  } else {
619  high_start = 0;
620  }
621 
622  /* Warn if seriously overloaded */
624  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
625  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
626  squidaio_sync();
627  debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
628  }
629 } /* squidaio_queue_request */
630 
// squidaio_cleanup_request(): reclaim a finished (or cancelled) request.
// Copies stat() results out of the worker's scratch buffer, closes
// descriptors orphaned by cancelled open()/close() operations, releases
// pooled strings, publishes ret/err into the caller-visible result, and
// finally returns the request to squidaio_request_pool.
// NOTE(review): the doc extraction dropped original line 632 — the
// parameter list (squidaio_request_t * requestp per the symbol index);
// verify against the pristine aiops_win32.cc.
631 static void
633 {
634  squidaio_result_t *resultp = requestp->resultp;
635  int cancelled = requestp->cancelled;
636 
637  /* Free allocated structures and copy data back to user space if the */
638  /* request hasn't been cancelled */
639 
640  switch (requestp->request_type) {
641 
642  case _AIO_OP_STAT:
643 
// copy the scratch stat into the caller's buffer only on success
644  if (!cancelled && requestp->ret == 0)
645  memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
646 
647  squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
648 
649  squidaio_xstrfree(requestp->path);
650 
651  break;
652 
653  case _AIO_OP_OPEN:
654  if (cancelled && requestp->ret >= 0)
655  /* The open() was cancelled but completed */
656  close(requestp->ret);
657 
658  squidaio_xstrfree(requestp->path);
659 
660  break;
661 
662  case _AIO_OP_CLOSE:
663  if (cancelled && requestp->ret < 0)
664  /* The close() was cancelled and never got executed */
665  close(requestp->fd);
666 
667  break;
668 
669  case _AIO_OP_UNLINK:
670 
671  case _AIO_OP_OPENDIR:
672  squidaio_xstrfree(requestp->path);
673 
674  break;
675 
676  case _AIO_OP_READ:
677  break;
678 
679  case _AIO_OP_WRITE:
680  break;
681 
682  default:
683  break;
684  }
685 
// results are propagated only for live (non-cancelled) requests
686  if (resultp != NULL && !cancelled) {
687  resultp->aio_return = requestp->ret;
688  resultp->aio_errno = requestp->err;
689  }
690 
691  squidaio_request_pool->freeOne(requestp);
692 } /* squidaio_cleanup_request */
693 
694 int
696 {
697  squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
698 
699  if (request && request->resultp == resultp) {
700  debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
701  request->cancelled = 1;
702  request->resultp = nullptr;
703  resultp->_data = nullptr;
704  resultp->result_type = _AIO_OP_NONE;
705  return 0;
706  }
707 
708  return 1;
709 } /* squidaio_cancel */
710 
// squidaio_open(): queue an asynchronous open(path, oflag, mode).
// Returns 0 immediately; the real open() result arrives later through
// resultp via squidaio_poll_done(). 'path' is duplicated, so the caller
// keeps ownership of its copy.
711 int
712 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
713 {
714  squidaio_init();
715  squidaio_request_t *requestp;
716 
// NOTE(review): original line 717 was dropped by the doc extraction —
// presumably the allocation of requestp from squidaio_request_pool
// (cf. the pattern in the sibling squidaio_* entry points); confirm
// against the pristine aiops_win32.cc.
718 
719  requestp->path = (char *) squidaio_xstrdup(path);
720 
721  requestp->oflag = oflag;
722 
723  requestp->mode = mode;
724 
725  requestp->resultp = resultp;
726 
727  requestp->request_type = _AIO_OP_OPEN;
728 
729  requestp->cancelled = 0;
730 
731  resultp->result_type = _AIO_OP_OPEN;
732 
733  squidaio_queue_request(requestp);
734 
735  return 0;
736 }
737 
738 static void
740 {
741  requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
742  requestp->err = errno;
743 }
744 
// squidaio_read(): queue an asynchronous read of 'bufs' bytes into 'bufp'
// at offset/whence. Returns 0 immediately; the byte count (or -1) is
// delivered later through resultp. The buffer must stay valid until the
// request completes or is cancelled.
745 int
746 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
747 {
748  squidaio_request_t *requestp;
749 
// NOTE(review): original line 750 was dropped by the doc extraction —
// presumably the allocation of requestp from squidaio_request_pool;
// confirm against the pristine aiops_win32.cc.
751 
752  requestp->fd = fd;
753 
754  requestp->bufferp = bufp;
755 
756  requestp->buflen = bufs;
757 
758  requestp->offset = offset;
759 
760  requestp->whence = whence;
761 
762  requestp->resultp = resultp;
763 
764  requestp->request_type = _AIO_OP_READ;
765 
766  requestp->cancelled = 0;
767 
768  resultp->result_type = _AIO_OP_READ;
769 
770  squidaio_queue_request(requestp);
771 
772  return 0;
773 }
774 
775 static void
777 {
778  lseek(requestp->fd, requestp->offset, requestp->whence);
779 
780  if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
781  requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
782  WIN32_maperror(GetLastError());
783  requestp->ret = -1;
784  }
785 
786  requestp->err = errno;
787 }
788 
// squidaio_write(): queue an asynchronous write of 'bufs' bytes from
// 'bufp'. Returns 0 immediately; the byte count (or -1) is delivered
// later through resultp. The buffer must stay valid until completion.
789 int
790 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
791 {
792  squidaio_request_t *requestp;
793 
// NOTE(review): original line 794 was dropped by the doc extraction —
// presumably the allocation of requestp from squidaio_request_pool;
// confirm against the pristine aiops_win32.cc.
795 
796  requestp->fd = fd;
797 
798  requestp->bufferp = bufp;
799 
800  requestp->buflen = bufs;
801 
// offset/whence are stored but squidaio_do_write() does not seek —
// presumably callers rely on append-mode descriptors; see that handler.
802  requestp->offset = offset;
803 
804  requestp->whence = whence;
805 
806  requestp->resultp = resultp;
807 
808  requestp->request_type = _AIO_OP_WRITE;
809 
810  requestp->cancelled = 0;
811 
812  resultp->result_type = _AIO_OP_WRITE;
813 
814  squidaio_queue_request(requestp);
815 
816  return 0;
817 }
818 
819 static void
821 {
822  if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
823  requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
824  WIN32_maperror(GetLastError());
825  requestp->ret = -1;
826  }
827 
828  requestp->err = errno;
829 }
830 
// squidaio_close(): queue an asynchronous close(fd). Returns 0
// immediately; the close() result is delivered later through resultp.
// NOTE(review): the doc extraction dropped original line 832 — the
// parameter list (int fd, squidaio_result_t * resultp per the symbol
// index); verify against the pristine aiops_win32.cc.
831 int
833 {
834  squidaio_request_t *requestp;
835 
// NOTE(review): original line 836 was dropped — presumably the
// allocation of requestp from squidaio_request_pool; confirm.
837 
838  requestp->fd = fd;
839 
840  requestp->resultp = resultp;
841 
842  requestp->request_type = _AIO_OP_CLOSE;
843 
844  requestp->cancelled = 0;
845 
846  resultp->result_type = _AIO_OP_CLOSE;
847 
848  squidaio_queue_request(requestp);
849 
850  return 0;
851 }
852 
853 static void
855 {
856  if ((requestp->ret = close(requestp->fd)) < 0) {
857  debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
858  close(requestp->fd);
859  }
860 
861  requestp->err = errno;
862 }
863 
// squidaio_stat(): queue an asynchronous stat(path). The worker fills a
// pool-allocated scratch struct; squidaio_cleanup_request() copies it
// into 'sb' on success. Returns 0 immediately; 'path' is duplicated.
864 int
865 
866 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
867 {
868  squidaio_init();
869  squidaio_request_t *requestp;
870 
// NOTE(review): original line 871 was dropped by the doc extraction —
// presumably the allocation of requestp from squidaio_request_pool;
// confirm against the pristine aiops_win32.cc.
872 
873  requestp->path = (char *) squidaio_xstrdup(path);
874 
875  requestp->statp = sb;
876 
// scratch buffer lets the worker thread stat() without touching the
// caller's struct until completion is collected on the main thread
877  requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
878 
879  requestp->resultp = resultp;
880 
881  requestp->request_type = _AIO_OP_STAT;
882 
883  requestp->cancelled = 0;
884 
885  resultp->result_type = _AIO_OP_STAT;
886 
887  squidaio_queue_request(requestp);
888 
889  return 0;
890 }
891 
892 static void
894 {
895  requestp->ret = stat(requestp->path, requestp->tmpstatp);
896  requestp->err = errno;
897 }
898 
// squidaio_unlink(): queue an asynchronous unlink(path). Returns 0
// immediately; 'path' is duplicated so the caller keeps ownership.
899 int
900 squidaio_unlink(const char *path, squidaio_result_t * resultp)
901 {
902  squidaio_init();
903  squidaio_request_t *requestp;
904 
// NOTE(review): original line 905 was dropped by the doc extraction —
// presumably the allocation of requestp from squidaio_request_pool;
// confirm against the pristine aiops_win32.cc.
906 
907  requestp->path = squidaio_xstrdup(path);
908 
909  requestp->resultp = resultp;
910 
911  requestp->request_type = _AIO_OP_UNLINK;
912 
913  requestp->cancelled = 0;
914 
915  resultp->result_type = _AIO_OP_UNLINK;
916 
917  squidaio_queue_request(requestp);
918 
919  return 0;
920 }
921 
922 static void
924 {
925  requestp->ret = unlink(requestp->path);
926  requestp->err = errno;
927 }
928 
929 #if AIO_OPENDIR
930 /* XXX squidaio_opendir NOT implemented yet.. */
931 
932 int
933 squidaio_opendir(const char *path, squidaio_result_t * resultp)
934 {
935  squidaio_request_t *requestp;
936  int len;
937 
938  requestp = squidaio_request_pool->alloc();
939 
940  resultp->result_type = _AIO_OP_OPENDIR;
941 
942  return -1;
943 }
944 
945 static void
946 squidaio_do_opendir(squidaio_request_t * requestp)
947 {
948  /* NOT IMPLEMENTED */
949 }
950 
951 #endif
952 
// squidaio_poll_queues(): non-blocking maintenance pass on the main
// thread. First kicks the "overflow" queue (request_queue2) into the
// worker-visible request queue when its mutex can be taken without
// waiting, then drains the done queue into done_requests for collection
// by squidaio_poll_done(), decrementing request_queue_len per entry.
// NOTE(review): the doc extraction dropped original lines 954 (function
// name line), 960-961 (presumably the splice of request_queue2 onto
// request_queue.tailp) and 981 (presumably resetting done_queue.tailp);
// confirm against the pristine aiops_win32.cc.
953 static void
955 {
956  /* kick "overflow" request queue */
957 
958  if (request_queue2.head &&
959  (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
962 
963  if (!SetEvent(request_queue.cond))
964  fatal("couldn't push queue\n");
965 
966  if (!ReleaseMutex(request_queue.mutex)) {
967  /* unexpected error */
968  }
969 
970  Sleep(0);
971  request_queue2.head = nullptr;
972  request_queue2.tailp = &request_queue2.head;
973  }
974 
975  /* poll done queue */
976  if (done_queue.head &&
977  (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
978 
979  struct squidaio_request_t *requests = done_queue.head;
980  done_queue.head = nullptr;
982 
983  if (!ReleaseMutex(done_queue.mutex)) {
984  /* unexpected error */
985  }
986 
987  Sleep(0);
// append the detached chain and account for each completed request
988  *done_requests.tailp = requests;
989  request_queue_len -= 1;
990 
991  while (requests->next) {
992  requests = requests->next;
993  request_queue_len -= 1;
994  }
995 
996  done_requests.tailp = &requests->next;
997  }
998 }
999 
// squidaio_poll_done(): pop the next completed, non-cancelled request and
// return its result pointer; nullptr when nothing is ready. Cancelled
// entries are cleaned up and skipped via the AIO_REPOLL loop.
// NOTE(review): the doc extraction dropped the header lines (the symbol
// index gives: squidaio_result_t * squidaio_poll_done(void)), line 1004
// (presumably the declaration of resultp) and lines 1012-1013 (presumably
// the one-shot squidaio_poll_queues() call guarded by 'polled'); confirm
// against the pristine aiops_win32.cc.
1002 {
1003  squidaio_request_t *request;
1005  int cancelled;
1006  int polled = 0;
1007 
1008 AIO_REPOLL:
1009  request = done_requests.head;
1010 
1011  if (request == NULL && !polled) {
1014  polled = 1;
1015  request = done_requests.head;
1016  }
1017 
1018  if (!request) {
1019  return nullptr;
1020  }
1021 
1022  debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1023  done_requests.head = request->next;
1024 
1025  if (!done_requests.head)
1026  done_requests.tailp = &done_requests.head;
1027 
// capture result and cancellation state before cleanup frees the request
1028  resultp = request->resultp;
1029 
1030  cancelled = request->cancelled;
1031 
1032  squidaio_debug(request);
1033 
1034  debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1035 
1036  squidaio_cleanup_request(request);
1037 
1038  if (cancelled)
1039  goto AIO_REPOLL;
1040 
1041  return resultp;
1042 } /* squidaio_poll_done */
1043 
1044 int
1046 {
1047  return request_queue_len + (done_requests.head ? 1 : 0);
1048 }
1049 
// squidaio_sync(): block until the request queue drains, then report how
// many operations remain collectable.
// NOTE(review): the doc extraction dropped original lines 1051 (the
// function name line, squidaio_sync(void) per the symbol index) and 1056
// (the do-loop body — presumably squidaio_poll_queues()); confirm against
// the pristine aiops_win32.cc.
1050 int
1052 {
1053  /* XXX This might take a while if the queue is large.. */
1054 
1055  do {
1057  } while (request_queue_len > 0);
1058 
1059  return squidaio_operations_pending();
1060 }
1061 
1062 int
1064 {
1065  return request_queue_len;
1066 }
1067 
1068 static void
1070 {
1071  switch (request->request_type) {
1072 
1073  case _AIO_OP_OPEN:
1074  debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1075  break;
1076 
1077  case _AIO_OP_READ:
1078  debugs(43, 5, "READ on fd: " << request->fd);
1079  break;
1080 
1081  case _AIO_OP_WRITE:
1082  debugs(43, 5, "WRITE on fd: " << request->fd);
1083  break;
1084 
1085  case _AIO_OP_CLOSE:
1086  debugs(43, 5, "CLOSE of fd: " << request->fd);
1087  break;
1088 
1089  case _AIO_OP_UNLINK:
1090  debugs(43, 5, "UNLINK of " << request->path);
1091  break;
1092 
1093  default:
1094  break;
1095  }
1096 }
1097 
1098 void
1100 {
1101  squidaio_thread_t *threadp;
1102  int i;
1103 
1104  if (!squidaio_initialised)
1105  return;
1106 
1107  storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1108 
1109  storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1110 
1111  threadp = threads;
1112 
1113  for (i = 0; i < NUMTHREADS; ++i) {
1114  storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1115  threadp = threadp->next;
1116  }
1117 }
1118 
static HANDLE main_thread
Definition: aiops_win32.cc:135
void fatal(const char *message)
Definition: fatal.cc:28
static void squidaio_poll_queues(void)
Definition: aiops_win32.cc:954
static void squidaio_do_close(squidaio_request_t *)
Definition: aiops_win32.cc:854
static void squidaio_do_write(squidaio_request_t *)
Definition: aiops_win32.cc:820
static squidaio_request_queue_t request_queue
Definition: aiops_win32.cc:114
void * xcalloc(size_t n, size_t sz)
Definition: xalloc.cc:71
int squidaio_operations_pending(void)
#define AIO_TINY_BUFS
Definition: aiops_win32.cc:102
#define DBG_CRITICAL
Definition: Stream.h:37
#define xmalloc
static Mem::Allocator * squidaio_large_bufs
Definition: aiops_win32.cc:105
squidaio_result_t * resultp
Definition: aiops.cc:68
struct squidaio_request_t * next
Definition: aiops.cc:51
void squidaio_stats(StoreEntry *sentry)
struct squidaio_request_t squidaio_request_t
unsigned long requests
Definition: aiops.cc:88
@ _AIO_OP_OPENDIR
Definition: DiskThreads.h:52
#define FALSE
Definition: std-includes.h:56
#define AIO_SMALL_BUFS
Definition: aiops_win32.cc:101
static void squidaio_queue_request(squidaio_request_t *)
Definition: aiops_win32.cc:532
#define NUMTHREADS
Definition: DiskThreads.h:30
static void NotifyIOCompleted()
Definition: CommIO.h:36
static int squidaio_initialised
Definition: aiops_win32.cc:97
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
#define AIO_MEDIUM_BUFS
Definition: aiops_win32.cc:100
static void squidaio_debug(squidaio_request_t *)
static void squidaio_cleanup_request(squidaio_request_t *)
Definition: aiops_win32.cc:632
unsigned long requests
Definition: aiops.cc:76
void * alloc()
provide (and reserve) memory suitable for storing one object
Definition: Allocator.h:44
_squidaio_thread_status
Definition: aiops.cc:40
pthread_mutex_t mutex
Definition: aiops.cc:72
void squidaio_shutdown(void)
Definition: aiops_win32.cc:319
static char * squidaio_xstrdup(const char *str)
Definition: aiops_win32.cc:169
@ _AIO_OP_STAT
Definition: DiskThreads.h:53
static Mem::Allocator * squidaio_small_bufs
Definition: aiops_win32.cc:107
int squidaio_cancel(squidaio_result_t *resultp)
Definition: aiops_win32.cc:695
static void squidaio_do_stat(squidaio_request_t *)
Definition: aiops_win32.cc:893
enum _squidaio_request_type result_type
Definition: DiskThreads.h:64
static void ResetNotifications()
Definition: CommIO.cc:69
squidaio_request_t ** tailp
Definition: aiops_win32.cc:117
int squidaio_unlink(const char *path, squidaio_result_t *resultp)
Definition: aiops_win32.cc:900
struct stat * statp
Definition: aiops.cc:67
@ _THREAD_DONE
Definition: aiops_win32.cc:32
int squidaio_close(int fd, squidaio_result_t *resultp)
Definition: aiops_win32.cc:832
#define AIO_LARGE_BUFS
Definition: aiops_win32.cc:99
squidaio_request_type request_type
Definition: aiops.cc:52
pthread_t thread
Definition: aiops.cc:84
static void squidaio_do_open(squidaio_request_t *)
Definition: aiops_win32.cc:739
static void squidaio_xstrfree(char *str)
Definition: aiops_win32.cc:190
int squidaio_stat(const char *path, struct stat *sb, squidaio_result_t *resultp)
Definition: aiops_win32.cc:866
void squidaio_init(void)
Definition: aiops_win32.cc:201
static struct @45 done_requests
@ _AIO_OP_READ
Definition: DiskThreads.h:48
static void NotifyIOClose()
Definition: CommIO.cc:38
int size
Definition: ModDevPoll.cc:69
void freeOne(void *obj)
return memory reserved by alloc()
Definition: Allocator.h:51
#define NULL
Definition: types.h:145
@ _THREAD_WAITING
Definition: aiops_win32.cc:29
static Mem::Allocator * squidaio_medium_bufs
Definition: aiops_win32.cc:106
static int request_queue_len
Definition: aiops_win32.cc:111
int squidaio_sync(void)
#define MAGIC1
Definition: DiskThreads.h:34
static void squidaio_do_read(squidaio_request_t *)
Definition: aiops_win32.cc:776
static Mem::Allocator * squidaio_thread_pool
Definition: aiops_win32.cc:113
int squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:746
static squidaio_request_queue_t done_queue
Definition: aiops_win32.cc:124
#define memPoolCreate
Creates a named MemPool of elements with the given size.
Definition: Pool.h:123
squidaio_thread_status status
Definition: aiops.cc:85
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam)
Definition: aiops_win32.cc:363
@ _THREAD_STARTING
Definition: aiops_win32.cc:28
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops.cc:47
#define assert(EX)
Definition: assert.h:17
@ _AIO_OP_CLOSE
Definition: DiskThreads.h:50
squidaio_request_t * head
Definition: aiops_win32.cc:117
time_t squid_curtime
Definition: stub_libtime.cc:20
@ _THREAD_FAILED
Definition: aiops_win32.cc:31
#define xfree
enum _squidaio_request_type squidaio_request_type
Definition: DiskThreads.h:55
int squidaio_get_queue_len(void)
#define AIO_MICRO_BUFS
Definition: aiops_win32.cc:103
#define TRUE
Definition: std-includes.h:55
static Mem::Allocator * squidaio_request_pool
Definition: aiops_win32.cc:112
squidaio_thread_t * next
Definition: aiops.cc:83
squidaio_request_t *volatile head
Definition: aiops.cc:74
size_t buflen
Definition: aiops.cc:59
int volatile exit
Definition: aiops_win32.cc:78
static Mem::Allocator * squidaio_tiny_bufs
Definition: aiops_win32.cc:108
int squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t *resultp)
Definition: aiops_win32.cc:790
static void Initialize()
Definition: CommIO.cc:19
static struct @44 request_queue2
void squidaio_xfree(void *p, int size)
Definition: aiops_win32.cc:181
squidaio_request_t *volatile *volatile tailp
Definition: aiops.cc:75
unsigned short mode_t
Definition: types.h:129
#define DBG_IMPORTANT
Definition: Stream.h:38
int squidaio_opendir(const char *, squidaio_result_t *)
@ _AIO_OP_OPEN
Definition: DiskThreads.h:47
@ _THREAD_BUSY
Definition: aiops_win32.cc:30
static Mem::Allocator * squidaio_get_pool(int size)
Definition: aiops_win32.cc:138
struct squidaio_request_t * current_req
Definition: aiops.cc:87
pthread_cond_t cond
Definition: aiops.cc:73
static void squidaio_do_unlink(squidaio_request_t *)
Definition: aiops_win32.cc:923
unsigned long blocked
Definition: aiops.cc:77
struct squidaio_request_queue_t squidaio_request_queue_t
#define RIDICULOUS_LENGTH
Definition: aiops_win32.cc:25
char * bufferp
Definition: aiops.cc:58
@ _AIO_OP_WRITE
Definition: DiskThreads.h:49
void * squidaio_xmalloc(int size)
Definition: aiops_win32.cc:157
@ _AIO_OP_UNLINK
Definition: DiskThreads.h:51
@ _AIO_OP_NONE
Definition: DiskThreads.h:46
static squidaio_thread_t * threads
Definition: aiops_win32.cc:96
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
int squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t *resultp)
Definition: aiops_win32.cc:712
enum _squidaio_thread_status squidaio_thread_status
Definition: aiops_win32.cc:34
static Mem::Allocator * squidaio_micro_bufs
Definition: aiops_win32.cc:109
int unsigned int
Definition: stub_fd.cc:19
squidaio_result_t * squidaio_poll_done(void)
struct stat * tmpstatp
Definition: aiops.cc:65

 

Introduction

Documentation

Support

Miscellaneous