helper.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 84 Helper process maintenance */
10 
11 #include "squid.h"
12 #include "base/AsyncCbdataCalls.h"
13 #include "base/Packable.h"
14 #include "base/Raw.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Read.h"
18 #include "comm/Write.h"
19 #include "debug/Messages.h"
20 #include "fd.h"
21 #include "fde.h"
22 #include "format/Quoting.h"
23 #include "helper.h"
24 #include "helper/Reply.h"
25 #include "helper/Request.h"
26 #include "MemBuf.h"
27 #include "SquidConfig.h"
28 #include "SquidIpc.h"
29 #include "SquidMath.h"
30 #include "Store.h"
31 #include "wordlist.h"
32 
33 // helper_stateful_server::data uses explicit alloc()/freeOne() */
34 #include "mem/Pool.h"
35 
36 #define HELPER_MAX_ARGS 64
37 
39 #define MAX_RETRIES 2
40 
43 const size_t ReadBufSize(128*1024);
44 
47 static void Enqueue(Helper::Client *, Helper::Xaction *);
52 static void helperKickQueue(const Helper::Client::Pointer &);
55 static void StatefulEnqueue(statefulhelper * hlp, Helper::Xaction * r);
56 
59 
61 
62 void
64 {
65  stats.uses=0;
66  stats.replies=0;
67  stats.pending=0;
68  stats.releases=0;
69  stats.timedout = 0;
70 }
71 
72 void
74 {
75 #if _SQUID_WINDOWS_
76  shutdown(writePipe->fd, SD_BOTH);
77 #endif
78 
79  flags.closing = true;
80  if (readPipe->fd == writePipe->fd)
81  readPipe->fd = -1;
82  else
83  readPipe->close();
84  writePipe->close();
85 
86 #if _SQUID_WINDOWS_
87  if (hIpc) {
88  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
90  debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
91  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
92  }
93  CloseHandle(hIpc);
94  }
95 #endif
96 }
97 
98 void
100 {
101 #if _SQUID_WINDOWS_
102  shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
103 #endif
104 
105  flags.closing = true;
106  if (readPipe->fd == writePipe->fd)
107  readPipe->fd = -1;
108  writePipe->close();
109 
110 #if _SQUID_WINDOWS_
111  if (hIpc) {
112  if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
113  getCurrentTime();
114  debugs(84, DBG_IMPORTANT, "WARNING: " << helper().id_name <<
115  " #" << index << " (PID " << (long int)pid << ") didn't exit in 5 seconds");
116  }
117  CloseHandle(hIpc);
118  }
119 #endif
120 }
121 
122 void
124 {
125  while (!requests.empty()) {
126  // XXX: re-schedule these on another helper?
127  const auto r = requests.front();
128  requests.pop_front();
129  r->reply.result = Helper::Unknown;
130  helper().callBack(*r);
131  delete r;
132  }
133 }
134 
136 {
137  if (rbuf) {
138  memFreeBuf(rbuf_sz, rbuf);
139  rbuf = nullptr;
140  }
141 }
142 
144 {
145  wqueue->clean();
146  delete wqueue;
147 
148  if (writebuf) {
149  writebuf->clean();
150  delete writebuf;
151  writebuf = nullptr;
152  }
153 
154  if (Comm::IsConnOpen(writePipe))
155  closeWritePipeSafely();
156 
157  dlinkDelete(&link, &parent->servers);
158 
159  assert(parent->childs.n_running > 0);
160  -- parent->childs.n_running;
161 
162  assert(requests.empty());
163 }
164 
165 void
167 {
169  requestsIndex.clear();
170 }
171 
173 {
174  /* TODO: walk the local queue of requests and carry them all out */
175  if (Comm::IsConnOpen(writePipe))
176  closeWritePipeSafely();
177 
178  parent->cancelReservation(reservationId);
179 
180  dlinkDelete(&link, &parent->servers);
181 
182  assert(parent->childs.n_running > 0);
183  -- parent->childs.n_running;
184 
185  assert(requests.empty());
186 }
187 
188 void
190 {
191  char *s;
192  char *progname;
193  char *shortname;
194  char *procname;
195  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
196  char fd_note_buf[FD_DESC_SZ];
197  int nargs = 0;
198  int k;
199  pid_t pid;
200  int rfd;
201  int wfd;
202  void * hIpc;
203  wordlist *w;
204  // Helps reducing diff. TODO: remove
205  const auto hlp = this;
206 
207  if (hlp->cmdline == nullptr)
208  return;
209 
210  progname = hlp->cmdline->key;
211 
212  if ((s = strrchr(progname, '/')))
213  shortname = xstrdup(s + 1);
214  else
215  shortname = xstrdup(progname);
216 
217  /* figure out how many new child are actually needed. */
218  int need_new = hlp->childs.needNew();
219 
220  debugs(84, Important(19), "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
221 
222  if (need_new < 1) {
223  debugs(84, Important(20), "helperOpenServers: No '" << shortname << "' processes needed.");
224  }
225 
226  procname = (char *)xmalloc(strlen(shortname) + 3);
227 
228  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
229 
230  args[nargs] = procname;
231  ++nargs;
232 
233  for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
234  args[nargs] = w->key;
235  ++nargs;
236  }
237 
238  args[nargs] = nullptr;
239  ++nargs;
240 
241  assert(nargs <= HELPER_MAX_ARGS + 1);
242 
243  int successfullyStarted = 0;
244 
245  for (k = 0; k < need_new; ++k) {
246  getCurrentTime();
247  rfd = wfd = -1;
248  pid = ipcCreate(hlp->ipc_type,
249  progname,
250  args,
251  shortname,
252  hlp->addr,
253  &rfd,
254  &wfd,
255  &hIpc);
256 
257  if (pid < 0) {
258  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
259  continue;
260  }
261 
262  ++successfullyStarted;
263  ++ hlp->childs.n_running;
264  ++ hlp->childs.n_active;
265  const auto srv = new Helper::Session;
266  srv->hIpc = hIpc;
267  srv->pid = pid;
268  srv->initStats();
269  srv->addr = hlp->addr;
270  srv->readPipe = new Comm::Connection;
271  srv->readPipe->fd = rfd;
272  srv->writePipe = new Comm::Connection;
273  srv->writePipe->fd = wfd;
274  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
275  srv->wqueue = new MemBuf;
276  srv->roffset = 0;
277  srv->nextRequestId = 0;
278  srv->replyXaction = nullptr;
279  srv->ignoreToEom = false;
280  srv->parent = hlp;
281  dlinkAddTail(srv, &srv->link, &hlp->servers);
282 
283  if (rfd == wfd) {
284  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
285  fd_note(rfd, fd_note_buf);
286  } else {
287  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
288  fd_note(rfd, fd_note_buf);
289  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
290  fd_note(wfd, fd_note_buf);
291  }
292 
293  commSetNonBlocking(rfd);
294 
295  if (wfd != rfd)
296  commSetNonBlocking(wfd);
297 
298  AsyncCall::Pointer closeCall = asyncCall(5, 4, "Helper::Session::HelperServerClosed", cbdataDialer(SessionBase::HelperServerClosed,
299  static_cast<Helper::SessionBase *>(srv)));
300 
301  comm_add_close_handler(rfd, closeCall);
302 
303  if (hlp->timeout && hlp->childs.concurrency) {
304  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
306  commSetConnTimeout(srv->readPipe, hlp->timeout, timeoutCall);
307  }
308 
309  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
311  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
312  }
313 
314  // Call handleFewerServers() before hlp->last_restart is updated because
315  // that method uses last_restart to measure the delay since previous start.
316  // TODO: Refactor last_restart code to measure failure frequency rather than
317  // detecting a helper #X failure that is being close to the helper #Y start.
318  if (successfullyStarted < need_new)
319  hlp->handleFewerServers(false);
320 
321  hlp->last_restart = squid_curtime;
322  safe_free(shortname);
323  safe_free(procname);
324  helperKickQueue(hlp);
325 }
326 
327 void
329 {
330  char *shortname;
331  const char *args[HELPER_MAX_ARGS+1]; // save space for a NULL terminator
332  char fd_note_buf[FD_DESC_SZ];
333  int nargs = 0;
334  // Helps reducing diff. TODO: remove
335  const auto hlp = this;
336 
337  if (hlp->cmdline == nullptr)
338  return;
339 
340  if (hlp->childs.concurrency)
341  debugs(84, DBG_CRITICAL, "ERROR: concurrency= is not yet supported for stateful helpers ('" << hlp->cmdline << "')");
342 
343  char *progname = hlp->cmdline->key;
344 
345  char *s;
346  if ((s = strrchr(progname, '/')))
347  shortname = xstrdup(s + 1);
348  else
349  shortname = xstrdup(progname);
350 
351  /* figure out haw mant new helpers are needed. */
352  int need_new = hlp->childs.needNew();
353 
354  debugs(84, DBG_IMPORTANT, "helperOpenServers: Starting " << need_new << "/" << hlp->childs.n_max << " '" << shortname << "' processes");
355 
356  if (need_new < 1) {
357  debugs(84, DBG_IMPORTANT, "helperStatefulOpenServers: No '" << shortname << "' processes needed.");
358  }
359 
360  char *procname = (char *)xmalloc(strlen(shortname) + 3);
361 
362  snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
363 
364  args[nargs] = procname;
365  ++nargs;
366 
367  for (wordlist *w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next) {
368  args[nargs] = w->key;
369  ++nargs;
370  }
371 
372  args[nargs] = nullptr;
373  ++nargs;
374 
375  assert(nargs <= HELPER_MAX_ARGS + 1);
376 
377  int successfullyStarted = 0;
378 
379  for (int k = 0; k < need_new; ++k) {
380  getCurrentTime();
381  int rfd = -1;
382  int wfd = -1;
383  void * hIpc;
384  pid_t pid = ipcCreate(hlp->ipc_type,
385  progname,
386  args,
387  shortname,
388  hlp->addr,
389  &rfd,
390  &wfd,
391  &hIpc);
392 
393  if (pid < 0) {
394  debugs(84, DBG_IMPORTANT, "WARNING: Cannot run '" << progname << "' process.");
395  continue;
396  }
397 
398  ++successfullyStarted;
399  ++ hlp->childs.n_running;
400  ++ hlp->childs.n_active;
402  srv->hIpc = hIpc;
403  srv->pid = pid;
404  srv->initStats();
405  srv->addr = hlp->addr;
406  srv->readPipe = new Comm::Connection;
407  srv->readPipe->fd = rfd;
408  srv->writePipe = new Comm::Connection;
409  srv->writePipe->fd = wfd;
410  srv->rbuf = (char *)memAllocBuf(ReadBufSize, &srv->rbuf_sz);
411  srv->roffset = 0;
412  srv->parent = hlp;
413  srv->reservationStart = 0;
414 
415  dlinkAddTail(srv, &srv->link, &hlp->servers);
416 
417  if (rfd == wfd) {
418  snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
419  fd_note(rfd, fd_note_buf);
420  } else {
421  snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
422  fd_note(rfd, fd_note_buf);
423  snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
424  fd_note(wfd, fd_note_buf);
425  }
426 
427  commSetNonBlocking(rfd);
428 
429  if (wfd != rfd)
430  commSetNonBlocking(wfd);
431 
432  AsyncCall::Pointer closeCall = asyncCall(5, 4, "helper_stateful_server::HelperServerClosed", cbdataDialer(Helper::SessionBase::HelperServerClosed,
433  static_cast<Helper::SessionBase *>(srv)));
434 
435  comm_add_close_handler(rfd, closeCall);
436 
437  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
439  comm_read(srv->readPipe, srv->rbuf, srv->rbuf_sz - 1, call);
440  }
441 
442  // Call handleFewerServers() before hlp->last_restart is updated because
443  // that method uses last_restart to measure the delay since previous start.
444  // TODO: Refactor last_restart code to measure failure frequency rather than
445  // detecting a helper #X failure that is being close to the helper #Y start.
446  if (successfullyStarted < need_new)
447  hlp->handleFewerServers(false);
448 
449  hlp->last_restart = squid_curtime;
450  safe_free(shortname);
451  safe_free(procname);
453 }
454 
455 void
457 {
458  if (const auto srv = GetFirstAvailable(this))
459  helperDispatch(srv, r);
460  else
461  Enqueue(this, r);
462 
463  syncQueueStats();
464 }
465 
467 static void
468 SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
469 {
470  auto result = Helper::Error;
471  if (!hlp) {
472  debugs(84, 3, "no helper");
473  result = Helper::Unknown;
474  }
475  // else pretend the helper has responded with ERR
476 
477  callback(data, Helper::Reply(result));
478 }
479 
480 void
481 helperSubmit(const Helper::Client::Pointer &hlp, const char * const buf, HLPCB * const callback, void * const data)
482 {
483  if (!hlp || !hlp->trySubmit(buf, callback, data))
484  SubmissionFailure(hlp, callback, data);
485 }
486 
488 bool
490  return stats.queue_size >= static_cast<int>(childs.queue_size);
491 }
492 
493 bool
495  return stats.queue_size > static_cast<int>(childs.queue_size);
496 }
497 
499 void
501 {
502  if (overloaded()) {
503  if (overloadStart) {
504  debugs(84, 5, id_name << " still overloaded; dropped " << droppedRequests);
505  } else {
506  overloadStart = squid_curtime;
507  debugs(84, 3, id_name << " became overloaded");
508  }
509  } else {
510  if (overloadStart) {
511  debugs(84, 5, id_name << " is no longer overloaded");
512  if (droppedRequests) {
513  debugs(84, DBG_IMPORTANT, "helper " << id_name <<
514  " is no longer overloaded after dropping " << droppedRequests <<
515  " requests in " << (squid_curtime - overloadStart) << " seconds");
516  droppedRequests = 0;
517  }
518  overloadStart = 0;
519  }
520  }
521 }
522 
526 bool
528 {
529  // re-sync for the configuration may have changed since the last submission
530  syncQueueStats();
531 
532  // Nothing special to do if the new request does not overload (i.e., the
533  // queue is not even full yet) or only _starts_ overloading this helper
534  // (i.e., the queue is currently at its limit).
535  if (!overloaded())
536  return true;
537 
538  if (squid_curtime - overloadStart <= 180)
539  return true; // also OK: overload has not persisted long enough to panic
540 
541  if (childs.onPersistentOverload == ChildConfig::actDie)
542  fatalf("Too many queued %s requests; see on-persistent-overload.", id_name);
543 
544  if (!droppedRequests) {
545  debugs(84, DBG_IMPORTANT, "WARNING: dropping requests to overloaded " <<
546  id_name << " helper configured with on-persistent-overload=err");
547  }
548  ++droppedRequests;
549  debugs(84, 3, "failed to send " << droppedRequests << " helper requests to " << id_name);
550  return false;
551 }
552 
553 bool
554 Helper::Client::trySubmit(const char * const buf, HLPCB * const callback, void * const data)
555 {
556  if (!prepSubmit())
557  return false; // request was dropped
558 
559  submit(buf, callback, data); // will send or queue
560  return true; // request submitted or queued
561 }
562 
564 void
565 Helper::Client::submit(const char * const buf, HLPCB * const callback, void * const data)
566 {
567  const auto r = new Xaction(callback, data, buf);
568  submitRequest(r);
569  debugs(84, DBG_DATA, Raw("buf", buf, strlen(buf)));
570 }
571 
572 void
574 {
575  const auto callback = r.request.callback;
576  Assure(callback);
577 
578  r.request.callback = nullptr;
579  void *cbdata = nullptr;
581  callback(cbdata, r.reply);
582 }
583 
586 void
587 helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
588 {
589  if (!hlp || !hlp->trySubmit(buf, callback, data, reservation))
590  SubmissionFailure(hlp, callback, data);
591 }
592 
594 bool
595 statefulhelper::trySubmit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
596 {
597  if (!prepSubmit())
598  return false; // request was dropped
599 
600  submit(buf, callback, data, reservation); // will send or queue
601  return true; // request submitted or queued
602 }
603 
604 void
606 {
607  // clear any old reservation
608  if (srv->reserved()) {
609  reservations.erase(srv->reservationId);
610  srv->clearReservation();
611  }
612 
613  srv->reserve();
614  reservations.insert(Reservations::value_type(srv->reservationId, srv));
615 }
616 
617 void
619 {
620  const auto it = reservations.find(reservation);
621  if (it == reservations.end())
622  return;
623 
624  helper_stateful_server *srv = it->second;
625  reservations.erase(it);
626  srv->clearReservation();
627 
628  // schedule a queue kick
629  AsyncCall::Pointer call = asyncCall(5,4, "helperStatefulServerDone", cbdataDialer(helperStatefulServerDone, srv));
630  ScheduleCallHere(call);
631 }
632 
635 {
636  const auto it = reservations.find(reservation);
637  if (it == reservations.end())
638  return nullptr;
639  return it->second;
640 }
641 
642 void
644 {
645  assert(!reservationId);
646  reservationStart = squid_curtime;
647  reservationId = Helper::ReservationId::Next();
648  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
649 }
650 
651 void
653 {
654  debugs(84, 3, "srv-" << index << " reservation id = " << reservationId);
655  if (!reservationId)
656  return;
657 
658  ++stats.releases;
659 
660  reservationId.clear();
661  reservationStart = 0;
662 }
663 
664 void
665 statefulhelper::submit(const char *buf, HLPCB * callback, void *data, const Helper::ReservationId & reservation)
666 {
667  Helper::Xaction *r = new Helper::Xaction(callback, data, buf);
668 
669  if (buf && reservation) {
670  debugs(84, 5, reservation);
671  helper_stateful_server *lastServer = findServer(reservation);
672  if (!lastServer) {
673  debugs(84, DBG_CRITICAL, "ERROR: Helper " << id_name << " reservation expired (" << reservation << ")");
675  callBack(*r);
676  delete r;
677  return;
678  }
679  debugs(84, 5, "StatefulSubmit dispatching");
680  helperStatefulDispatch(lastServer, r);
681  } else {
683  if ((srv = StatefulGetFirstAvailable(this))) {
684  reserveServer(srv);
685  helperStatefulDispatch(srv, r); // may delete r
686  } else
687  StatefulEnqueue(this, r);
688  }
689 
690  // r may be dangling here
691  syncQueueStats();
692 }
693 
694 void
695 Helper::Client::packStatsInto(Packable * const p, const char * const label) const
696 {
697  if (label)
698  p->appendf("%s:\n", label);
699 
700  p->appendf(" program: %s\n", cmdline->key);
701  p->appendf(" number active: %d of %d (%d shutting down)\n", childs.n_active, childs.n_max, (childs.n_running - childs.n_active));
702  p->appendf(" requests sent: %d\n", stats.requests);
703  p->appendf(" replies received: %d\n", stats.replies);
704  p->appendf(" requests timedout: %d\n", stats.timedout);
705  p->appendf(" queue length: %d\n", stats.queue_size);
706  p->appendf(" avg service time: %d msec\n", stats.avg_svc_time);
707  p->append("\n",1);
708  p->appendf("%7s\t%7s\t%7s\t%11s\t%11s\t%11s\t%6s\t%7s\t%7s\t%7s\n",
709  "ID #",
710  "FD",
711  "PID",
712  "# Requests",
713  "# Replies",
714  "# Timed-out",
715  "Flags",
716  "Time",
717  "Offset",
718  "Request");
719 
720  for (dlink_node *link = servers.head; link; link = link->next) {
721  const auto srv = static_cast<SessionBase *>(link->data);
722  assert(srv);
723  const auto xaction = srv->requests.empty() ? nullptr : srv->requests.front();
724  double tt = 0.001 * (xaction ? tvSubMsec(xaction->request.dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
725  p->appendf("%7u\t%7d\t%7d\t%11" PRIu64 "\t%11" PRIu64 "\t%11" PRIu64 "\t%c%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
726  srv->index.value,
727  srv->readPipe->fd,
728  srv->pid,
729  srv->stats.uses,
730  srv->stats.replies,
731  srv->stats.timedout,
732  srv->stats.pending ? 'B' : ' ',
733  srv->flags.writing ? 'W' : ' ',
734  srv->flags.closing ? 'C' : ' ',
735  srv->reserved() ? 'R' : ' ',
736  srv->flags.shutdown ? 'S' : ' ',
737  xaction && xaction->request.placeholder ? 'P' : ' ',
738  tt < 0.0 ? 0.0 : tt,
739  (int) srv->roffset,
740  xaction ? Format::QuoteMimeBlob(xaction->request.buf) : "(none)");
741  }
742 
743  p->append("\nFlags key:\n"
744  " B\tBUSY\n"
745  " W\tWRITING\n"
746  " C\tCLOSING\n"
747  " R\tRESERVED\n"
748  " S\tSHUTDOWN PENDING\n"
749  " P\tPLACEHOLDER\n", 101);
750 }
751 
752 bool
754  return queueFull() && !(childs.needNew() || GetFirstAvailable(this));
755 }
756 
758 Helper::Client::Make(const char * const name)
759 {
760  return new Client(name);
761 }
762 
764 statefulhelper::Make(const char *name)
765 {
766  return new statefulhelper(name);
767 }
768 
769 void
771 {
772  dlink_node *link = hlp->servers.head;
773 
774  while (link) {
775  const auto srv = static_cast<Helper::Session *>(link->data);
776  link = link->next;
777 
778  if (srv->flags.shutdown) {
779  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
780  continue;
781  }
782 
783  assert(hlp->childs.n_active > 0);
784  -- hlp->childs.n_active;
785  srv->flags.shutdown = true; /* request it to shut itself down */
786 
787  if (srv->flags.closing) {
788  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
789  continue;
790  }
791 
792  if (srv->stats.pending) {
793  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
794  continue;
795  }
796 
797  debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
798  /* the rest of the details is dealt with in the helperServerFree
799  * close handler
800  */
801  srv->closePipesSafely();
802  }
803 
804  Assure(!hlp->childs.n_active);
805  hlp->dropQueued();
806 }
807 
808 void
810 {
811  dlink_node *link = hlp->servers.head;
813 
814  while (link) {
815  srv = (helper_stateful_server *)link->data;
816  link = link->next;
817 
818  if (srv->flags.shutdown) {
819  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " has already SHUT DOWN.");
820  continue;
821  }
822 
823  assert(hlp->childs.n_active > 0);
824  -- hlp->childs.n_active;
825  srv->flags.shutdown = true; /* request it to shut itself down */
826 
827  if (srv->stats.pending) {
828  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is BUSY.");
829  continue;
830  }
831 
832  if (srv->flags.closing) {
833  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is CLOSING.");
834  continue;
835  }
836 
837  if (srv->reserved()) {
838  if (shutting_down) {
839  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Closing anyway.");
840  } else {
841  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " is RESERVED. Not Shutting Down Yet.");
842  continue;
843  }
844  }
845 
846  debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index << " shutting down.");
847 
848  /* the rest of the details is dealt with in the helperStatefulServerFree
849  * close handler
850  */
851  srv->closePipesSafely();
852  }
853 }
854 
856 {
857  /* note, don't free id_name, it probably points to static memory */
858 
859  // A non-empty queue would leak Helper::Xaction objects, stalling any
860  // pending (and even future collapsed) transactions. To avoid stalling
861  // transactions, we must dropQueued(). We ought to do that when we
862  // discover that no progress is possible rather than here because
863  // reference counting may keep this object alive for a long time.
864  assert(queue.empty());
865 }
866 
867 void
869 {
870  if (!srv->flags.shutdown) {
871  assert(childs.n_active > 0);
872  --childs.n_active;
873  debugs(84, DBG_CRITICAL, "WARNING: " << id_name << " #" << srv->index << " exited");
874 
875  handleFewerServers(srv->stats.replies >= 1);
876 
877  if (childs.needNew() > 0) {
878  srv->flags.shutdown = true;
879  openSessions();
880  }
881  }
882 
883  if (!childs.n_active)
884  dropQueued();
885 }
886 
887 void
889 {
890  if (queue.empty())
891  return;
892 
893  Assure(!childs.n_active);
894  Assure(!GetFirstAvailable(this));
895 
896  // no helper servers means nobody can advance our queued transactions
897 
898  debugs(80, DBG_CRITICAL, "ERROR: Dropping " << queue.size() << ' ' <<
899  id_name << " helper requests due to lack of helper processes");
900  // similar to SessionBase::dropQueued()
901  while (const auto r = nextRequest()) {
902  r->reply.result = Helper::Unknown;
903  callBack(*r);
904  delete r;
905  }
906 }
907 
908 void
909 Helper::Client::handleFewerServers(const bool madeProgress)
910 {
911  const auto needNew = childs.needNew();
912 
913  if (!needNew)
914  return; // some server(s) have died, but we still have enough
915 
916  debugs(80, DBG_IMPORTANT, "Too few " << id_name << " processes are running (need " << needNew << "/" << childs.n_max << ")" <<
917  Debug::Extra << "active processes: " << childs.n_active <<
918  Debug::Extra << "processes configured to start at (re)configuration: " << childs.n_startup);
919 
920  if (childs.n_active < childs.n_startup && last_restart > squid_curtime - 30) {
921  if (madeProgress)
922  debugs(80, DBG_CRITICAL, "ERROR: The " << id_name << " helpers are crashing too rapidly, need help!");
923  else
924  fatalf("The %s helpers are crashing too rapidly, need help!", id_name);
925  }
926 }
927 
928 void
930 {
931  srv->helper().handleKilledServer(srv);
932  srv->dropQueued();
933  delete srv;
934 }
935 
937 Helper::Session::popRequest(const int request_number)
938 {
939  Xaction *r = nullptr;
940  if (parent->childs.concurrency) {
941  // If concurrency supported retrieve request from ID
942  const auto it = requestsIndex.find(request_number);
943  if (it != requestsIndex.end()) {
944  r = *(it->second);
945  requests.erase(it->second);
946  requestsIndex.erase(it);
947  }
948  } else if(!requests.empty()) {
949  // Else get the first request from queue, if any
950  r = requests.front();
951  requests.pop_front();
952  }
953 
954  return r;
955 }
956 
958 static void
959 helperReturnBuffer(Helper::Session * srv, const Helper::Client::Pointer &hlp, char * const msg, const size_t msgSize, const char * const msgEnd)
960 {
961  if (Helper::Xaction *r = srv->replyXaction) {
962  const bool hasSpace = r->reply.accumulate(msg, msgSize);
963  if (!hasSpace) {
964  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
965  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
966  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
967  srv->closePipesSafely();
968  return;
969  }
970 
971  if (!msgEnd)
972  return; // We are waiting for more data.
973 
974  bool retry = false;
975  if (cbdataReferenceValid(r->request.data)) {
976  r->reply.finalize();
977  if (r->reply.result == Helper::BrokenHelper && r->request.retries < MAX_RETRIES) {
978  debugs(84, DBG_IMPORTANT, "ERROR: helper: " << r->reply << ", attempt #" << (r->request.retries + 1) << " of 2");
979  retry = true;
980  } else {
981  hlp->callBack(*r);
982  }
983  }
984 
985  -- srv->stats.pending;
986  ++ srv->stats.replies;
987 
988  ++ hlp->stats.replies;
989 
990  srv->answer_time = current_time;
991 
992  srv->dispatch_time = r->request.dispatch_time;
993 
994  hlp->stats.avg_svc_time =
996  tvSubMsec(r->request.dispatch_time, current_time),
998 
999  // release or re-submit parsedRequestXaction object
1000  srv->replyXaction = nullptr;
1001  if (retry) {
1002  ++r->request.retries;
1003  hlp->submitRequest(r);
1004  } else
1005  delete r;
1006  }
1007 
1008  if (hlp->timeout && hlp->childs.concurrency)
1010 
1011  if (!srv->flags.shutdown) {
1012  helperKickQueue(hlp);
1013  } else if (!srv->flags.closing && !srv->stats.pending) {
1014  srv->closeWritePipeSafely();
1015  }
1016 }
1017 
1018 static void
1019 helperHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1020 {
1021  const auto srv = static_cast<Helper::Session *>(data);
1022  const auto hlp = srv->parent;
1024 
1025  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1026 
1027  if (flag == Comm::ERR_CLOSING) {
1028  return;
1029  }
1030 
1031  assert(conn->fd == srv->readPipe->fd);
1032 
1033  debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index);
1034 
1035  if (flag != Comm::OK || len == 0) {
1036  srv->closePipesSafely();
1037  return;
1038  }
1039 
1040  srv->roffset += len;
1041  srv->rbuf[srv->roffset] = '\0';
1042  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1043 
1044  if (!srv->stats.pending && !srv->stats.timedout) {
1045  /* someone spoke without being spoken to */
1046  debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1047  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1048  " bytes '" << srv->rbuf << "'");
1049 
1050  srv->roffset = 0;
1051  srv->rbuf[0] = '\0';
1052  srv->closePipesSafely();
1053  return;
1054  }
1055 
1056  bool needsMore = false;
1057  char *msg = srv->rbuf;
1058  while (*msg && !needsMore) {
1059  int skip = 0;
1060  char *eom = strchr(msg, hlp->eom);
1061  if (eom) {
1062  skip = 1;
1063  debugs(84, 3, "helperHandleRead: end of reply found");
1064  if (eom > msg && eom[-1] == '\r' && hlp->eom == '\n') {
1065  *eom = '\0';
1066  // rewind to the \r octet which is the real terminal now
1067  // and remember that we have to skip forward 2 places now.
1068  skip = 2;
1069  --eom;
1070  }
1071  *eom = '\0';
1072  }
1073 
1074  if (!srv->ignoreToEom && !srv->replyXaction) {
1075  int i = 0;
1076  if (hlp->childs.concurrency) {
1077  char *e = nullptr;
1078  i = strtol(msg, &e, 10);
1079  // Do we need to check for e == msg? Means wrong response from helper.
1080  // Will be dropped as "unexpected reply on channel 0"
1081  needsMore = !(xisspace(*e) || (eom && e == eom));
1082  if (!needsMore) {
1083  msg = e;
1084  while (*msg && xisspace(*msg))
1085  ++msg;
1086  } // else not enough data to compute request number
1087  }
1088  if (!(srv->replyXaction = srv->popRequest(i))) {
1089  if (srv->stats.timedout) {
1090  debugs(84, 3, "Timedout reply received for request-ID: " << i << " , ignore");
1091  } else {
1092  debugs(84, DBG_IMPORTANT, "ERROR: helperHandleRead: unexpected reply on channel " <<
1093  i << " from " << hlp->id_name << " #" << srv->index <<
1094  " '" << srv->rbuf << "'");
1095  }
1096  srv->ignoreToEom = true;
1097  }
1098  } // else we need to just append reply data to the current Xaction
1099 
1100  if (!needsMore) {
1101  size_t msgSize = eom ? eom - msg : (srv->roffset - (msg - srv->rbuf));
1102  assert(msgSize <= srv->rbuf_sz);
1103  helperReturnBuffer(srv, hlp, msg, msgSize, eom);
1104  msg += msgSize + skip;
1105  assert(static_cast<size_t>(msg - srv->rbuf) <= srv->rbuf_sz);
1106 
1107  // The next message should not ignored.
1108  if (eom && srv->ignoreToEom)
1109  srv->ignoreToEom = false;
1110  } else
1111  assert(skip == 0 && eom == nullptr);
1112  }
1113 
1114  if (needsMore) {
1115  size_t msgSize = (srv->roffset - (msg - srv->rbuf));
1116  assert(msgSize <= srv->rbuf_sz);
1117  memmove(srv->rbuf, msg, msgSize);
1118  srv->roffset = msgSize;
1119  srv->rbuf[srv->roffset] = '\0';
1120  } else {
1121  // All of the responses parsed and msg points at the end of read data
1122  assert(static_cast<size_t>(msg - srv->rbuf) == srv->roffset);
1123  srv->roffset = 0;
1124  }
1125 
1126  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1127  int spaceSize = srv->rbuf_sz - srv->roffset - 1;
1128  assert(spaceSize >= 0);
1129 
1130  AsyncCall::Pointer call = commCbCall(5,4, "helperHandleRead",
1132  comm_read(srv->readPipe, srv->rbuf + srv->roffset, spaceSize, call);
1133  }
1134 }
1135 
1136 static void
1137 helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *, size_t len, Comm::Flag flag, int, void *data)
1138 {
1139  char *t = nullptr;
1141  const auto hlp = srv->parent;
1143 
1144  /* Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us */
1145 
1146  if (flag == Comm::ERR_CLOSING) {
1147  return;
1148  }
1149 
1150  assert(conn->fd == srv->readPipe->fd);
1151 
1152  debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
1153  hlp->id_name << " #" << srv->index);
1154 
1155  if (flag != Comm::OK || len == 0) {
1156  srv->closePipesSafely();
1157  return;
1158  }
1159 
1160  srv->roffset += len;
1161  srv->rbuf[srv->roffset] = '\0';
1162  debugs(84, DBG_DATA, Raw("accumulated", srv->rbuf, srv->roffset));
1163 
1164  if (srv->requests.empty()) {
1165  /* someone spoke without being spoken to */
1166  debugs(84, DBG_IMPORTANT, "ERROR: Killing helper process after an unexpected read from " <<
1167  hlp->id_name << " #" << srv->index << ", " << (int)len <<
1168  " bytes '" << srv->rbuf << "'");
1169 
1170  srv->roffset = 0;
1171  srv->closePipesSafely();
1172  return;
1173  }
1174 
1175  if ((t = strchr(srv->rbuf, hlp->eom))) {
1176  debugs(84, 3, "helperStatefulHandleRead: end of reply found");
1177 
1178  if (t > srv->rbuf && t[-1] == '\r' && hlp->eom == '\n') {
1179  *t = '\0';
1180  // rewind to the \r octet which is the real terminal now
1181  --t;
1182  }
1183 
1184  *t = '\0';
1185  }
1186 
1187  const auto r = srv->requests.front();
1188 
1189  if (!r->reply.accumulate(srv->rbuf, t ? (t - srv->rbuf) : srv->roffset)) {
1190  debugs(84, DBG_IMPORTANT, "ERROR: Disconnecting from a " <<
1191  "helper that overflowed " << srv->rbuf_sz << "-byte " <<
1192  "Squid input buffer: " << hlp->id_name << " #" << srv->index);
1193  srv->closePipesSafely();
1194  return;
1195  }
1202  srv->roffset = 0;
1203 
1204  if (t) {
1205  /* end of reply found */
1206  srv->requests.pop_front(); // we already have it in 'r'
1207  int called = 1;
1208 
1209  if (cbdataReferenceValid(r->request.data)) {
1210  r->reply.finalize();
1211  r->reply.reservationId = srv->reservationId;
1212  hlp->callBack(*r);
1213  } else {
1214  debugs(84, DBG_IMPORTANT, "StatefulHandleRead: no callback data registered");
1215  called = 0;
1216  }
1217 
1218  delete r;
1219 
1220  -- srv->stats.pending;
1221  ++ srv->stats.replies;
1222 
1223  ++ hlp->stats.replies;
1224  srv->answer_time = current_time;
1225  hlp->stats.avg_svc_time =
1226  Math::intAverage(hlp->stats.avg_svc_time,
1228  hlp->stats.replies, REDIRECT_AV_FACTOR);
1229 
1230  if (called)
1232  else
1233  hlp->cancelReservation(srv->reservationId);
1234  }
1235 
1236  if (Comm::IsConnOpen(srv->readPipe) && !fd_table[srv->readPipe->fd].closing()) {
1237  int spaceSize = srv->rbuf_sz - 1;
1238 
1239  AsyncCall::Pointer call = commCbCall(5,4, "helperStatefulHandleRead",
1241  comm_read(srv->readPipe, srv->rbuf, spaceSize, call);
1242  }
1243 }
1244 
1246 static void
1247 Enqueue(Helper::Client * const hlp, Helper::Xaction * const r)
1248 {
1249  hlp->queue.push(r);
1250  ++ hlp->stats.queue_size;
1251 
1252  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1253  if (hlp->childs.needNew() > 0) {
1254  hlp->openSessions();
1255  return;
1256  }
1257 
1258  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1259  return;
1260 
1261  if (squid_curtime - hlp->last_queue_warn < 600)
1262  return;
1263 
1265  return;
1266 
1268 
1269  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1270  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1271  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1272 }
1273 
1274 static void
1276 {
1277  hlp->queue.push(r);
1278  ++ hlp->stats.queue_size;
1279 
1280  /* do this first so idle=N has a chance to grow the child pool before it hits critical. */
1281  if (hlp->childs.needNew() > 0) {
1282  hlp->openSessions();
1283  return;
1284  }
1285 
1286  if (hlp->stats.queue_size < (int)hlp->childs.queue_size)
1287  return;
1288 
1289  if (squid_curtime - hlp->last_queue_warn < 600)
1290  return;
1291 
1293  return;
1294 
1296 
1297  debugs(84, DBG_CRITICAL, "WARNING: All " << hlp->childs.n_active << "/" << hlp->childs.n_max << " " << hlp->id_name << " processes are busy.");
1298  debugs(84, DBG_CRITICAL, "WARNING: " << hlp->stats.queue_size << " pending requests queued");
1299  debugs(84, DBG_CRITICAL, "WARNING: Consider increasing the number of " << hlp->id_name << " processes in your config file.");
1300 }
1301 
1304 {
1305  if (queue.empty())
1306  return nullptr;
1307 
1308  auto *r = queue.front();
1309  queue.pop();
1310  --stats.queue_size;
1311  return r;
1312 }
1313 
1314 static Helper::Session *
1316 {
1317  dlink_node *n;
1318  Helper::Session *selected = nullptr;
1319  debugs(84, 5, "GetFirstAvailable: Running servers " << hlp->childs.n_running);
1320 
1321  if (hlp->childs.n_running == 0)
1322  return nullptr;
1323 
1324  /* Find "least" loaded helper (approx) */
1325  for (n = hlp->servers.head; n != nullptr; n = n->next) {
1326  const auto srv = static_cast<Helper::Session *>(n->data);
1327 
1328  if (selected && selected->stats.pending <= srv->stats.pending)
1329  continue;
1330 
1331  if (srv->flags.shutdown)
1332  continue;
1333 
1334  if (!srv->stats.pending)
1335  return srv;
1336 
1337  if (selected) {
1338  selected = srv;
1339  break;
1340  }
1341 
1342  selected = srv;
1343  }
1344 
1345  if (!selected) {
1346  debugs(84, 5, "GetFirstAvailable: None available.");
1347  return nullptr;
1348  }
1349 
1350  if (selected->stats.pending >= (hlp->childs.concurrency ? hlp->childs.concurrency : 1)) {
1351  debugs(84, 3, "GetFirstAvailable: Least-loaded helper is fully loaded!");
1352  return nullptr;
1353  }
1354 
1355  debugs(84, 5, "GetFirstAvailable: returning srv-" << selected->index);
1356  return selected;
1357 }
1358 
1359 static helper_stateful_server *
1361 {
1362  dlink_node *n;
1363  helper_stateful_server *srv = nullptr;
1364  helper_stateful_server *oldestReservedServer = nullptr;
1365  debugs(84, 5, "StatefulGetFirstAvailable: Running servers " << hlp->childs.n_running);
1366 
1367  if (hlp->childs.n_running == 0)
1368  return nullptr;
1369 
1370  for (n = hlp->servers.head; n != nullptr; n = n->next) {
1371  srv = (helper_stateful_server *)n->data;
1372 
1373  if (srv->stats.pending)
1374  continue;
1375 
1376  if (srv->reserved()) {
1378  if (!oldestReservedServer)
1379  oldestReservedServer = srv;
1380  else if (oldestReservedServer->reservationStart < srv->reservationStart)
1381  oldestReservedServer = srv;
1382  debugs(84, 5, "the earlier reserved server is the srv-" << oldestReservedServer->index);
1383  }
1384  continue;
1385  }
1386 
1387  if (srv->flags.shutdown)
1388  continue;
1389 
1390  debugs(84, 5, "StatefulGetFirstAvailable: returning srv-" << srv->index);
1391  return srv;
1392  }
1393 
1394  if (oldestReservedServer) {
1395  debugs(84, 5, "expired reservation " << oldestReservedServer->reservationId << " for srv-" << oldestReservedServer->index);
1396  return oldestReservedServer;
1397  }
1398 
1399  debugs(84, 5, "StatefulGetFirstAvailable: None available.");
1400  return nullptr;
1401 }
1402 
1403 static void
1404 helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
1405 {
1406  const auto srv = static_cast<Helper::Session *>(data);
1407 
1408  srv->writebuf->clean();
1409  delete srv->writebuf;
1410  srv->writebuf = nullptr;
1411  srv->flags.writing = false;
1412 
1413  if (flag != Comm::OK) {
1414  /* Helper server has crashed */
1415  debugs(84, DBG_CRITICAL, "helperDispatch: Helper " << srv->parent->id_name << " #" << srv->index << " has crashed");
1416  return;
1417  }
1418 
1419  if (!srv->wqueue->isNull()) {
1420  srv->writebuf = srv->wqueue;
1421  srv->wqueue = new MemBuf;
1422  srv->flags.writing = true;
1423  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1425  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1426  }
1427 }
1428 
1429 static void
1431 {
1432  const auto hlp = srv->parent;
1433  const uint64_t reqId = ++srv->nextRequestId;
1434 
1435  if (!cbdataReferenceValid(r->request.data)) {
1436  debugs(84, DBG_IMPORTANT, "ERROR: helperDispatch: invalid callback data");
1437  delete r;
1438  return;
1439  }
1440 
1441  r->request.Id = reqId;
1442  const auto it = srv->requests.insert(srv->requests.end(), r);
1444 
1445  if (srv->wqueue->isNull())
1446  srv->wqueue->init();
1447 
1448  if (hlp->childs.concurrency) {
1449  srv->requestsIndex.insert(Helper::Session::RequestIndex::value_type(reqId, it));
1450  assert(srv->requestsIndex.size() == srv->requests.size());
1451  srv->wqueue->appendf("%" PRIu64 " %s", reqId, r->request.buf);
1452  } else
1453  srv->wqueue->append(r->request.buf, strlen(r->request.buf));
1454 
1455  if (!srv->flags.writing) {
1456  assert(nullptr == srv->writebuf);
1457  srv->writebuf = srv->wqueue;
1458  srv->wqueue = new MemBuf;
1459  srv->flags.writing = true;
1460  AsyncCall::Pointer call = commCbCall(5,5, "helperDispatchWriteDone",
1462  Comm::Write(srv->writePipe, srv->writebuf->content(), srv->writebuf->contentSize(), call, nullptr);
1463  }
1464 
1465  debugs(84, 5, "helperDispatch: Request sent to " << hlp->id_name << " #" << srv->index << ", " << strlen(r->request.buf) << " bytes");
1466 
1467  ++ srv->stats.uses;
1468  ++ srv->stats.pending;
1469  ++ hlp->stats.requests;
1470 }
1471 
1472 static void
1474 {}
1475 
1476 static void
1478 {
1479  const auto hlp = srv->parent;
1480 
1481  if (!cbdataReferenceValid(r->request.data)) {
1482  debugs(84, DBG_IMPORTANT, "ERROR: helperStatefulDispatch: invalid callback data");
1483  delete r;
1484  hlp->cancelReservation(srv->reservationId);
1485  return;
1486  }
1487 
1488  debugs(84, 9, "helperStatefulDispatch busying helper " << hlp->id_name << " #" << srv->index);
1489 
1490  assert(srv->reservationId);
1491  r->reply.reservationId = srv->reservationId;
1492 
1493  if (r->request.placeholder == 1) {
1494  /* a callback is needed before this request can _use_ a helper. */
1495  /* we don't care about releasing this helper. The request NEVER
1496  * gets to the helper. So we throw away the return code */
1498  hlp->callBack(*r);
1499  /* throw away the placeholder */
1500  delete r;
1501  /* and push the queue. Note that the callback may have submitted a new
1502  * request to the helper which is why we test for the request */
1503 
1504  if (!srv->requests.size())
1506 
1507  return;
1508  }
1509 
1510  srv->requests.push_back(r);
1511  srv->dispatch_time = current_time;
1512  AsyncCall::Pointer call = commCbCall(5,5, "helperStatefulDispatchWriteDone",
1514  Comm::Write(srv->writePipe, r->request.buf, strlen(r->request.buf), call, nullptr);
1515  debugs(84, 5, "helperStatefulDispatch: Request sent to " <<
1516  hlp->id_name << " #" << srv->index << ", " <<
1517  (int) strlen(r->request.buf) << " bytes");
1518 
1519  ++ srv->stats.uses;
1520  ++ srv->stats.pending;
1521  ++ hlp->stats.requests;
1522 }
1523 
1524 static void
1526 {
1527  Helper::Xaction *r = nullptr;
1528  Helper::Session *srv = nullptr;
1529 
1530  while ((srv = GetFirstAvailable(hlp)) && (r = hlp->nextRequest()))
1531  helperDispatch(srv, r);
1532 
1533  if (!hlp->childs.n_active)
1534  hlp->dropQueued();
1535 }
1536 
1537 static void
1539 {
1540  Helper::Xaction *r;
1542  while ((srv = StatefulGetFirstAvailable(hlp)) && (r = hlp->nextRequest())) {
1543  debugs(84, 5, "found srv-" << srv->index);
1544  hlp->reserveServer(srv);
1545  helperStatefulDispatch(srv, r);
1546  }
1547 
1548  if (!hlp->childs.n_active)
1549  hlp->dropQueued();
1550 }
1551 
1552 static void
1554 {
1555  if (!srv->flags.shutdown) {
1557  } else if (!srv->flags.closing && !srv->reserved() && !srv->stats.pending) {
1558  srv->closeWritePipeSafely();
1559  return;
1560  }
1561 }
1562 
1563 void
1565 {
1566  assert(parent->childs.concurrency);
1567  while(!requests.empty() && requests.front()->request.timedOut(parent->timeout)) {
1568  const auto r = requests.front();
1569  RequestIndex::iterator it;
1570  it = requestsIndex.find(r->request.Id);
1571  assert(it != requestsIndex.end());
1572  requestsIndex.erase(it);
1573  requests.pop_front();
1574  debugs(84, 2, "Request " << r->request.Id << " timed-out, remove it from queue");
1575  bool retried = false;
1576  if (retry && r->request.retries < MAX_RETRIES && cbdataReferenceValid(r->request.data)) {
1577  debugs(84, 2, "Retry request " << r->request.Id);
1578  ++r->request.retries;
1579  parent->submitRequest(r);
1580  retried = true;
1581  } else if (cbdataReferenceValid(r->request.data)) {
1582  if (!parent->onTimedOutResponse.isEmpty()) {
1583  if (r->reply.accumulate(parent->onTimedOutResponse.rawContent(), parent->onTimedOutResponse.length()))
1584  r->reply.finalize();
1585  else
1586  r->reply.result = Helper::TimedOut;
1587  parent->callBack(*r);
1588  } else {
1589  r->reply.result = Helper::TimedOut;
1590  parent->callBack(*r);
1591  }
1592  }
1593  --stats.pending;
1594  ++stats.timedout;
1595  ++parent->stats.timedout;
1596  if (!retried)
1597  delete r;
1598  }
1599 }
1600 
1601 void
1603 {
1604  debugs(26, 3, io.conn);
1605  const auto srv = static_cast<Session *>(io.data);
1606 
1607  srv->checkForTimedOutRequests(srv->parent->retryTimedOut);
1608 
1609  debugs(84, 3, io.conn << " establish a new timeout");
1610  AsyncCall::Pointer timeoutCall = commCbCall(84, 4, "Helper::Session::requestTimeout",
1612 
1613  const time_t timeSpent = srv->requests.empty() ? 0 : (squid_curtime - srv->requests.front()->request.dispatch_time.tv_sec);
1614  const time_t minimumNewTimeout = 1; // second
1615  const auto timeLeft = max(minimumNewTimeout, srv->parent->timeout - timeSpent);
1616 
1617  commSetConnTimeout(io.conn, timeLeft, timeoutCall);
1618 }
1619 
static void helperReturnBuffer(Helper::Session *srv, const Helper::Client::Pointer &hlp, char *const msg, const size_t msgSize, const char *const msgEnd)
Calls back with a pointer to the buffer with the helper output.
Definition: helper.cc:959
#define FD_DESC_SZ
Definition: defines.h:32
static IOCB helperStatefulHandleRead
Definition: helper.cc:46
uint64_t replies
Definition: helper.h:252
time_t reservationStart
when the last reservation was made
Definition: helper.h:333
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:942
Xaction * nextRequest()
Definition: helper.cc:1303
void openSessions() override
Definition: helper.cc:328
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
#define DBG_CRITICAL
Definition: Stream.h:37
static Helper::Session * GetFirstAvailable(const Helper::Client::Pointer &)
Definition: helper.cc:1315
unsigned int queue_size
Definition: ChildConfig.h:91
static void helperKickQueue(const Helper::Client::Pointer &)
Definition: helper.cc:1525
#define xmalloc
static void helperStatefulDispatch(helper_stateful_server *srv, Helper::Xaction *r)
Definition: helper.cc:1477
struct timeval dispatch_time
Definition: helper.h:236
helper_stateful_server * findServer(const Helper::ReservationId &reservation)
Definition: helper.cc:634
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:239
uint64_t nextRequestId
Definition: helper.h:267
@ Error
Definition: ResultCode.h:19
void fd_note(int fd, const char *s)
Definition: fd.cc:211
@ actDie
kill the caller process (i.e., Squid worker)
Definition: ChildConfig.h:95
#define CBDATA_NAMESPACED_CLASS_INIT(namespace, type)
Definition: cbdata.h:333
uint64_t uses
Definition: helper.h:251
static IOCB helperHandleRead
Definition: helper.cc:45
@ Unknown
Definition: ResultCode.h:17
#define MAX_RETRIES
The maximum allowed request retries.
Definition: helper.cc:39
static Pointer Make(const char *name)
Definition: helper.cc:764
~SessionBase() override
Definition: helper.cc:135
Xaction * replyXaction
Definition: helper.h:278
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
virtual Client & helper() const =0
our creator (parent) object
char * QuoteMimeBlob(const char *header)
Definition: Quoting.cc:43
void handleFewerServers(bool madeProgress)
Definition: helper.cc:909
static void helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag, int, void *)
Definition: helper.cc:1473
struct Helper::SessionBase::_helper_flags flags
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
void submitRequest(Xaction *)
Definition: helper.cc:456
uint64_t Id
Definition: Request.h:47
virtual void append(const char *buf, int size)=0
Appends a c-string to existing packed data.
InstanceIdDefinitions(Helper::SessionBase, "Hlpr")
#define xstrdup
Comm::ConnectionPointer writePipe
Definition: helper.h:229
~helper_stateful_server() override
Definition: helper.cc:172
char * buf
Definition: Request.h:41
void packStatsInto(Packable *p, const char *label=nullptr) const
Dump some stats about the helper state to a Packable object.
Definition: helper.cc:695
const A & max(A const &lhs, A const &rhs)
#define PRIu64
Definition: types.h:114
int commSetNonBlocking(int fd)
Definition: comm.cc:1044
Client::Pointer parent
Definition: helper.h:272
Definition: cbdata.cc:37
time_t last_queue_warn
Definition: helper.h:124
Helper::Reply reply
Definition: helper.h:46
bool trySubmit(const char *buf, HLPCB *callback, void *data)
If possible, submit request. Otherwise, either kill Squid or return false.
Definition: helper.cc:554
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
@ OK
Definition: Flag.h:16
void callBack(Xaction &)
sends transaction response to the transaction initiator
Definition: helper.cc:573
dlink_node link
Definition: helper.h:239
static void requestTimeout(const CommTimeoutCbParams &io)
Read timeout handler.
Definition: helper.cc:1602
@ ERR_CLOSING
Definition: Flag.h:24
int intAverage(const int, const int, int, const int)
Definition: SquidMath.cc:40
static Pointer Make(const char *name)
Definition: helper.cc:758
unsigned int n_running
Definition: ChildConfig.h:80
HLPCB * callback
Definition: Request.h:42
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void submit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:665
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
void helperStatefulSubmit(const statefulhelper::Pointer &hlp, const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
Definition: helper.cc:587
void cancelReservation(const Helper::ReservationId reservation)
undo reserveServer(), clear the reservation and kick the queue
Definition: helper.cc:618
#define DBG_DATA
Definition: Stream.h:40
static void * hIpc
Definition: IcmpSquid.cc:34
static pid_t pid
Definition: IcmpSquid.cc:35
static void helperDispatchWriteDone(const Comm::ConnectionPointer &, char *, size_t, Comm::Flag flag, int, void *data)
Definition: helper.cc:1404
void helperShutdown(const Helper::Client::Pointer &hlp)
Definition: helper.cc:770
Definition: Raw.h:20
void reserveServer(helper_stateful_server *srv)
reserve the given server
Definition: helper.cc:605
void comm_read(const Comm::ConnectionPointer &conn, char *buf, int len, AsyncCall::Pointer &callback)
Definition: Read.h:59
UnaryCbdataDialer< Argument1 > cbdataDialer(typename UnaryCbdataDialer< Argument1 >::Handler *handler, Argument1 *arg1)
void closeWritePipeSafely()
Definition: helper.cc:99
#define HELPER_MAX_ARGS
Definition: helper.cc:36
Comm::ConnectionPointer readPipe
Definition: helper.h:228
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
int isNull() const
Definition: MemBuf.cc:145
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:18
void helperStatefulShutdown(const statefulhelper::Pointer &hlp)
Definition: helper.cc:809
MemBuf * writebuf
Definition: helper.h:270
MemBuf * wqueue
Definition: helper.h:269
void append(const char *c, int sz) override
Definition: MemBuf.cc:209
int needNew() const
Definition: ChildConfig.cc:60
void checkForTimedOutRequests(bool const retry)
Definition: helper.cc:1564
int placeholder
Definition: Request.h:45
void handleKilledServer(SessionBase *)
Definition: helper.cc:868
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
bool reserved() override
whether the server is locked for exclusive use by a client
Definition: helper.h:323
struct timeval answer_time
Definition: helper.h:237
struct timeval dispatch_time
Definition: Request.h:46
static ReservationId Next()
Helper::ResultCode result
The helper response 'result' field.
Definition: Reply.h:59
virtual ~Client()
Definition: helper.cc:855
static void Enqueue(Helper::Client *, Helper::Xaction *)
Handles a request when all running helpers, if any, are busy.
Definition: helper.cc:1247
const size_t ReadBufSize(128 *1024)
Definition: MemBuf.h:23
static void HelperServerClosed(SessionBase *)
close handler to handle exited server processes
Definition: helper.cc:929
Requests requests
requests in order of submission/expiration
Definition: helper.h:248
struct Helper::Client::_stats stats
bool overloaded() const
Definition: helper.cc:494
void clean()
Definition: MemBuf.cc:110
struct Helper::SessionBase::@53 stats
bool willOverload() const
Definition: helper.cc:753
ChildConfig childs
Configuration settings for number running.
Definition: helper.h:119
void IOCB(const Comm::ConnectionPointer &conn, char *, size_t size, Comm::Flag flag, int xerrno, void *data)
Definition: CommCalls.h:34
int reconfiguring
const InstanceId< SessionBase > index
Definition: helper.h:224
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
Ip::Address addr
Definition: helper.h:227
#define safe_free(x)
Definition: xalloc.h:73
void submit(const char *buf, HLPCB *callback, void *data)
dispatches or enqueues a helper requests; does not enforce queue limits
Definition: helper.cc:565
CommCbFunPtrCallT< Dialer > * commCbCall(int debugSection, int debugLevel, const char *callName, const Dialer &dialer)
Definition: CommCalls.h:312
#define assert(EX)
Definition: assert.h:17
static void helperDispatch(Helper::Session *, Helper::Xaction *)
Definition: helper.cc:1430
SSL Connection
Definition: Session.h:49
a (temporary) lock on a (stateful) helper channel
Definition: ReservationId.h:17
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
unsigned int n_max
Definition: ChildConfig.h:48
helper protocol primitives
Definition: ChildConfig.h:12
std::queue< Xaction * > queue
Definition: helper.h:117
void helperSubmit(const Helper::Client::Pointer &hlp, const char *const buf, HLPCB *const callback, void *const data)
Definition: helper.cc:481
void * data
Definition: Request.h:43
#define Assure(condition)
Definition: Assure.h:35
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
pid_t ipcCreate(int type, const char *prog, const char *const args[], const char *name, Ip::Address &local_addr, int *rfd, int *wfd, void **hIpc)
Definition: ipc.cc:69
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:325
const char * id_name
Definition: helper.h:118
time_t squid_curtime
Definition: stub_libtime.cc:20
static std::ostream & Extra(std::ostream &)
Definition: debug.cc:1316
Xaction * popRequest(int requestId)
Definition: helper.cc:937
time_t reservationTimeout
older stateful helper server reservations may be forgotten
Definition: ChildConfig.h:109
void initStats()
Definition: helper.cc:63
wordlist * next
Definition: wordlist.h:60
bool retryTimedOut
Whether the timed-out requests must retried.
Definition: helper.h:127
static void helperStatefulKickQueue(const statefulhelper::Pointer &)
Definition: helper.cc:1538
Flag
Definition: Flag.h:15
#define REDIRECT_AV_FACTOR
Definition: defines.h:51
bool prepSubmit()
Definition: helper.cc:527
#define fd_table
Definition: fde.h:189
time_t timeout
Requests timeout.
Definition: helper.h:126
virtual void dropQueued()
dequeues and sends an Unknown answer to all queued requests
Definition: helper.cc:123
static void helperStatefulServerDone(helper_stateful_server *srv)
Definition: helper.cc:1553
void commSetConnTimeout(const Comm::ConnectionPointer &conn, time_t timeout, AsyncCall::Pointer &callback)
Definition: comm.cc:594
char * key
Definition: wordlist.h:59
bool trySubmit(const char *buf, HLPCB *callback, void *data, const Helper::ReservationId &reservation)
reserved servers indexed by reservation IDs
Definition: helper.cc:595
Helper::ReservationId reservationId
The stateful replies should include the reservation ID.
Definition: Reply.h:65
static helper_stateful_server * StatefulGetFirstAvailable(const statefulhelper::Pointer &)
Definition: helper.cc:1360
char * content()
start of the added data
Definition: MemBuf.h:41
statefulhelper::Pointer parent
Definition: helper.h:326
void closePipesSafely()
Definition: helper.cc:73
void dropQueued()
Definition: helper.cc:888
~Session() override
Definition: helper.cc:143
#define Important(id)
Definition: Messages.h:93
#define DBG_IMPORTANT
Definition: Stream.h:38
uint64_t pending
Definition: helper.h:253
static void SubmissionFailure(const Helper::Client::Pointer &hlp, HLPCB *callback, void *data)
handles helperSubmit() and helperStatefulSubmit() failures
Definition: helper.cc:468
void dropQueued() override
dequeues and sends an Unknown answer to all queued requests
Definition: helper.cc:166
@ BrokenHelper
Definition: ResultCode.h:20
Helper::Request request
Definition: helper.h:45
int shutting_down
void HLPCB(void *, const Helper::Reply &)
Definition: forward.h:33
static void StatefulEnqueue(statefulhelper *hlp, Helper::Xaction *r)
Definition: helper.cc:1275
SSL_SESSION Session
Definition: Session.h:51
virtual void openSessions()
Definition: helper.cc:189
#define xisspace(x)
Definition: xis.h:15
unsigned int concurrency
Definition: ChildConfig.h:72
RequestIndex requestsIndex
maps request IDs to requests
Definition: helper.h:285
char progname[]
void syncQueueStats()
synchronizes queue-dependent measurements with the current queue state
Definition: helper.cc:500
unsigned int n_active
Definition: ChildConfig.h:86
dlink_list servers
Definition: helper.h:116
represents a single helper process
Definition: helper.h:192
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
Helper::ReservationId reservationId
"confirmation ID" of the last
Definition: helper.h:332
@ TimedOut
Definition: ResultCode.h:21
int unsigned int
Definition: stub_fd.cc:19
void memFreeBuf(size_t size, void *)
Definition: minimal.cc:67
Holds the required data to serve a helper request.
Definition: helper.h:41
bool queueFull() const
whether queuing an additional request would overload the helper
Definition: helper.cc:489

 

Introduction

Documentation

Support

Miscellaneous