peer_digest.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 72 Peer Digest Routines */
10 
11 #include "squid.h"
12 #if USE_CACHE_DIGESTS
13 #include "base/IoManip.h"
14 #include "CacheDigest.h"
15 #include "CachePeer.h"
16 #include "event.h"
17 #include "FwdState.h"
18 #include "globals.h"
19 #include "HttpReply.h"
20 #include "HttpRequest.h"
21 #include "internal.h"
22 #include "MemObject.h"
23 #include "mime_header.h"
24 #include "neighbors.h"
25 #include "PeerDigest.h"
26 #include "Store.h"
27 #include "store_key_md5.h"
28 #include "StoreClient.h"
29 #include "tools.h"
30 #include "util.h"
31 
32 /* local types */
33 
34 /* local prototypes */
35 static time_t peerDigestIncDelay(const PeerDigest * pd);
36 static time_t peerDigestNewDelay(const StoreEntry * e);
37 static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
39 static void peerDigestRequest(PeerDigest * pd);
41 static int peerDigestFetchReply(void *, char *, ssize_t);
42 int peerDigestSwapInCBlock(void *, char *, ssize_t);
43 int peerDigestSwapInMask(void *, char *, ssize_t);
44 static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
45 static void finishAndDeleteFetch(DigestFetchState *, const char *reason, bool sawError);
46 static void peerDigestFetchSetStats(DigestFetchState * fetch);
47 static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
48 static int peerDigestUseful(const PeerDigest * pd);
49 
50 /* local constants */
51 Version const CacheDigestVer = { 5, 3 };
52 
53 #define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
54 
55 /* min interval for requesting digests from a given peer */
56 static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
57 /* min interval for requesting digests (cumulative request stream) */
58 static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
59 
60 /* local vars */
61 
62 static time_t pd_last_req_time = 0; /* last call to Check */
63 
65  peer(p),
66  host(peer->host) // if peer disappears, we will know its name
67 {
68  times.initialized = squid_curtime;
69 }
70 
72 
74 
76  pd(aPd),
77  entry(nullptr),
78  old_entry(nullptr),
79  sc(nullptr),
80  old_sc(nullptr),
81  request(req),
82  offset(0),
83  mask_offset(0),
84  start_time(squid_curtime),
85  resp_time(0),
86  expires(0),
87  bufofs(0),
88  state(DIGEST_READ_REPLY)
89 {
91 
92  sent.msg = 0;
93  sent.bytes = 0;
94 
95  recv.msg = 0;
96  recv.bytes = 0;
97 
98  *buf = 0;
99 }
100 
102 {
103  if (old_entry) {
104  debugs(72, 3, "deleting old entry");
107  old_entry->unlock("DigestFetchState destructed old");
108  old_entry = nullptr;
109  }
110 
111  /* unlock everything */
112  storeUnregister(sc, entry, this);
113 
114  entry->unlock("DigestFetchState destructed");
115  entry = nullptr;
116 
118 }
119 
121 {
122  if (times.next_check && eventFind(peerDigestCheck, this))
124  delete cd;
125  // req_result pointer is not owned by us
126 }
127 
128 /* called by peer to indicate that somebody actually needs this digest */
129 void
131 {
132  assert(pd);
133  assert(!pd->flags.needed);
134  assert(!pd->cd);
135 
136  pd->flags.needed = true;
137  pd->times.needed = squid_curtime;
138  peerDigestSetCheck(pd, 0); /* check asap */
139 }
140 
141 /* increment retry delay [after an unsuccessful attempt] */
142 static time_t
144 {
145  assert(pd);
146  return pd->times.retry_delay > 0 ?
147  2 * pd->times.retry_delay : /* exponential backoff */
148  PeerDigestReqMinGap; /* minimal delay */
149 }
150 
151 /* artificially increases Expires: setting to avoid race conditions
152  * returns the delay till that [increased] expiration time */
153 static time_t
155 {
156  assert(e);
157 
158  if (e->expires > 0)
160 
161  return PeerDigestReqMinGap;
162 }
163 
164 /* registers next digest verification */
165 static void
166 peerDigestSetCheck(PeerDigest * pd, time_t delay)
167 {
168  eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
169  pd->times.next_check = squid_curtime + delay;
170  debugs(72, 3, "peerDigestSetCheck: will check peer " << pd->host << " in " << delay << " secs");
171 }
172 
173 /* callback for eventAdd() (with peer digest locked)
174  * request new digest if our copy is too old or if we lack one;
175  * schedule next check otherwise */
176 static void
177 peerDigestCheck(void *data)
178 {
179  PeerDigest *pd = (PeerDigest *)data;
180  time_t req_time;
181 
182  assert(!pd->flags.requested);
183 
184  pd->times.next_check = 0; /* unknown */
185 
186  Assure(pd->peer.valid());
187 
188  debugs(72, 3, "cache_peer " << RawPointer(pd->peer).orNil());
189  debugs(72, 3, "peerDigestCheck: time: " << squid_curtime <<
190  ", last received: " << (long int) pd->times.received << " (" <<
191  std::showpos << (int) (squid_curtime - pd->times.received) << ")");
192 
193  /* decide when we should send the request:
194  * request now unless too close to other requests */
195  req_time = squid_curtime;
196 
197  /* per-peer limit */
198 
199  if (req_time - pd->times.received < PeerDigestReqMinGap) {
200  debugs(72, 2, "peerDigestCheck: " << pd->host <<
201  ", avoiding close peer requests (" <<
202  (int) (req_time - pd->times.received) << " < " <<
203  (int) PeerDigestReqMinGap << " secs).");
204 
205  req_time = pd->times.received + PeerDigestReqMinGap;
206  }
207 
208  /* global limit */
209  if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
210  debugs(72, 2, "peerDigestCheck: " << pd->host <<
211  ", avoiding close requests (" <<
212  (int) (req_time - pd_last_req_time) << " < " <<
213  (int) GlobDigestReqMinGap << " secs).");
214 
216  }
217 
218  if (req_time <= squid_curtime)
219  peerDigestRequest(pd); /* will set pd->flags.requested */
220  else
221  peerDigestSetCheck(pd, req_time - squid_curtime);
222 }
223 
224 /* ask store for a digest */
225 static void
227 {
228  const auto p = pd->peer.get(); // TODO: Replace with a reference.
229  StoreEntry *e, *old_e;
230  char *url = nullptr;
231  HttpRequest *req;
232  StoreIOBuffer tempBuffer;
233 
234  pd->req_result = nullptr;
235  pd->flags.requested = true;
236 
237  /* compute future request components */
238 
239  if (p->digest_url)
240  url = xstrdup(p->digest_url);
241  else
242  url = xstrdup(internalRemoteUri(p->secure.encryptTransport, p->host, p->http_port, "/squid-internal-periodic/", SBuf(StoreDigestFileName)));
243  debugs(72, 2, url);
244 
245  const auto mx = MasterXaction::MakePortless<XactionInitiator::initCacheDigest>();
246  req = HttpRequest::FromUrlXXX(url, mx);
247 
248  assert(req);
249 
250  /* add custom headers */
251  assert(!req->header.len);
252 
254 
255  req->header.putStr(Http::HdrType::ACCEPT, "text/html");
256 
257  if (p->login &&
258  p->login[0] != '*' &&
259  strcmp(p->login, "PASS") != 0 &&
260  strcmp(p->login, "PASSTHRU") != 0 &&
261  strncmp(p->login, "NEGOTIATE",9) != 0 &&
262  strcmp(p->login, "PROXYPASS") != 0) {
263  req->url.userInfo(SBuf(p->login)); // XXX: performance regression make peer login SBuf as well.
264  }
265  /* create fetch state structure */
266  DigestFetchState *fetch = new DigestFetchState(pd, req);
267 
268  /* update timestamps */
271  req->flags.cachable.support(); // prevent RELEASE_REQUEST in storeCreateEntry()
272 
273  /* the rest is based on clientReplyContext::processExpired() */
274  req->flags.refresh = true;
275 
276  old_e = fetch->old_entry = storeGetPublicByRequest(req);
277 
278  // XXX: Missing a hittingRequiresCollapsing() && startCollapsingOn() check.
279  if (old_e) {
280  debugs(72, 5, "found old " << *old_e);
281 
282  old_e->lock("peerDigestRequest");
283  old_e->ensureMemObject(url, url, req->method);
284 
285  fetch->old_sc = storeClientListAdd(old_e, fetch);
286  }
287 
288  e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
289  debugs(72, 5, "created " << *e);
291  fetch->sc = storeClientListAdd(e, fetch);
292  /* set lastmod to trigger IMS request if possible */
293 
294  // TODO: Also check for fetch->pd->cd presence as a precondition for sending
295  // IMS requests because peerDigestFetchReply() does not accept 304 responses
296  // without an in-memory cache digest.
297  if (old_e)
298  e->lastModified(old_e->lastModified());
299 
300  /* push towards peer cache */
302 
303  tempBuffer.offset = 0;
304 
305  tempBuffer.length = SM_PAGE_SIZE;
306 
307  tempBuffer.data = fetch->buf;
308 
309  storeClientCopy(fetch->sc, e, tempBuffer,
310  peerDigestHandleReply, fetch);
311 
312  safe_free(url);
313 }
314 
315 /* Handle the data copying .. */
316 
317 /*
318  * This routine handles the copy data and then redirects the
319  * copy to a bunch of subfunctions depending upon the copy state.
320  * It also tracks the buffer offset and "seen", since I'm actually
321  * not interested in rewriting everything to suit my little idea.
322  */
323 static void
324 peerDigestHandleReply(void *data, StoreIOBuffer receivedData)
325 {
326  DigestFetchState *fetch = (DigestFetchState *)data;
327  int retsize = -1;
328  digest_read_state_t prevstate;
329  int newsize;
330 
331  if (receivedData.flags.error) {
332  finishAndDeleteFetch(fetch, "failure loading digest reply from Store", true);
333  return;
334  }
335 
336  if (!fetch->pd) {
337  finishAndDeleteFetch(fetch, "digest disappeared while loading digest reply from Store", true);
338  return;
339  }
340 
341  /* The existing code assumes that the received pointer is
342  * where we asked the data to be put
343  */
344  assert(!receivedData.data || fetch->buf + fetch->bufofs == receivedData.data);
345 
346  /* Update the buffer size */
347  fetch->bufofs += receivedData.length;
348 
349  assert(fetch->bufofs <= SM_PAGE_SIZE);
350 
351  /* If we've fetched enough, return */
352 
353  if (peerDigestFetchedEnough(fetch, fetch->buf, fetch->bufofs, "peerDigestHandleReply"))
354  return;
355 
356  /* Call the right function based on the state */
357  /* (Those functions will update the state if needed) */
358 
359  /* Give us a temporary reference. Some of the calls we make may
360  * try to destroy the fetch structure, and we like to know if they
361  * do
362  */
363  CbcPointer<DigestFetchState> tmpLock = fetch;
364 
365  /* Repeat this loop until we're out of data OR the state changes */
366  /* (So keep going if the state has changed and we still have data */
367  do {
368  prevstate = fetch->state;
369 
370  switch (fetch->state) {
371 
372  case DIGEST_READ_REPLY:
373  retsize = peerDigestFetchReply(fetch, fetch->buf, fetch->bufofs);
374  break;
375 
376  case DIGEST_READ_CBLOCK:
377  retsize = peerDigestSwapInCBlock(fetch, fetch->buf, fetch->bufofs);
378  break;
379 
380  case DIGEST_READ_MASK:
381  retsize = peerDigestSwapInMask(fetch, fetch->buf, fetch->bufofs);
382  break;
383 
384  case DIGEST_READ_NONE:
385  break;
386 
387  default:
388  fatal("Bad digest transfer mode!\n");
389  }
390 
391  if (retsize < 0)
392  return;
393 
394  /*
395  * The returned size indicates how much of the buffer was read -
396  * so move the remainder of the buffer to the beginning
397  * and update the bufofs / bufsize
398  */
399  newsize = fetch->bufofs - retsize;
400 
401  memmove(fetch->buf, fetch->buf + retsize, fetch->bufofs - newsize);
402 
403  fetch->bufofs = newsize;
404 
405  } while (cbdataReferenceValid(fetch) && prevstate != fetch->state && fetch->bufofs > 0);
406 
407  // Check for EOF here, thus giving the parser one extra run. We could avoid this overhead by
408  // checking at the beginning of this function. However, in this case, we would have to require
409  // that the parser does not regard EOF as a special condition (it is true now but may change
410  // in the future).
411  if (fetch->sc->atEof()) {
412  finishAndDeleteFetch(fetch, "premature end of digest reply", true);
413  return;
414  }
415 
416  /* Update the copy offset */
417  fetch->offset += receivedData.length;
418 
419  /* Schedule another copy */
420  if (cbdataReferenceValid(fetch)) {
421  StoreIOBuffer tempBuffer;
422  tempBuffer.offset = fetch->offset;
423  tempBuffer.length = SM_PAGE_SIZE - fetch->bufofs;
424  tempBuffer.data = fetch->buf + fetch->bufofs;
425  storeClientCopy(fetch->sc, fetch->entry, tempBuffer,
426  peerDigestHandleReply, fetch);
427  }
428 }
429 
431 static int
432 peerDigestFetchReply(void *data, char *buf, ssize_t size)
433 {
434  DigestFetchState *fetch = (DigestFetchState *)data;
435  const auto pd = fetch->pd.get();
436  assert(pd && buf);
437  assert(!fetch->offset);
438 
439  assert(fetch->state == DIGEST_READ_REPLY);
440 
441  if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
442  return -1;
443 
444  {
445  const auto &reply = fetch->entry->mem().freshestReply();
446  const auto status = reply.sline.status();
447  assert(status != Http::scNone);
448  debugs(72, 3, "peerDigestFetchReply: " << pd->host << " status: " << status <<
449  ", expires: " << (long int) reply.expires << " (" << std::showpos <<
450  (int) (reply.expires - squid_curtime) << ")");
451 
452  /* this "if" is based on clientHandleIMSReply() */
453 
454  if (status == Http::scNotModified) {
455  /* our old entry is fine */
456  assert(fetch->old_entry);
457 
458  if (!fetch->old_entry->mem_obj->request)
459  fetch->old_entry->mem_obj->request = fetch->entry->mem_obj->request;
460 
461  assert(fetch->old_entry->mem_obj->request);
462 
463  if (!Store::Root().updateOnNotModified(fetch->old_entry, *fetch->entry)) {
464  finishAndDeleteFetch(fetch, "header update failure after a 304 response", true);
465  return -1;
466  }
467 
468  /* get rid of 304 reply */
469  storeUnregister(fetch->sc, fetch->entry, fetch);
470 
471  fetch->entry->unlock("peerDigestFetchReply 304");
472 
473  fetch->entry = fetch->old_entry;
474 
475  fetch->old_entry = nullptr;
476 
477  /* preserve request -- we need its size to update counters */
478  /* requestUnlink(r); */
479  /* fetch->entry->mem_obj->request = nullptr; */
480 
481  if (!fetch->pd->cd) {
482  finishAndDeleteFetch(fetch, "304 without the old in-memory digest", true);
483  return -1;
484  }
485 
486  // stay with the old in-memory digest
487  finishAndDeleteFetch(fetch, "Not modified", false);
488  return -1;
489  } else if (status == Http::scOkay) {
490  /* get rid of old entry if any */
491 
492  if (fetch->old_entry) {
493  debugs(72, 3, "peerDigestFetchReply: got new digest, releasing old one");
494  storeUnregister(fetch->old_sc, fetch->old_entry, fetch);
495  fetch->old_entry->releaseRequest();
496  fetch->old_entry->unlock("peerDigestFetchReply 200");
497  fetch->old_entry = nullptr;
498  }
499 
500  fetch->state = DIGEST_READ_CBLOCK;
501  } else {
502  /* some kind of a bug */
503  finishAndDeleteFetch(fetch, reply.sline.reason(), true);
504  return -1; /* XXX -1 will abort stuff in ReadReply! */
505  }
506  }
507 
508  return 0; // we consumed/used no buffered bytes
509 }
510 
511 int
512 peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
513 {
514  DigestFetchState *fetch = (DigestFetchState *)data;
515 
516  assert(fetch->state == DIGEST_READ_CBLOCK);
517 
518  if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
519  return -1;
520 
521  if (size >= (ssize_t)StoreDigestCBlockSize) {
522  const auto pd = fetch->pd.get();
523 
524  assert(pd);
525  assert(fetch->entry->mem_obj);
526 
527  if (peerDigestSetCBlock(pd, buf)) {
528  /* XXX: soon we will have variable header size */
529  /* switch to CD buffer and fetch digest guts */
530  buf = nullptr;
531  assert(pd->cd->mask);
532  fetch->state = DIGEST_READ_MASK;
533  return StoreDigestCBlockSize;
534  } else {
535  finishAndDeleteFetch(fetch, "invalid digest cblock", true);
536  return -1;
537  }
538  }
539 
540  /* need more data, do we have space? */
541  if (size >= SM_PAGE_SIZE) {
542  finishAndDeleteFetch(fetch, "digest cblock too big", true);
543  return -1;
544  }
545 
546  return 0; /* We need more data */
547 }
548 
549 int
550 peerDigestSwapInMask(void *data, char *buf, ssize_t size)
551 {
552  DigestFetchState *fetch = (DigestFetchState *)data;
553  const auto pd = fetch->pd.get();
554  assert(pd);
555  assert(pd->cd && pd->cd->mask);
556 
557  /*
558  * NOTENOTENOTENOTENOTE: buf doesn't point to pd->cd->mask anymore!
559  * we need to do the copy ourselves!
560  */
561  memcpy(pd->cd->mask + fetch->mask_offset, buf, size);
562 
563  /* NOTE! buf points to the middle of pd->cd->mask! */
564 
565  if (peerDigestFetchedEnough(fetch, nullptr, size, "peerDigestSwapInMask"))
566  return -1;
567 
568  fetch->mask_offset += size;
569 
570  if (fetch->mask_offset >= pd->cd->mask_size) {
571  debugs(72, 2, "peerDigestSwapInMask: Done! Got " <<
572  fetch->mask_offset << ", expected " << pd->cd->mask_size);
573  assert(fetch->mask_offset == pd->cd->mask_size);
574  assert(peerDigestFetchedEnough(fetch, nullptr, 0, "peerDigestSwapInMask"));
575  return -1; /* XXX! */
576  }
577 
578  /* We always read everything, so return size */
579  return size;
580 }
581 
582 static int
583 peerDigestFetchedEnough(DigestFetchState * fetch, char *, ssize_t size, const char *step_name)
584 {
585  static const SBuf hostUnknown("<unknown>"); // peer host (if any)
586  SBuf host = hostUnknown;
587 
588  const auto pd = fetch->pd.get();
589  Assure(pd);
590  const char *reason = nullptr; /* reason for completion */
591  const char *no_bug = nullptr; /* successful completion if set */
592 
593  /* test possible exiting conditions (the same for most steps!)
594  * cases marked with '?!' should not happen */
595 
596  if (!reason) {
597  host = pd->host;
598  }
599 
600  debugs(72, 6, step_name << ": peer " << host << ", offset: " <<
601  fetch->offset << " size: " << size << ".");
602 
603  /* continue checking (with pd and host known and valid) */
604 
605  if (!reason) {
606  if (size < 0)
607  reason = "swap failure";
608  else if (!fetch->entry)
609  reason = "swap aborted?!";
610  else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
611  reason = "swap aborted";
612  }
613 
614  /* continue checking (maybe-successful eof case) */
615  if (!reason && !size && fetch->state != DIGEST_READ_REPLY) {
616  if (!pd->cd)
617  reason = "null digest?!";
618  else if (fetch->mask_offset != pd->cd->mask_size)
619  reason = "premature end of digest?!";
620  else if (!peerDigestUseful(pd))
621  reason = "useless digest";
622  else
623  reason = no_bug = "success";
624  }
625 
626  /* finish if we have a reason */
627  if (reason) {
628  const int level = strstr(reason, "?!") ? 1 : 3;
629  debugs(72, level, "" << step_name << ": peer " << host << ", exiting after '" << reason << "'");
630  finishAndDeleteFetch(fetch, reason, !no_bug);
631  }
632 
633  return reason != nullptr;
634 }
635 
636 /* complete the digest transfer, update stats, unlock/release everything */
637 static void
638 finishAndDeleteFetch(DigestFetchState * const fetch, const char * const reason, const bool err)
639 {
640  assert(reason);
641 
642  debugs(72, 2, "peer: " << RawPointer(fetch->pd.valid() ? fetch->pd->peer : nullptr).orNil() << ", reason: " << reason << ", err: " << err);
643 
644  /* note: order is significant */
646  if (const auto pd = fetch->pd.get())
647  pd->noteFetchFinished(*fetch, reason, err);
648 
649  delete fetch;
650 }
651 
652 void
653 PeerDigest::noteFetchFinished(const DigestFetchState &finishedFetch, const char * const outcomeDescription, const bool sawError)
654 {
655  const auto pd = this; // TODO: remove this diff reducer
656  const auto fetch = &finishedFetch; // TODO: remove this diff reducer
657 
658  pd->flags.requested = false;
659  pd->req_result = outcomeDescription;
660 
661  pd->times.received = squid_curtime;
662  pd->times.req_delay = fetch->resp_time;
663  pd->stats.sent.kbytes += fetch->sent.bytes;
664  pd->stats.recv.kbytes += fetch->recv.bytes;
665  pd->stats.sent.msgs += fetch->sent.msg;
666  pd->stats.recv.msgs += fetch->recv.msg;
667 
668  if (sawError) {
669  debugs(72, DBG_IMPORTANT, "disabling (" << outcomeDescription << ") digest from " << host);
670 
671  pd->times.retry_delay = peerDigestIncDelay(pd);
672  peerDigestSetCheck(pd, pd->times.retry_delay);
673  delete pd->cd;
674  pd->cd = nullptr;
675 
676  pd->flags.usable = false;
677  } else {
678  pd->flags.usable = true;
679  pd->times.retry_delay = 0;
680  peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
681 
682  /* XXX: ugly condition, but how? */
683 
684  if (fetch->entry->store_status == STORE_OK)
685  debugs(72, 2, "re-used old digest from " << host);
686  else
687  debugs(72, 2, "received valid digest from " << host);
688  }
689 }
690 
691 /* calculate fetch stats after completion */
692 static void
694 {
695  MemObject *mem;
696  assert(fetch->entry && fetch->request);
697 
698  mem = fetch->entry->mem_obj;
699  assert(mem);
700 
701  /* XXX: outgoing numbers are not precise */
702  /* XXX: we must distinguish between 304 hits and misses here */
703  fetch->sent.bytes = fetch->request->prefixLen();
704  /* XXX: this is slightly wrong: we don't KNOW that the entire memobject
705  * was fetched. We only know how big it is
706  */
707  fetch->recv.bytes = mem->size();
708  fetch->sent.msg = fetch->recv.msg = 1;
709  fetch->expires = fetch->entry->expires;
710  fetch->resp_time = squid_curtime - fetch->start_time;
711 
712  debugs(72, 3, "peerDigestFetchFinish: recv " << fetch->recv.bytes <<
713  " bytes in " << (int) fetch->resp_time << " secs");
714 
715  debugs(72, 3, "peerDigestFetchFinish: expires: " <<
716  (long int) fetch->expires << " (" << std::showpos <<
717  (int) (fetch->expires - squid_curtime) << "), lmt: " <<
718  std::noshowpos << (long int) fetch->entry->lastModified() << " (" <<
719  std::showpos << (int) (fetch->entry->lastModified() - squid_curtime) <<
720  ")");
721 
722  statCounter.cd.kbytes_sent += fetch->sent.bytes;
723  statCounter.cd.kbytes_recv += fetch->recv.bytes;
724  statCounter.cd.msgs_sent += fetch->sent.msg;
725  statCounter.cd.msgs_recv += fetch->recv.msg;
726 }
727 
728 static int
729 peerDigestSetCBlock(PeerDigest * pd, const char *buf)
730 {
731  StoreDigestCBlock cblock;
732  int freed_size = 0;
733  const auto host = pd->host;
734 
735  memcpy(&cblock, buf, sizeof(cblock));
736  /* network -> host conversions */
737  cblock.ver.current = ntohs(cblock.ver.current);
738  cblock.ver.required = ntohs(cblock.ver.required);
739  cblock.capacity = ntohl(cblock.capacity);
740  cblock.count = ntohl(cblock.count);
741  cblock.del_count = ntohl(cblock.del_count);
742  cblock.mask_size = ntohl(cblock.mask_size);
743  debugs(72, 2, "got digest cblock from " << host << "; ver: " <<
744  (int) cblock.ver.current << " (req: " << (int) cblock.ver.required <<
745  ")");
746 
747  debugs(72, 2, "\t size: " <<
748  cblock.mask_size << " bytes, e-cnt: " <<
749  cblock.count << ", e-util: " <<
750  xpercentInt(cblock.count, cblock.capacity) << "%" );
751  /* check version requirements (both ways) */
752 
753  if (cblock.ver.required > CacheDigestVer.current) {
754  debugs(72, DBG_IMPORTANT, "" << host << " digest requires version " <<
755  cblock.ver.required << "; have: " << CacheDigestVer.current);
756 
757  return 0;
758  }
759 
760  if (cblock.ver.current < CacheDigestVer.required) {
761  debugs(72, DBG_IMPORTANT, "" << host << " digest is version " <<
762  cblock.ver.current << "; we require: " <<
764 
765  return 0;
766  }
767 
768  /* check consistency */
769  if (cblock.ver.required > cblock.ver.current ||
770  cblock.mask_size <= 0 || cblock.capacity <= 0 ||
771  cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
772  debugs(72, DBG_CRITICAL, "" << host << " digest cblock is corrupted.");
773  return 0;
774  }
775 
776  /* check consistency further */
777  if ((size_t)cblock.mask_size != CacheDigest::CalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
778  debugs(72, DBG_CRITICAL, host << " digest cblock is corrupted " <<
779  "(mask size mismatch: " << cblock.mask_size << " ? " <<
781  << ").");
782  return 0;
783  }
784 
785  /* there are some things we cannot do yet */
787  debugs(72, DBG_CRITICAL, "ERROR: " << host << " digest: unsupported #hash functions: " <<
788  cblock.hash_func_count << " ? " << CacheDigestHashFuncCount << ".");
789  return 0;
790  }
791 
792  /*
793  * no cblock bugs below this point
794  */
795  /* check size changes */
796  if (pd->cd && cblock.mask_size != (ssize_t)pd->cd->mask_size) {
797  debugs(72, 2, host << " digest changed size: " << cblock.mask_size <<
798  " -> " << pd->cd->mask_size);
799  freed_size = pd->cd->mask_size;
800  delete pd->cd;
801  pd->cd = nullptr;
802  }
803 
804  if (!pd->cd) {
805  debugs(72, 2, "creating " << host << " digest; size: " << cblock.mask_size << " (" <<
806  std::showpos << (int) (cblock.mask_size - freed_size) << ") bytes");
807  pd->cd = new CacheDigest(cblock.capacity, cblock.bits_per_entry);
808 
809  if (cblock.mask_size >= freed_size)
810  statCounter.cd.memory += (cblock.mask_size - freed_size);
811  }
812 
813  assert(pd->cd);
814  /* these assignments leave us in an inconsistent state until we finish reading the digest */
815  pd->cd->count = cblock.count;
816  pd->cd->del_count = cblock.del_count;
817  return 1;
818 }
819 
820 static int
822 {
823  /* TODO: we should calculate the prob of a false hit instead of bit util */
824  const auto bit_util = pd->cd->usedMaskPercent();
825 
826  if (bit_util > 65.0) {
827  debugs(72, DBG_CRITICAL, "WARNING: " << pd->host <<
828  " peer digest has too many bits on (" << bit_util << "%).");
829  return 0;
830  }
831 
832  return 1;
833 }
834 
835 static int
836 saneDiff(time_t diff)
837 {
838  return abs((int) diff) > squid_curtime / 2 ? 0 : diff;
839 }
840 
841 void
843 {
844 #define f2s(flag) (pd->flags.flag ? "yes" : "no")
845 #define appendTime(tm) storeAppendPrintf(e, "%s\t %10ld\t %+d\t %+d\n", \
846  ""#tm, (long int)pd->times.tm, \
847  saneDiff(pd->times.tm - squid_curtime), \
848  saneDiff(pd->times.tm - pd->times.initialized))
849 
850  assert(pd);
851 
852  auto host = pd->host;
853  storeAppendPrintf(e, "\npeer digest from " SQUIDSBUFPH "\n", SQUIDSBUFPRINT(host));
854 
855  cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
856 
857  storeAppendPrintf(e, "\nevent\t timestamp\t secs from now\t secs from init\n");
858  appendTime(initialized);
859  appendTime(needed);
860  appendTime(requested);
861  appendTime(received);
862  appendTime(next_check);
863 
864  storeAppendPrintf(e, "peer digest state:\n");
865  storeAppendPrintf(e, "\tneeded: %3s, usable: %3s, requested: %3s\n",
866  f2s(needed), f2s(usable), f2s(requested));
867  storeAppendPrintf(e, "\n\tlast retry delay: %d secs\n",
868  (int) pd->times.retry_delay);
869  storeAppendPrintf(e, "\tlast request response time: %d secs\n",
870  (int) pd->times.req_delay);
871  storeAppendPrintf(e, "\tlast request result: %s\n",
872  pd->req_result ? pd->req_result : "(none)");
873 
874  storeAppendPrintf(e, "\npeer digest traffic:\n");
875  storeAppendPrintf(e, "\trequests sent: %d, volume: %d KB\n",
876  pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
877  storeAppendPrintf(e, "\treplies recv: %d, volume: %d KB\n",
878  pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
879 
880  storeAppendPrintf(e, "\npeer digest structure:\n");
881 
882  if (pd->cd)
883  cacheDigestReport(pd->cd, host, e);
884  else
885  storeAppendPrintf(e, "\tno in-memory copy\n");
886 }
887 
888 #endif
889 
unsigned char bits_per_entry
Definition: PeerDigest.h:34
void fatal(const char *message)
Definition: fatal.cc:28
static void finishAndDeleteFetch(DigestFetchState *, const char *reason, bool sawError)
Definition: peer_digest.cc:638
int eventFind(EVH *func, void *arg)
Definition: event.cc:145
double usedMaskPercent() const
percentage of mask bits which are used
Definition: CacheDigest.cc:183
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
Definition: CbcPointer.h:159
static EVH peerDigestCheck
Definition: peer_digest.cc:38
#define SM_PAGE_SIZE
Definition: defines.h:63
void userInfo(const SBuf &s)
Definition: Uri.h:70
#define StoreDigestCBlockSize
Definition: peer_digest.cc:53
#define DBG_CRITICAL
Definition: Stream.h:37
struct StoreIOBuffer::@130 flags
AnyP::Uri url
the request URI
Definition: HttpRequest.h:115
bool needed
Definition: PeerDigest.h:92
void ensureMemObject(const char *storeId, const char *logUri, const HttpRequestMethod &aMethod)
initialize mem_obj (if needed) and set URIs/method (if missing)
Definition: store.cc:1589
@ DIGEST_READ_NONE
Definition: enums.h:198
#define appendTime(tm)
void releaseRequest(const bool shareable=false)
Definition: store.cc:458
time_t req_delay
Definition: PeerDigest.h:104
digest_read_state_t state
Definition: PeerDigest.h:71
digest_read_state_t
Definition: enums.h:197
@ scNone
Definition: StatusCode.h:21
SupportOrVeto cachable
whether the response may be stored in the cache
Definition: RequestFlags.h:35
HttpHeader header
Definition: Message.h:74
void eventDelete(EVH *func, void *arg)
Definition: event.cc:127
StoreEntry * entry
Definition: PeerDigest.h:53
MemObject * mem_obj
Definition: Store.h:220
RequestFlags flags
Definition: HttpRequest.h:141
int peerDigestSwapInMask(void *, char *, ssize_t)
Definition: peer_digest.cc:550
static int saneDiff(time_t diff)
Definition: peer_digest.cc:836
struct StatCounters::@117 cd
ByteCounter memory
Definition: StatCounters.h:106
CacheDigest * cd
Definition: PeerDigest.h:87
MemObject & mem()
Definition: Store.h:47
time_t next_check
Definition: PeerDigest.h:101
void storeAppendPrintf(StoreEntry *e, const char *fmt,...)
Definition: store.cc:855
void lock(const char *context)
Definition: store.cc:445
@ ENTRY_ABORTED
Definition: enums.h:110
@ KEY_PRIVATE
Definition: enums.h:97
bool requested
Definition: PeerDigest.h:94
int64_t offset
Definition: StoreIOBuffer.h:58
Definition: SBuf.h:93
#define xstrdup
store_client * old_sc
Definition: PeerDigest.h:56
const Version CacheDigestVer
Definition: peer_digest.cc:51
time_t start_time
Definition: PeerDigest.h:60
uint16_t flags
Definition: Store.h:231
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
void cacheDigestGuessStatsReport(const CacheDigestGuessStats *stats, StoreEntry *sentry, const SBuf &label)
Definition: CacheDigest.cc:209
struct DigestFetchState::@74 sent
static void peerDigestFetchSetStats(DigestFetchState *fetch)
Definition: peer_digest.cc:693
Http::StatusLine sline
Definition: HttpReply.h:56
time_t expires
Definition: Store.h:225
void HTTPMSGUNLOCK(M *&a)
Definition: Message.h:150
static int peerDigestSetCBlock(PeerDigest *pd, const char *buf)
Definition: peer_digest.cc:729
const SBuf host
copy of peer->host
Definition: PeerDigest.h:88
uint32_t mask_offset
Definition: PeerDigest.h:59
StoreEntry * storeGetPublicByRequest(HttpRequest *req, const KeyScope keyScope)
Definition: store.cc:516
CbcPointer< PeerDigest > pd
Definition: PeerDigest.h:52
static const time_t GlobDigestReqMinGap
Definition: peer_digest.cc:58
void peerDigestNeeded(PeerDigest *pd)
Definition: peer_digest.cc:130
StoreEntry * old_entry
Definition: PeerDigest.h:54
ByteCounter kbytes_sent
Definition: StatCounters.h:77
void peerDigestStatsReport(const PeerDigest *pd, StoreEntry *e)
Definition: peer_digest.cc:842
struct DigestFetchState::@74 recv
struct PeerDigest::@77 stats
static STCB peerDigestHandleReply
Definition: peer_digest.cc:40
CbcPointer< CachePeer > peer
pointer back to peer structure, argh
Definition: PeerDigest.h:86
store_client * sc
Definition: PeerDigest.h:55
static const time_t PeerDigestReqMinGap
Definition: peer_digest.cc:56
const char * StoreDigestFileName
char * internalRemoteUri(bool encrypt, const char *host, unsigned short port, const char *dir, const SBuf &name)
Definition: internal.cc:95
int size
Definition: ModDevPoll.cc:69
struct PeerDigest::@75 flags
void noteFetchFinished(const DigestFetchState &, const char *outcomeDescription, bool sawError)
Definition: peer_digest.cc:653
void cacheDigestReport(CacheDigest *cd, const SBuf &label, StoreEntry *e)
Definition: CacheDigest.cc:245
#define SQUIDSBUFPRINT(s)
Definition: SBuf.h:32
static int peerDigestFetchedEnough(DigestFetchState *fetch, char *buf, ssize_t size, const char *step_name)
Definition: peer_digest.cc:583
short int required
Definition: PeerDigest.h:21
Http::StatusCode status() const
retrieve the status code for this status line
Definition: StatusLine.h:45
time_t resp_time
Definition: PeerDigest.h:61
unsigned error
Definition: StoreIOBuffer.h:55
#define EBIT_TEST(flag, bit)
Definition: defines.h:67
int unlock(const char *context)
Definition: store.cc:469
#define safe_free(x)
Definition: xalloc.h:73
time_t retry_delay
Definition: PeerDigest.h:102
#define assert(EX)
Definition: assert.h:17
static uint32_t CalcMaskSize(uint64_t cap, uint8_t bpe)
Definition: CacheDigest.cc:273
const char * StoreDigestMimeStr
static time_t peerDigestIncDelay(const PeerDigest *pd)
Definition: peer_digest.cc:143
static int peerDigestFetchReply(void *, char *, ssize_t)
handle HTTP response headers in the initial storeClientCopy() response
Definition: peer_digest.cc:432
int xpercentInt(double part, double whole)
Definition: util.cc:46
void HTTPMSGLOCK(Http::Message *a)
Definition: Message.h:161
int64_t size() const
Definition: MemObject.cc:229
uint64_t del_count
Definition: CacheDigest.h:56
#define Assure(condition)
Definition: Assure.h:35
@ DIGEST_READ_CBLOCK
Definition: enums.h:200
static int sc[16]
Definition: smbdes.c:121
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:325
int prefixLen() const
Definition: HttpRequest.cc:369
@ STORE_OK
Definition: enums.h:45
time_t squid_curtime
Definition: stub_libtime.cc:20
int peerDigestSwapInCBlock(void *, char *, ssize_t)
Definition: peer_digest.cc:512
PeerDigest(CachePeer *)
Definition: peer_digest.cc:64
StoreEntry * storeCreateEntry(const char *url, const char *logUrl, const RequestFlags &flags, const HttpRequestMethod &method)
Definition: store.cc:759
#define f2s(flag)
@ scNotModified
Definition: StatusCode.h:41
void EVH(void *)
Definition: event.h:18
static void peerDigestRequest(PeerDigest *pd)
Definition: peer_digest.cc:226
static void peerDigestSetCheck(PeerDigest *pd, time_t delay)
Definition: peer_digest.cc:166
HttpRequest * request
Definition: PeerDigest.h:57
HttpRequestMethod method
Definition: HttpRequest.h:114
short int current
Definition: PeerDigest.h:20
char buf[SM_PAGE_SIZE]
Definition: PeerDigest.h:69
static time_t pd_last_req_time
Definition: peer_digest.cc:62
void putStr(Http::HdrType id, const char *str)
Definition: HttpHeader.cc:995
const char * req_result
Definition: PeerDigest.h:89
struct PeerDigest::@76 times
static HttpRequest * FromUrlXXX(const char *url, const MasterXaction::Pointer &, const HttpRequestMethod &method=Http::METHOD_GET)
Definition: HttpRequest.cc:528
CacheDigestGuessStats guess
Definition: PeerDigest.h:110
#define DBG_IMPORTANT
Definition: Stream.h:38
@ DIGEST_READ_MASK
Definition: enums.h:201
const HttpReply & freshestReply() const
Definition: MemObject.h:68
unsigned char hash_func_count
Definition: PeerDigest.h:35
DigestFetchState(PeerDigest *, HttpRequest *)
Definition: peer_digest.cc:75
static time_t peerDigestNewDelay(const StoreEntry *e)
Definition: peer_digest.cc:154
time_t received
Definition: PeerDigest.h:105
@ DIGEST_READ_REPLY
Definition: enums.h:199
static void fwdStart(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *)
Same as Start() but no master xaction info (AccessLogEntry) available.
Definition: FwdState.cc:407
bool atEof() const
Definition: StoreClient.h:106
void lastModified(const time_t when)
Definition: Store.h:175
uint32_t mask_size
Definition: CacheDigest.h:59
@ scOkay
Definition: StatusCode.h:27
void(void *, StoreIOBuffer) STCB
Definition: StoreClient.h:32
RawPointerT< Pointer > RawPointer(const char *label, const Pointer &ptr)
convenience wrapper for creating RawPointerT<> objects
Definition: IoManip.h:73
uint64_t count
Definition: CacheDigest.h:55
int storeUnregister(store_client *sc, StoreEntry *e, void *data)
void storeClientCopy(store_client *sc, StoreEntry *e, StoreIOBuffer copyInto, STCB *callback, void *data)
static int peerDigestUseful(const PeerDigest *pd)
Definition: peer_digest.cc:821
struct PeerDigest::@77::@78 recv
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
ByteCounter kbytes_recv
Definition: StatCounters.h:80
HttpRequestPointer request
Definition: MemObject.h:205
int CacheDigestHashFuncCount
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:107
#define SQUIDSBUFPH
Definition: SBuf.h:31
struct PeerDigest::@77::@78 sent
store_client * storeClientListAdd(StoreEntry *e, void *data)
ssize_t bufofs
Definition: PeerDigest.h:70
StatCounters statCounter
Definition: StatCounters.cc:12
Controller & Root()
safely access controller singleton
Definition: Controller.cc:926

 

Introduction

Documentation

Support

Miscellaneous