helper.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84 Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "base/Raw.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
20 #include "fd.h"
21 #include "fde.h"
22 #include "format/Quoting.h"
23 #include "helper.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
26 #include "MemBuf.h"
27 #include "SquidConfig.h"
28 #include "SquidIpc.h"
29 #include "SquidMath.h"
30 #include "Store.h"
31 #include "wordlist.h"
32 
33 // helper_stateful_server::data uses explicit alloc()/freeOne() */
34 #include "mem/Pool.h"
35 
36 #define HELPER_MAX_ARGS 64
37 
39 #define MAX_RETRIES 2
40 
42 const size_t ReadBufSize(32*1024);
43 
46 static void Enqueue(Helper::Client *, Helper::Xaction *);
51 static void helperKickQueue(const Helper::Client::Pointer &);
54 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
55 
58 
60 
61 void
63 {
64  stats.uses=0;
65  stats.replies=0;
66  stats.pending=0;
67  stats.releases=0;
68  stats.timedout = 0;
69 }
70 
71 void
73 {
74 #if _SQUID_WINDOWS_
75  shutdown(writePipe->fd, SD_BOTH);
76 #endif
77 
78  flags.closing = true;
79  if (readPipe->fd == writePipe->fd)
80  readPipe->fd = -1;
81  else
82  readPipe->close();
83  writePipe->close();
84 
85 #if _SQUID_WINDOWS_
86  if (hIpc) {
87  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
89  debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
90  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
91  }
92  CloseHandle(hIpc);
93  }
94 #endif
95 }
96 
97 void
99 {
100 #if _SQUID_WINDOWS_
101  shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
102 #endif
103 
104  flags.closing = true;
105  if (readPipe->fd == writePipe->fd)
106  readPipe->fd = -1;
107  writePipe->close();
108 
109 #if _SQUID_WINDOWS_
110  if (hIpc) {
111  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
112  getCurrentTime();
113  debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
114  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
115  }
116  CloseHandle(hIpc);
117  }
118 #endif
119 }
120 
121 void
123 {
124  while (!requests.empty()) {
125  // XXX: re-schedule these on another helper?
126  const auto r = requests.front();
127  requests.pop_front();
128  r->reply.result = Helper::Unknown;
129  helper().callBack(*r);
130  delete r;
131  }
132 }
133 
135 {
136  if (rbuf) {
137  memFreeBuf(rbuf_sz, rbuf);
138  rbuf = nullptr;
139  }
140 }
141 
143 {
144  wqueue->clean();
145  delete wqueue;
146 
147  if (writebuf) {
148  writebuf->clean();
149  delete writebuf;
150  writebuf = nullptr;
151  }
152 
153  if (Comm::IsConnOpen(writePipe))
154  closeWritePipeSafely();
155 
156  dlinkDelete(&link, &parent->servers);
157 
158  assert(parent->childs.n_running > 0);
159  -- parent->childs.n_running;
160 
161  assert(requests.empty());
162 }
163 
164 void
166 {
168  requestsIndex.clear();
169 }
170 
172 {
173  /* TODO: walk the local queue of requests and carry them all out */
174  if (Comm::IsConnOpen(writePipe))
175  closeWritePipeSafely();
176 
177  parent->cancelReservation(reservationId);
178 
179  dlinkDelete(&link, &parent->servers);
180 
181  assert(parent->childs.n_running > 0);
182  -- parent->childs.n_running;
183 
184  assert(requests.empty());
185 }
186 
187 void
189 {
190  char *s;
191  char *progname;
192  char *shortname;
193  char *procname;
194  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
195  char fd_note_buf[FD_DESC_SZ];
196  int nargs = 0;
197  int k;
198  pid_t pid;
199  int rfd;
200  int wfd;
201  void * hIpc;
202  wordlist *w;
203  // Helps reducing diff. TODO: remove
204  const auto hlp = this;
205 
206  if (hlp->cmdline == nullptr)
207  return;
208 
209  progname = hlp->cmdline->key;
210 
211  if ((s = strrchr(progname, '/')))
212  shortname = xstrdup(s + 1);
213  else
214  shortname = xstrdup(progname);
215 
216  /* figure out how many new child are actually needed. */
217  int need_new = hlp->childs.needNew();
218 
219  debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
220 
221  if (need_new < 1) {
222  debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
223  }
224 
225  procname = (char *)xmalloc(strlen(shortname) + 3);
226 
227  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
228 
229  args[nargs] = procname;
230  ++nargs;
231 
232  for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
233  args[nargs] = w->key;
234  ++nargs;
235  }
236 
237  args[nargs] = nullptr;
238  ++nargs;
239 
240  assert(nargs <= HELPER_MAX_ARGS);
241 
242  int successfullyStarted = 0;
243 
244  for (k = 0; k < need_new; ++k) {
245  getCurrentTime();
246  rfd = wfd = -1;
247  pid = ipcCreate(hlp->ipc_type,
248  progname,
249  args,
250  shortname,
251  hlp->addr,
252  &rfd,
253  &wfd,
254  &hIpc);
255 
256  if (pid < 0) {
257  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
258  continue;
259  }
260 
261  ++successfullyStarted;
262  ++ hlp->childs.n_running;
263  ++ hlp->childs.n_active;
264  const auto srv = new Helper::Session;
265  srv->hIpc = hIpc;
266  srv->pid = pid;
267  srv->initStats();
268  srv->addr = hlp->addr;
269  srv->readPipe = new Comm::Connection;
270  srv->readPipe->fd = rfd;
271  srv->writePipe = new Comm::Connection;
272  srv->writePipe->fd = wfd;
273  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
274  srv->wqueue = new MemBuf;
275  srv->roffset = 0;
276  srv->nextRequestId = 0;
277  srv->replyXaction = nullptr;
278  srv->ignoreToEom = false;
279  srv->parent = hlp;
280  dlinkAddTail(srv, &srv->link, &hlp->servers);
281 
282  if (rfd == wfd) {
283  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
284  fd_note(rfd, fd_note_buf);
285  } else {
286  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
287  fd_note(rfd, fd_note_buf);
288  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
289  fd_note(wfd, fd_note_buf);
290  }
291 
292  commSetNonBlocking(rfd);
293 
294  if (wfd != rfd)
295  commSetNonBlocking(wfd);
296 
297  AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
298  static_cast<Helper::SessionBase *>(srv)));
299 
300  comm_add_close_handler(rfd, closeCall);
301 
302  if (hlp->timeout && hlp->childs.concurrency) {
303  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
305  commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
306  }
307 
308  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
310  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
311  }
312 
313  // Call handleFewerServers() before hlp->last_restart is updated because
314  // that method uses last_restart to measure the delay since previous start.
315  // TODO: Refactor last_restart code to measure failure frequency rather than
316  // detecting a helper #X failure that is being close to the helper #Y start.
317  if (successfullyStarted < need_new)
318  hlp->handleFewerServers(false);
319 
320  hlp->last_restart = squid_curtime;
321  safe_free(shortname);
322  safe_free(procname);
323  helperKickQueue(hlp);
324 }
325 
326 void
328 {
329  char *shortname;
330  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
331  char fd_note_buf[FD_DESC_SZ];
332  int nargs = 0;
333  // Helps reducing diff. TODO: remove
334  const auto hlp = this;
335 
336  if (hlp->cmdline == nullptr)
337  return;
338 
339  if (hlp->childs.concurrency)
340  debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
341 
342  char *progname = hlp->cmdline->key;
343 
344  char *s;
345  if ((s = strrchr(progname, '/')))
346  shortname = xstrdup(s + 1);
347  else
348  shortname = xstrdup(progname);
349 
350  /* figure out haw mant new helpers are needed. */
351  int need_new = hlp->childs.needNew();
352 
353  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
354 
355  if (need_new < 1) {
356  debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
357  }
358 
359  char *procname = (char *)xmalloc(strlen(shortname) + 3);
360 
361  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
362 
363  args[nargs] = procname;
364  ++nargs;
365 
366  for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
367  args[nargs] = w->key;
368  ++nargs;
369  }
370 
371  args[nargs] = nullptr;
372  ++nargs;
373 
374  assert(nargs <= HELPER_MAX_ARGS);
375 
376  int successfullyStarted = 0;
377 
378  for (int k = 0; k < need_new; ++k) {
379  getCurrentTime();
380  int rfd = -1;
381  int wfd = -1;
382  void * hIpc;
383  pid_t pid = ipcCreate(hlp->ipc_type,
384  progname,
385  args,
386  shortname,
387  hlp->addr,
388  &rfd,
389  &wfd,
390  &hIpc);
391 
392  if (pid < 0) {
393  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
394  continue;
395  }
396 
397  ++successfullyStarted;
398  ++ hlp->childs.n_running;
399  ++ hlp->childs.n_active;
401  srv->hIpc = hIpc;
402  srv->pid = pid;
403  srv->initStats();
404  srv->addr = hlp->addr;
405  srv->readPipe = new Comm::Connection;
406  srv->readPipe->fd = rfd;
407  srv->writePipe = new Comm::Connection;
408  srv->writePipe->fd = wfd;
409  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
410  srv->roffset = 0;
411  srv->parent = hlp;
412  srv->reservationStart = 0;
413 
414  dlinkAddTail(srv, &srv->link, &hlp->servers);
415 
416  if (rfd == wfd) {
417  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
418  fd_note(rfd, fd_note_buf);
419  } else {
420  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
421  fd_note(rfd, fd_note_buf);
422  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
423  fd_note(wfd, fd_note_buf);
424  }
425 
426  commSetNonBlocking(rfd);
427 
428  if (wfd != rfd)
429  commSetNonBlocking(wfd);
430 
431  AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
432  static_cast<Helper::SessionBase *>(srv)));
433 
434  comm_add_close_handler(rfd, closeCall);
435 
436  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
438  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
439  }
440 
441  // Call handleFewerServers() before hlp->last_restart is updated because
442  // that method uses last_restart to measure the delay since previous start.
443  // TODO: Refactor last_restart code to measure failure frequency rather than
444  // detecting a helper #X failure that is being close to the helper #Y start.
445  if (successfullyStarted < need_new)
446  hlp->handleFewerServers(false);
447 
448  hlp->last_restart = squid_curtime;
449  safe_free(shortname);
450  safe_free(procname);
452 }
453 
454 void
456 {
457  if (const auto srv = GetFirstAvailable(this))
458  helperDispatch(srv, r);
459  else
460  Enqueue(this, r);
461 
462  syncQueueStats();
463 }
464 
466 static void
467 SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
468 {
469  auto result = Helper::Error;
470  if (!hlp) {
471  debugs(84, 3, "no helper");
472  result = Helper::Unknown;
473  }
474  // else pretend the helper has responded with ERR
475 
476  callback(data, Helper::Reply(result));
477 }
478 
479 void
480 helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
481 {
482  if (!hlp || !hlp->trySubmit(buf, callback, data))
483  SubmissionFailure(hlp, callback, data);
484 }
485 
487 bool
489  return stats.queue_size >= static_cast<int>(childs.queue_size);
490 }
491 
492 bool
494  return stats.queue_size > static_cast<int>(childs.queue_size);
495 }
496 
498 void
500 {
501  if (overloaded()) {
502  if (overloadStart) {
503  debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
504  } else {
505  overloadStart = squid_curtime;
506  debugs(84, 3, id_name << " became overloaded");
507  }
508  } else {
509  if (overloadStart) {
510  debugs(84, 5, id_name << " is no longer overloaded");
511  if (droppedRequests) {
512  debugs(84, DBG_IMPORTANT, "helper " << id_name <<
513  " is no longer overloaded after dropping " << droppedRequests <<
514  " requests in " << (squid_curtime - overloadStart) << " seconds");
515  droppedRequests = 0;
516  }
517  overloadStart = 0;
518  }
519  }
520 }
521 
525 bool
527 {
528  // re-sync for the configuration may have changed since the last submission
529  syncQueueStats();
530 
531  // Nothing special to do if the new request does not overload (i.e., the
532  // queue is not even full yet) or only _starts_ overloading this helper
533  // (i.e., the queue is currently at its limit).
534  if (!overloaded())
535  return true;
536 
537  if (squid_curtime - overloadStart <= 180)
538  return true; // also OK: overload has not persisted long enough to panic
539 
540  if (childs.onPersistentOverload == ChildConfig::actDie)
541  fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
542 
543  if (!droppedRequests) {
544  debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
545  id_name << " helper configured with on-persistent-overload=err");
546  }
547  ++droppedRequests;
548  debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
549  return false;
550 }
551 
552 bool
553 Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
554 {
555  if (!prepSubmit())
556  return false; // request was dropped
557 
558  submit(buf, callback, data); // will send or queue
559  return true; // request submitted or queued
560 }
561 
563 void
564 Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
565 {
566  const auto r = new Xaction(callback, data, buf);
567  submitRequest(r);
568  debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
569 }
570 
571 void
573 {
574  const auto callback = r.request.callback;
575  Assure(callback);
576 
577  r.request.callback = nullptr;
578  void *cbdata = nullptr;
580  callback(cbdata, r.reply);
581 }
582 
585 void
586 helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
587 {
588  if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
589  SubmissionFailure(hlp, callback, data);
590 }
591 
593 bool
594 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
595 {
596  if (!prepSubmit())
597  return false; // request was dropped
598 
599  submit(buf, callback, data, reservation); // will send or queue
600  return true; // request submitted or queued
601 }
602 
603 void
605 {
606  // clear any old reservation
607  if (srv->reserved()) {
608  reservations.erase(srv->reservationId);
609  srv->clearReservation();
610  }
611 
612  srv->reserve();
613  reservations.insert(Reservations::value_type(srv->reservationId, srv));
614 }
615 
616 void
618 {
619  const auto it = reservations.find(reservation);
620  if (it == reservations.end())
621  return;
622 
623  helper_stateful_server *srv = it->second;
624  reservations.erase(it);
625  srv->clearReservation();
626 
627  // schedule a queue kick
628  AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
629  ScheduleCallHere(call);
630 }
631 
634 {
635  const auto it = reservations.find(reservation);
636  if (it == reservations.end())
637  return nullptr;
638  return it->second;
639 }
640 
641 void
643 {
644  assert(!reservationId);
645  reservationStart = squid_curtime;
646  reservationId = Helper::ReservationId::Next();
647  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
648 }
649 
650 void
652 {
653  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
654  if (!reservationId)
655  return;
656 
657  ++stats.releases;
658 
659  reservationId.clear();
660  reservationStart = 0;
661 }
662 
663 void
664 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
665 {
666  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
667 
668  if (buf && reservation) {
669  debugs(84, 5, reservation);
670  helper_stateful_server *lastServer = findServer(reservation);
671  if (!lastServer) {
672  debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
674  callBack(*r);
675  delete r;
676  return;
677  }
678  debugs(84, 5, "StatefulSubmit dispatching");
679  helperStatefulDispatch(lastServer, r);
680  } else {
682  if ((srv = StatefulGetFirstAvailable(this))) {
683  reserveServer(srv);
684  helperStatefulDispatch(srv, r); // may delete r
685  } else
686  StatefulEnqueue(this, r);
687  }
688 
689  // r may be dangling here
690  syncQueueStats();
691 }
692 
693 void
694 Helper::Client::packStatsInto(Packable * const p, const char * const label) const
695 {
696  if (label)
697  p->appendf("%s:\n", label);
698 
699  p->appendf(" program: %s\n", cmdline->key);
700  p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
701  p->appendf(" requests sent: %d\n", stats.requests);
702  p->appendf(" replies received: %d\n", stats.replies);
703  p->appendf(" requests timedout: %d\n", stats.timedout);
704  p->appendf(" queue length: %d\n", stats.queue_size);
705  p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
706  p->append("\n",1);
707  p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
708  "ID #",
709  "FD",
710  "PID",
711  "# Requests",
712  "# Replies",
713  "# Timed-out",
714  "Flags",
715  "Time",
716  "Offset",
717  "Request");
718 
719  for (dlink_node *link = servers.head; link; link = link->next) {
720  const auto srv = static_cast<SessionBase *>(link->data);
721  assert(srv);
722  const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
723  double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
724  p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
725  srv->index.value,
726  srv->readPipe->fd,
727  srv->pid,
728  srv->stats.uses,
729  srv->stats.replies,
730  srv->stats.timedout,
731  srv->stats.pending ? 'B' : ' ',
732  srv->flags.writing ? 'W' : ' ',
733  srv->flags.closing ? 'C' : ' ',
734  srv->reserved() ? 'R' : ' ',
735  srv->flags.shutdown ? 'S' : ' ',
736  xaction && xaction->request.placeholder ? 'P' : ' ',
737  tt < 0.0 ? 0.0 : tt,
738  (int) srv->roffset,
739  xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
740  }
741 
742  p->append("\nFlags key:\n"
743  " B\tBUSY\n"
744  " W\tWRITING\n"
745  " C\tCLOSING\n"
746  " R\tRESERVED\n"
747  " S\tSHUTDOWN PENDING\n"
748  " P\tPLACEHOLDER\n", 101);
749 }
750 
751 bool
753  return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
754 }
755 
757 Helper::Client::Make(const char * const name)
758 {
759  return new Client(name);
760 }
761 
764 {
765  return new statefulhelper(name);
766 }
767 
768 void
770 {
771  dlink_node *link = hlp->servers.head;
772 
773  while (link) {
774  const auto srv = static_cast<Helper::Session *>(link->data);
775  link = link->next;
776 
777  if (srv->flags.shutdown) {
778  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
779  continue;
780  }
781 
782  assert(hlp->childs.n_active > 0);
783  -- hlp->childs.n_active;
784  srv->flags.shutdown = true; /* request it to shut itself down */
785 
786  if (srv->flags.closing) {
787  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
788  continue;
789  }
790 
791  if (srv->stats.pending) {
792  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
793  continue;
794  }
795 
796  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
797  /* the rest of the details is dealt with in the helperServerFree
798  * close handler
799  */
800  srv->closePipesSafely();
801  }
802 
803  Assure(!hlp->childs.n_active);
804  hlp->dropQueued();
805 }
806 
807 void
809 {
810  dlink_node *link = hlp->servers.head;
812 
813  while (link) {
814  srv = (helper_stateful_server *)link->data;
815  link = link->next;
816 
817  if (srv->flags.shutdown) {
818  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
819  continue;
820  }
821 
822  assert(hlp->childs.n_active > 0);
823  -- hlp->childs.n_active;
824  srv->flags.shutdown = true; /* request it to shut itself down */
825 
826  if (srv->stats.pending) {
827  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
828  continue;
829  }
830 
831  if (srv->flags.closing) {
832  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
833  continue;
834  }
835 
836  if (srv->reserved()) {
837  if (shutting_down) {
838  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
839  } else {
840  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
841  continue;
842  }
843  }
844 
845  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
846 
847  /* the rest of the details is dealt with in the helperStatefulServerFree
848  * close handler
849  */
850  srv->closePipesSafely();
851  }
852 }
853 
855 {
856  /* note, don't free id_name, it probably points to static memory */
857 
858  // A non-empty queue would leak Helper::Xaction objects, stalling any
859  // pending (and even future collapsed) transactions. To avoid stalling
860  // transactions, we must dropQueued(). We ought to do that when we
861  // discover that no progress is possible rather than here because
862  // reference counting may keep this object alive for a long time.
863  assert(queue.empty());
864 }
865 
866 void
868 {
869  if (!srv->flags.shutdown) {
870  assert(childs.n_active > 0);
871  --childs.n_active;
872  debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
873 
874  handleFewerServers(srv->stats.replies >= 1);
875 
876  if (childs.needNew() > 0) {
877  srv->flags.shutdown = true;
878  openSessions();
879  }
880  }
881 
882  if (!childs.n_active)
883  dropQueued();
884 }
885 
886 void
888 {
889  if (queue.empty())
890  return;
891 
892  Assure(!childs.n_active);
893  Assure(!GetFirstAvailable(this));
894 
895  // no helper servers means nobody can advance our queued transactions
896 
897  debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
898  id_name << " helper requests due to lack of helper processes");
899  // similar to SessionBase::dropQueued()
900  while (const auto r = nextRequest()) {
901  r->reply.result = Helper::Unknown;
902  callBack(*r);
903  delete r;
904  }
905 }
906 
907 void
908 Helper::Client::handleFewerServers(const bool madeProgress)
909 {
910  const auto needNew = childs.needNew();
911 
912  if (!needNew)
913  return; // some server(s) have died, but we still have enough
914 
915  debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
916  Debug::Extra << "active processes: " << childs.n_active <<
917  Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
918 
919  if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
920  if (madeProgress)
921  debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
922  else
923  fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
924  }
925 }
926 
927 void
929 {
930  srv->helper().handleKilledServer(srv);
931  srv->dropQueued();
932  delete srv;
933 }
934 
936 Helper::Session::popRequest(const int request_number)
937 {
938  Xaction *r = nullptr;
939  if (parent->childs.concurrency) {
940  // If concurrency supported retrieve request from ID
941  const auto it = requestsIndex.find(request_number);
942  if (it != requestsIndex.end()) {
943  r = *(it->second);
944  requests.erase(it->second);
945  requestsIndex.erase(it);
946  }
947  } else if(!requests.empty()) {
948  // Else get the first request from queue, if any
949  r = requests.front();
950  requests.pop_front();
951  }
952 
953  return r;
954 }
955 
957 static void
958 helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
959 {
960  if (Helper::Xaction *r = srv->replyXaction) {
961  const bool hasSpace = r->reply.accumulate(msg, msgSize);
962  if (!hasSpace) {
963  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
964  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
965  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
966  srv->closePipesSafely();
967  return;
968  }
969 
970  if (!msgEnd)
971  return; // We are waiting for more data.
972 
973  bool retry = false;
974  if (cbdataReferenceValid(r->request.data)) {
975  r->reply.finalize();
976  if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
977  debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
978  retry = true;
979  } else {
980  hlp->callBack(*r);
981  }
982  }
983 
984  -- srv->stats.pending;
985  ++ srv->stats.replies;
986 
987  ++ hlp->stats.replies;
988 
989  srv->answer_time = current_time;
990 
991  srv->dispatch_time = r->request.dispatch_time;
992 
993  hlp->stats.avg_svc_time =
995  tvSubMsec(r->request.dispatch_time, current_time),
997 
998  // release or re-submit parsedRequestXaction object
999  srv->replyXaction = nullptr;
1000  if (retry) {
1001  ++r->request.retries;
1002  hlp->submitRequest(r);
1003  } else
1004  delete r;
1005  }
1006 
1007  if (hlp->timeout && hlp->childs.concurrency)
1009 
1010  if (!srv->flags.shutdown) {
1011  helperKickQueue(hlp);
1012  } else if (!srv->flags.closing && !srv->stats.pending) {
1013  srv->closeWritePipeSafely();
1014  }
1015 }
1016 
1017 static void
1018 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1019 {
1020  const auto srv = static_cast<Helper::Session *>(data);
1021  const auto hlp = srv->parent;
1023 
1024  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1025 
1026  if (flag == Comm::ERR_CLOSING) {
1027  return;
1028  }
1029 
1030  assert(conn->fd == srv->readPipe->fd);
1031 
1032  debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
1033 
1034  if (flag != Comm::OK || len == 0) {
1035  srv->closePipesSafely();
1036  return;
1037  }
1038 
1039  srv->roffset += len;
1040  srv->rbuf[srv->roffset] = '\0';
1041  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1042 
1043  if (!srv->stats.pending && !srv->stats.timedout) {
1044  /* someone spoke without being spoken to */
1045  debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1046  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1047  " bytes '" << srv->rbuf << "'");
1048 
1049  srv->roffset = 0;
1050  srv->rbuf[0] = '\0';
1051  srv->closePipesSafely();
1052  return;
1053  }
1054 
1055  bool needsMore = false;
1056  char *msg = srv->rbuf;
1057  while (*msg && !needsMore) {
1058  int skip = 0;
1059  char *eom = strchr(msg, hlp->eom);
1060  if (eom) {
1061  skip = 1;
1062  debugs(84, 3, "helperHandleRead: end of reply found");
1063  if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1064  *eom = '\0';
1065  // rewind to the \r octet which is the real terminal now
1066  // and remember that we have to skip forward 2 places now.
1067  skip = 2;
1068  --eom;
1069  }
1070  *eom = '\0';
1071  }
1072 
1073  if (!srv->ignoreToEom && !srv->replyXaction) {
1074  int i = 0;
1075  if (hlp->childs.concurrency) {
1076  char *e = nullptr;
1077  i = strtol(msg, &e, 10);
1078  // Do we need to check for e == msg? Means wrong response from helper.
1079  // Will be dropped as "unexpected reply on channel 0"
1080  needsMore = !(xisspace(*e) || (eom && e == eom));
1081  if (!needsMore) {
1082  msg = e;
1083  while (*msg && xisspace(*msg))
1084  ++msg;
1085  } // else not enough data to compute request number
1086  }
1087  if (!(srv->replyXaction = srv->popRequest(i))) {
1088  if (srv->stats.timedout) {
1089  debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1090  } else {
1091  debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1092  i << " from " << hlp->id_name << " #" << srv->index <<
1093  " '" << srv->rbuf << "'");
1094  }
1095  srv->ignoreToEom = true;
1096  }
1097  } // else we need to just append reply data to the current Xaction
1098 
1099  if (!needsMore) {
1100  size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1101  assert(msgSize <= srv->rbuf_sz);
1102  helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1103  msg += msgSize + skip;
1104  assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1105 
1106  // The next message should not ignored.
1107  if (eom && srv->ignoreToEom)
1108  srv->ignoreToEom = false;
1109  } else
1110  assert(skip == 0 && eom == nullptr);
1111  }
1112 
1113  if (needsMore) {
1114  size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1115  assert(msgSize <= srv->rbuf_sz);
1116  memmove(srv->rbuf, msg, msgSize);
1117  srv->roffset = msgSize;
1118  srv->rbuf[srv->roffset] = '\0';
1119  } else {
1120  // All of the responses parsed and msg points at the end of read data
1121  assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1122  srv->roffset = 0;
1123  }
1124 
1125  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1126  int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1127  assert(spaceSize >= 0);
1128 
1129  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1131  comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1132  }
1133 }
1134 
1135 static void
1136 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1137 {
1138  char *t = nullptr;
1140  const auto hlp = srv->parent;
1142 
1143  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1144 
1145  if (flag == Comm::ERR_CLOSING) {
1146  return;
1147  }
1148 
1149  assert(conn->fd == srv->readPipe->fd);
1150 
1151  debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1152  hlp->id_name << " #" << srv->index);
1153 
1154  if (flag != Comm::OK || len == 0) {
1155  srv->closePipesSafely();
1156  return;
1157  }
1158 
1159  srv->roffset += len;
1160  srv->rbuf[srv->roffset] = '\0';
1161  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1162 
1163  if (srv->requests.empty()) {
1164  /* someone spoke without being spoken to */
1165  debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1166  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1167  " bytes '" << srv->rbuf << "'");
1168 
1169  srv->roffset = 0;
1170  srv->closePipesSafely();
1171  return;
1172  }
1173 
1174  if ((t = strchr(srv->rbuf, hlp->eom))) {
1175  debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1176 
1177  if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1178  *t = '\0';
1179  // rewind to the \r octet which is the real terminal now
1180  --t;
1181  }
1182 
1183  *t = '\0';
1184  }
1185 
1186  const auto r = srv->requests.front();
1187 
1188  if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1189  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1190  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1191  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1192  srv->closePipesSafely();
1193  return;
1194  }
1201  srv->roffset = 0;
1202 
1203  if (t) {
1204  /* end of reply found */
1205  srv->requests.pop_front(); // we already have it in 'r'
1206  int called = 1;
1207 
1208  if (cbdataReferenceValid(r->request.data)) {
1209  r->reply.finalize();
1210  r->reply.reservationId = srv->reservationId;
1211  hlp->callBack(*r);
1212  } else {
1213  debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1214  called = 0;
1215  }
1216 
1217  delete r;
1218 
1219  -- srv->stats.pending;
1220  ++ srv->stats.replies;
1221 
1222  ++ hlp->stats.replies;
1223  srv->answer_time = current_time;
1224  hlp->stats.avg_svc_time =
1225  Math::intAverage(hlp->stats.avg_svc_time,
1227  hlp->stats.replies, REDIRECT_AV_FACTOR);
1228 
1229  if (called)
1231  else
1232  hlp->cancelReservation(srv->reservationId);
1233  }
1234 
1235  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1236  int spaceSize = srv->rbuf_sz - 1;
1237 
1238  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1240  comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1241  }
1242 }
1243 
1245 static void
1246 Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
1247 {
1248  hlp->queue.push(r);
1249  ++ hlp->stats.queue_size;
1250 
1251  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1252  if (hlp->childs.needNew() > 0) {
1253  hlp->openSessions();
1254  return;
1255  }
1256 
1257  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1258  return;
1259 
1260  if (squid_curtime - hlp->last_queue_warn < 600)
1261  return;
1262 
1264  return;
1265 
1267 
1268  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1269  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1270  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1271 }
1272 
1273 static void
1275 {
1276  hlp->queue.push(r);
1277  ++ hlp->stats.queue_size;
1278 
1279  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1280  if (hlp->childs.needNew() > 0) {
1281  hlp->openSessions();
1282  return;
1283  }
1284 
1285  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1286  return;
1287 
1288  if (squid_curtime - hlp->last_queue_warn < 600)
1289  return;
1290 
1292  return;
1293 
1295 
1296  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1297  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1298  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1299 }
1300 
1303 {
1304  if (queue.empty())
1305  return nullptr;
1306 
1307  auto *r = queue.front();
1308  queue.pop();
1309  --stats.queue_size;
1310  return r;
1311 }
1312 
1313 static Helper::Session *
1315 {
1316  dlink_node *n;
1317  Helper::Session *selected = nullptr;
1318  debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1319 
1320  if (hlp->childs.n_running == 0)
1321  return nullptr;
1322 
1323  /* Find "least" loaded helper (approx) */
1324  for (n = hlp->servers.head; n != nullptr; n = n->next) {
1325  const auto srv = static_cast<Helper::Session *>(n->data);
1326 
1327  if (selected && selected->stats.pending <= srv->stats.pending)
1328  continue;
1329 
1330  if (srv->flags.shutdown)
1331  continue;
1332 
1333  if (!srv->stats.pending)
1334  return srv;
1335 
1336  if (selected) {
1337  selected = srv;
1338  break;
1339  }
1340 
1341  selected = srv;
1342  }
1343 
1344  if (!selected) {
1345  debugs(84, 5, "GetFirstAvailable: None available.");
1346  return nullptr;
1347  }
1348 
1349  if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1350  debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1351  return nullptr;
1352  }
1353 
1354  debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1355  return selected;
1356 }
1357 
1358 static helper_stateful_server *
1360 {
1361  dlink_node *n;
1362  helper_stateful_server *srv = nullptr;
1363  helper_stateful_server *oldestReservedServer = nullptr;
1364  debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1365 
1366  if (hlp->childs.n_running == 0)
1367  return nullptr;
1368 
1369  for (n = hlp->servers.head; n != nullptr; n = n->next) {
1370  srv = (helper_stateful_server *)n->data;
1371 
1372  if (srv->stats.pending)
1373  continue;
1374 
1375  if (srv->reserved()) {
1377  if (!oldestReservedServer)
1378  oldestReservedServer = srv;
1379  else if (oldestReservedServer->reservationStart < srv->reservationStart)
1380  oldestReservedServer = srv;
1381  debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1382  }
1383  continue;
1384  }
1385 
1386  if (srv->flags.shutdown)
1387  continue;
1388 
1389  debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1390  return srv;
1391  }
1392 
1393  if (oldestReservedServer) {
1394  debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1395  return oldestReservedServer;
1396  }
1397 
1398  debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1399  return nullptr;
1400 }
1401 
1402 static void
1403 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1404 {
1405  const auto srv = static_cast<Helper::Session *>(data);
1406 
1407  srv->writebuf->clean();
1408  delete srv->writebuf;
1409  srv->writebuf = nullptr;
1410  srv->flags.writing = false;
1411 
1412  if (flag != Comm::OK) {
1413  /* Helper server has crashed */
1414  debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1415  return;
1416  }
1417 
1418  if (!srv->wqueue->isNull()) {
1419  srv->writebuf = srv->wqueue;
1420  srv->wqueue = new MemBuf;
1421  srv->flags.writing = true;
1422  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1424  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1425  }
1426 }
1427 
1428 static void
1430 {
1431  const auto hlp = srv->parent;
1432  const uint64_t reqId = ++srv->nextRequestId;
1433 
1434  if (!cbdataReferenceValid(r->request.data)) {
1435  debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1436  delete r;
1437  return;
1438  }
1439 
1440  r->request.Id = reqId;
1441  const auto it = srv->requests.insert(srv->requests.end(), r);
1443 
1444  if (srv->wqueue->isNull())
1445  srv->wqueue->init();
1446 
1447  if (hlp->childs.concurrency) {
1448  srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
1449  assert(srv->requestsIndex.size() == srv->requests.size());
1450  srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1451  } else
1452  srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1453 
1454  if (!srv->flags.writing) {
1455  assert(nullptr == srv->writebuf);
1456  srv->writebuf = srv->wqueue;
1457  srv->wqueue = new MemBuf;
1458  srv->flags.writing = true;
1459  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1461  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1462  }
1463 
1464  debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1465 
1466  ++ srv->stats.uses;
1467  ++ srv->stats.pending;
1468  ++ hlp->stats.requests;
1469 }
1470 
1471 static void
1473 {}
1474 
1475 static void
1477 {
1478  const auto hlp = srv->parent;
1479 
1480  if (!cbdataReferenceValid(r->request.data)) {
1481  debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1482  delete r;
1483  hlp->cancelReservation(srv->reservationId);
1484  return;
1485  }
1486 
1487  debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1488 
1489  assert(srv->reservationId);
1490  r->reply.reservationId = srv->reservationId;
1491 
1492  if (r->request.placeholder == 1) {
1493  /* a callback is needed before this request can _use_ a helper. */
1494  /* we don't care about releasing this helper. The request NEVER
1495  * gets to the helper. So we throw away the return code */
1497  hlp->callBack(*r);
1498  /* throw away the placeholder */
1499  delete r;
1500  /* and push the queue. Note that the callback may have submitted a new
1501  * request to the helper which is why we test for the request */
1502 
1503  if (!srv->requests.size())
1505 
1506  return;
1507  }
1508 
1509  srv->requests.push_back(r);
1510  srv->dispatch_time = current_time;
1511  AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1513  Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1514  debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1515  hlp->id_name << " #" << srv->index << ", " <<
1516  (int) strlen(r->request.buf) << " bytes");
1517 
1518  ++ srv->stats.uses;
1519  ++ srv->stats.pending;
1520  ++ hlp->stats.requests;
1521 }
1522 
1523 static void
1525 {
1526  Helper::Xaction *r = nullptr;
1527  Helper::Session *srv = nullptr;
1528 
1529  while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1530  helperDispatch(srv, r);
1531 
1532  if (!hlp->childs.n_active)
1533  hlp->dropQueued();
1534 }
1535 
1536 static void
1538 {
1539  Helper::Xaction *r;
1541  while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1542  debugs(84, 5, "found srv-" << srv->index);
1543  hlp->reserveServer(srv);
1544  helperStatefulDispatch(srv, r);
1545  }
1546 
1547  if (!hlp->childs.n_active)
1548  hlp->dropQueued();
1549 }
1550 
1551 static void
1553 {
1554  if (!srv->flags.shutdown) {
1556  } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1557  srv->closeWritePipeSafely();
1558  return;
1559  }
1560 }
1561 
1562 void
1564 {
1565  assert(parent->childs.concurrency);
1566  while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1567  const auto r = requests.front();
1568  RequestIndex::iterator it;
1569  it = requestsIndex.find(r->request.Id);
1570  assert(it != requestsIndex.end());
1571  requestsIndex.erase(it);
1572  requests.pop_front();
1573  debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1574  bool retried = false;
1575  if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1576  debugs(84, 2, "Retry request " << r->request.Id);
1577  ++r->request.retries;
1578  parent->submitRequest(r);
1579  retried = true;
1580  } else if (cbdataReferenceValid(r->request.data)) {
1581  if (!parent->onTimedOutResponse.isEmpty()) {
1582  if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1583  r->reply.finalize();
1584  else
1585  r->reply.result = Helper::TimedOut;
1586  parent->callBack(*r);
1587  } else {
1588  r->reply.result = Helper::TimedOut;
1589  parent->callBack(*r);
1590  }
1591  }
1592  --stats.pending;
1593  ++stats.timedout;
1594  ++parent->stats.timedout;
1595  if (!retried)
1596  delete r;
1597  }
1598 }
1599 
1600 void
1602 {
1603  debugs(26, 3, io.conn);
1604  const auto srv = static_cast<Session *>(io.data);
1605 
1606  srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1607 
1608  debugs(84, 3, io.conn << " establish a new timeout");
1609  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1611 
1612  const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1613  const time_t minimumNewTimeout = 1; // second
1614  const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
1615 
1616  commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1617 }
1618 
static void helperReturnBuffer(Helper::Session *srv, const Helper::Client::Pointer &hlp, char *const msg, const size_t msgSize, const char *const msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:958
#define FD_DESC_SZ
Definition: defines.h:32
static IOCB helperStatefulHandleRead
Definition: helper.cc:45
uint64_t replies
Definition: helper.h:252
time_t reservationStart
when the last reservation was made
Definition: helper.h:333
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:952
Xaction * nextRequest()
Definition: helper.cc:1302
void openSessions() override
Definition: helper.cc:327
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
#define DBG_CRITICAL
Definition: Stream.h:37
static Helper::Session * GetFirstAvailable(const Helper::Client::Pointer &)
Definition: helper.cc:1314
unsigned int queue_size
Definition: ChildConfig.h:91
static void helperKickQueue(const Helper::Client::Pointer &)
Definition: helper.cc:1524
#define xmalloc
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1476
struct timeval dispatch_time
Definition: helper.h:236
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:633
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:239
uint64_t nextRequestId
Definition: helper.h:267
@ Error
Definition: ResultCode.h:19
void fd_note(int fd, const char *s)
Definition: fd.cc:216
@ actDie
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
Definition: cbdata.h:333
uint64_t uses
Definition: helper.h:251
static IOCB helperHandleRead
Definition: helper.cc:44
@ Unknown
Definition: ResultCode.h:17
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:39
static Pointer Make(const char *name)
Definition: helper.cc:763
~SessionBase() override
Definition: helper.cc:134
Xaction * replyXaction
Definition: helper.h:278
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
virtual Client & helper() const =0
our creator (parent) object
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
void handleFewerServers(bool madeProgress)
Definition: helper.cc:908
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1472
struct Helper::SessionBase::_helper_flags flags
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
void submitRequest(Xaction *)
Definition: helper.cc:455
uint64_t Id
Definition: Request.h:47
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
InstanceIdDefinitions(Helper::SessionBase, "Hlpr")
#define xstrdup
Comm::ConnectionPointer writePipe
Definition: helper.h:229
~helper_stateful_server() override
Definition: helper.cc:171
char * buf
Definition: Request.h:41
void packStatsInto(Packable *p, const char *label=nullptr) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:694
const A & max(A const &lhs, A const &rhs)
#define PRIu64
Definition: types.h:114
int commSetNonBlocking(int fd)
Definition: comm.cc:1054
const size_t ReadBufSize(32 *1024)
Helpers input buffer size.
Client::Pointer parent
Definition: helper.h:272
Definition: cbdata.cc:37
time_t last_queue_warn
Definition: helper.h:124
Helper::Reply reply
Definition: helper.h:46
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:553
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
@ OK
Definition: Flag.h:16
void callBack(Xaction &)
sends transaction response to the transaction initiator
Definition: helper.cc:572
dlink_node link
Definition: helper.h:239
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1601
@ ERR_CLOSING
Definition: Flag.h:24
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
static Pointer Make(const char *name)
Definition: helper.cc:757
unsigned int n_running
Definition: ChildConfig.h:80
HLPCB * callback
Definition: Request.h:42
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:664
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
void helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:586
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:617
#define DBG_DATA
Definition: Stream.h:40
static void * hIpc
Definition: IcmpSquid.cc:33
static pid_t pid
Definition: IcmpSquid.cc:34
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1403
void helperShutdown(const Helper::Client::Pointer &hlp)
Definition: helper.cc:769
Definition: Raw.h:20
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:604
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
void closeWritePipeSafely()
Definition: helper.cc:98
#define HELPER_MAX_ARGS
Definition: helper.cc:36
Comm::ConnectionPointer readPipe
Definition: helper.h:228
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
int isNull() const
Definition: MemBuf.cc:145
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:18
void helperStatefulShutdown(const statefulhelper::Pointer &hlp)
Definition: helper.cc:808
MemBuf * writebuf
Definition: helper.h:270
MemBuf * wqueue
Definition: helper.h:269
void append(const char *c, int sz) override
Definition: MemBuf.cc:209
int needNew() const
Definition: ChildConfig.cc:60
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1563
int placeholder
Definition: Request.h:45
void handleKilledServer(SessionBase *)
Definition: helper.cc:867
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:323
struct timeval answer_time
Definition: helper.h:237
struct timeval dispatch_time
Definition: Request.h:46
static ReservationId Next()
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:59
virtual ~Client()
Definition: helper.cc:854
static void Enqueue(Helper::Client *, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1246
Definition: MemBuf.h:23
const char *const name
waiting event name, for debugging
static void HelperServerClosed(SessionBase *)
close handler to handle exited server processes
Definition: helper.cc:928
Requests requests
requests in order of submission/expiration
Definition: helper.h:248
struct Helper::Client::_stats stats
bool overloaded() const
Definition: helper.cc:493
void clean()
Definition: MemBuf.cc:110
struct Helper::SessionBase::@60 stats
bool willOverload() const
Definition: helper.cc:752
ChildConfig childs
Configuration settings for number running.
Definition: helper.h:119
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:34
int reconfiguring
const InstanceId< SessionBase > index
Definition: helper.h:224
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
Ip::Address addr
Definition: helper.h:227
#define safe_free(x)
Definition: xalloc.h:73
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:564
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:312
#define assert(EX)
Definition: assert.h:17
static void helperDispatch(Helper::Session *, Helper::Xaction *)
Definition: helper.cc:1429
SSL Connection
Definition: Session.h:49
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:17
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
unsigned int n_max
Definition: ChildConfig.h:48
helper protocol primitives
Definition: ChildConfig.h:12
std::queue< Xaction * > queue
Definition: helper.h:117
void helperSubmit(const Helper::Client::Pointer &hlp, const char *const buf, HLPCB *const callback, void *const data)
Definition: helper.cc:480
void * data
Definition: Request.h:43
#define Assure(condition)
Definition: Assure.h:35
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:70
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:325
const char * id_name
Definition: helper.h:118
time_t squid_curtime
Definition: stub_libtime.cc:20
static std::ostream & Extra(std::ostream &)
Definition: debug.cc:1316
Xaction * popRequest(int requestId)
Definition: helper.cc:936
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
void initStats()
Definition: helper.cc:62
wordlist * next
Definition: wordlist.h:60
bool retryTimedOut
Whether the timed-out requests must retried.
Definition: helper.h:127
static void helperStatefulKickQueue(const statefulhelper::Pointer &)
Definition: helper.cc:1537
Flag
Definition: Flag.h:15
#define REDIRECT_AV_FACTOR
Definition: defines.h:51
bool prepSubmit()
Definition: helper.cc:526
#define fd_table
Definition: fde.h:189
time_t timeout
Requests timeout.
Definition: helper.h:126
virtual void dropQueued()
dequeues and sends an Unknown answer to all queued requests
Definition: helper.cc:122
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1552
void commSetConnTimeout(const Comm::ConnectionPointer &conn, time_t timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:592
char * key
Definition: wordlist.h:59
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
Definition: helper.cc:594
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
static helper_stateful_server * StatefulGetFirstAvailable(const statefulhelper::Pointer &)
Definition: helper.cc:1359
char * content()
start of the added data
Definition: MemBuf.h:41
statefulhelper::Pointer parent
Definition: helper.h:326
void closePipesSafely()
Definition: helper.cc:72
void dropQueued()
Definition: helper.cc:887
~Session() override
Definition: helper.cc:142
#define Important(id)
Definition: Messages.h:93
#define DBG_IMPORTANT
Definition: Stream.h:38
uint64_t pending
Definition: helper.h:253
static void SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:467
void dropQueued() override
dequeues and sends an Unknown answer to all queued requests
Definition: helper.cc:165
@ BrokenHelper
Definition: ResultCode.h:20
Helper::Request request
Definition: helper.h:45
int shutting_down
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:33
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1274
SSL_SESSION Session
Definition: Session.h:51
virtual void openSessions()
Definition: helper.cc:188
#define xisspace(x)
Definition: xis.h:15
unsigned int concurrency
Definition: ChildConfig.h:72
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:285
char progname[]
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:499
unsigned int n_active
Definition: ChildConfig.h:86
dlink_list servers
Definition: helper.h:116
represents a single helper process
Definition: helper.h:192
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:332
@ TimedOut
Definition: ResultCode.h:21
int unsigned int
Definition: stub_fd.cc:19
void memFreeBuf(size_t size, void *)
Definition: minimal.cc:67
Holds the required data to serve a helper request.
Definition: helper.h:41
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:488

 

Introduction

Documentation

Support

Miscellaneous