RockRebuild.cc

/*
 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "compat/unistd.h"
#include "debug/Messages.h"
#include "fs/rock/RockDbCell.h"
#include "fs/rock/RockRebuild.h"
#include "fs/rock/RockSwapDir.h"
#include "fs_io.h"
#include "globals.h"
#include "md5.h"
#include "sbuf/Stream.h"
#include "SquidMath.h"
#include "Store.h"
#include "tools.h"

#include <array>
#include <cerrno>
#include <cstring>

CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild);

namespace Rock
{

static bool
DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
{
    return loadingPos >= dbSlotLimit;
}

static bool
DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
{
    // paranoid slot checking is only enabled with squid -S
    const auto extraWork = opt_store_doublecheck ? dbSlotLimit : 0;
    return validationPos >= (dbEntryLimit + extraWork);
}
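
// Worked example (editorial, not part of RockRebuild.cc): with dbEntryLimit
// = 4 entries and dbSlotLimit = 16 slots, validation normally ends at
// validationPos >= 4; with `squid -S` (opt_store_doublecheck), it also walks
// positions 4..19 (i.e., slots 0..15), ending at validationPos >= 4 + 16 = 20.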

/// low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
class LoadingFlags
{
public:
    LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}

    /* for LoadingEntry */
    uint8_t state:3; ///< current entry state (one of the LoadingEntry::State values)
    uint8_t anchored:1; ///< whether we loaded the inode slot for this entry

    /* for LoadingSlot */
    uint8_t mapped:1; ///< whether the slot was added to a mapped entry
    uint8_t finalized:1; ///< whether finalizeOrThrow() has scanned the slot
    uint8_t freed:1; ///< whether the slot was given to the map as free space
};
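
// Packing sketch (editorial illustration, not part of RockRebuild.cc): the
// seven bits above fit one uint8_t on common ABIs, so the per-slot flags
// array costs about a byte per db slot; five separate byte-sized members
// would quintuple that. A hypothetical compile-time check:
//
//     static_assert(sizeof(Rock::LoadingFlags) == 1, "LoadingFlags stays packed");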

/// smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingEntry
{
public:
    LoadingEntry(const sfileno fileNo, LoadingParts &source);

    uint64_t &size; ///< payload seen so far
    uint32_t &version; ///< DbCellHeader::version to distinguish same-URL chains

    /// possible store entry states during index rebuild
    typedef enum { leEmpty = 0, leLoading, leLoaded, leCorrupted, leIgnored } State;

    /* LoadingFlags::state */
    State state() const { return static_cast<State>(flags.state); }
    void state(State aState) const { flags.state = aState; }

    /* LoadingFlags::anchored */
    bool anchored() const { return flags.anchored; }
    void anchored(const bool beAnchored) { flags.anchored = beAnchored; }

private:
    LoadingFlags &flags; ///< entry flags (see the above accessors) are ours
};

/// smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
class LoadingSlot
{
public:
    LoadingSlot(const SlotId slotId, LoadingParts &source);

    /// another slot in some chain belonging to the same entry (unordered!)
    Ipc::StoreMapSliceId &more;

    /* LoadingFlags::mapped */
    bool mapped() const { return flags.mapped; }
    void mapped(const bool beMapped) { flags.mapped = beMapped; }

    /* LoadingFlags::finalized */
    bool finalized() const { return flags.finalized; }
    void finalized(const bool beFinalized) { flags.finalized = beFinalized; }

    /* LoadingFlags::freed */
    bool freed() const { return flags.freed; }
    void freed(const bool beFreed) { flags.freed = beFreed; }

    /// whether the slot was freed, mapped, or chained into an entry
    bool used() const { return freed() || mapped() || more != -1; }

private:
    LoadingFlags &flags; ///< slot flags (see the above accessors) are ours
};

/// Owns the shared arrays behind the LoadingEntry and LoadingSlot smart
/// pointers, keeping rebuild state for all entries and slots of one cache_dir.
class LoadingParts
{
public:
    using Sizes = Ipc::StoreMapItems<uint64_t>;
    using Versions = Ipc::StoreMapItems<uint32_t>;
    using Mores = Ipc::StoreMapItems<Ipc::StoreMapSliceId>;
    using Flags = Ipc::StoreMapItems<LoadingFlags>;

    LoadingParts(const SwapDir &dir, const bool resuming);
    ~LoadingParts();

    // lacking copying/moving code and often too huge to copy
    LoadingParts(LoadingParts&&) = delete;

    Sizes &sizes() const { return *sizesOwner->object(); }
    Versions &versions() const { return *versionsOwner->object(); }
    Mores &mores() const { return *moresOwner->object(); }
    Flags &flags() const { return *flagsOwner->object(); }

private:
    /* Anti-padding storage. With millions of entries, padding matters! */

    /* indexed by sfileno */
    Sizes::Owner *sizesOwner; ///< LoadingEntry::size for all entries
    Versions::Owner *versionsOwner; ///< LoadingEntry::version for all entries

    /* indexed by SlotId */
    Mores::Owner *moresOwner; ///< LoadingSlot::more for all slots

    /* entry flags are indexed by sfileno; slot flags -- by SlotId */
    Flags::Owner *flagsOwner; ///< all LoadingEntry and LoadingSlot flags
};
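
// Layout sketch (editorial illustration, not part of RockRebuild.cc): a
// hypothetical array-of-structs alternative, e.g.
//
//     struct Part { uint64_t size; uint32_t version;
//                   Ipc::StoreMapSliceId more; LoadingFlags flags; };
//
// would pad each element from 17 used bytes (8+4+4+1) to 24 bytes, while the
// parallel arrays above avoid that padding and keep the entry-indexed arrays
// (sizes, versions) smaller than the slot-indexed ones (mores, flags).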

} /* namespace Rock */

/* LoadingEntry */

Rock::LoadingEntry::LoadingEntry(const sfileno fileNo, LoadingParts &source):
    size(source.sizes().at(fileNo)),
    version(source.versions().at(fileNo)),
    flags(source.flags().at(fileNo))
{
}

/* LoadingSlot */

Rock::LoadingSlot::LoadingSlot(const SlotId slotId, LoadingParts &source):
    more(source.mores().at(slotId)),
    flags(source.flags().at(slotId))
{
}

/* LoadingParts */

template <class T>
inline typename T::Owner *
createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
{
    auto id = Ipc::Mem::Segment::Name(SBuf(dirPath), sfx);
    return resuming ? Ipc::Mem::Owner<T>::Old(id.c_str()) : shm_new(T)(id.c_str(), limit);
}
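
// Resumption hinges on the ternary above (editorial note): shm_new() creates
// a fresh, zero-initialized segment sized for `limit` items, while
// Ipc::Mem::Owner<T>::Old() re-attaches to the segment left behind by a
// previous, interrupted indexing run, preserving its contents.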

Rock::LoadingParts::LoadingParts(const SwapDir &dir, const bool resuming):
    sizesOwner(createOwner<Sizes>(dir.path, "rebuild_sizes", dir.entryLimitActual(), resuming)),
    versionsOwner(createOwner<Versions>(dir.path, "rebuild_versions", dir.entryLimitActual(), resuming)),
    moresOwner(createOwner<Mores>(dir.path, "rebuild_mores", dir.slotLimitActual(), resuming)),
    flagsOwner(createOwner<Flags>(dir.path, "rebuild_flags", dir.slotLimitActual(), resuming))
{
    assert(sizes().capacity == versions().capacity); // every entry has both fields
    assert(sizes().capacity <= mores().capacity); // every entry needs slot(s)
    assert(mores().capacity == flags().capacity); // every slot needs a set of flags

    if (!resuming) {
        // other parts rely on shared memory segments being zero-initialized
        // TODO: refactor the next slot pointer to use 0 for nil values
        mores().fill(-1);
    }
}

Rock::LoadingParts::~LoadingParts()
{
    delete sizesOwner;
    delete versionsOwner;
    delete moresOwner;
    delete flagsOwner;
}

/* Rock::Rebuild::Stats */

SBuf
Rock::Rebuild::Stats::Path(const char *dirPath)
{
    return Ipc::Mem::Segment::Name(SBuf(dirPath), "rebuild_stats");
}

Ipc::Mem::Owner<Rock::Rebuild::Stats>*
Rock::Rebuild::Stats::Init(const SwapDir &dir)
{
    return shm_new(Stats)(Path(dir.path).c_str());
}

bool
Rock::Rebuild::Stats::completed(const SwapDir &dir) const
{
    return DoneLoading(counts.scancount, dir.slotLimitActual()) &&
           DoneValidating(counts.validations, dir.slotLimitActual(), dir.entryLimitActual());
}
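
// Editorial note: because Stats live in their own "rebuild_stats" shared
// segment, a restarted kid re-attaches to them via shm_old() in Start()
// below and uses completed() to decide whether indexing can be skipped
// entirely or must be resumed from the recorded scan/validation positions.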

/* Rebuild */

bool
Rock::Rebuild::IsResponsible(const SwapDir &)
{
    // in SMP mode, only the disker is responsible for populating the map
    return !UsingSmp() || IamDiskProcess();
}

bool
Rock::Rebuild::Start(SwapDir &dir)
{
    if (!IsResponsible(dir)) {
        debugs(47, 2, "not responsible for indexing cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    const auto stats = shm_old(Rebuild::Stats)(Stats::Path(dir.path).c_str());
    if (stats->completed(dir)) {
        debugs(47, 2, "already indexed cache_dir #" <<
               dir.index << " from " << dir.filePath);
        return false;
    }

    AsyncJob::Start(new Rebuild(&dir, stats));
    return true;
}

Rock::Rebuild::Rebuild(SwapDir *dir, const Ipc::Mem::Pointer<Stats> &s): AsyncJob("Rock::Rebuild"),
    sd(dir),
    parts(nullptr),
    stats(s),
    dbSize(0),
    dbSlotSize(0),
    dbSlotLimit(0),
    dbEntryLimit(0),
    fd(-1),
    dbOffset(0),
    loadingPos(stats->counts.scancount),
    validationPos(stats->counts.validations),
    counts(stats->counts),
    resuming(stats->counts.started())
{
    assert(sd);
    dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
    dbSlotSize = sd->slotSize;
    dbEntryLimit = sd->entryLimitActual();
    dbSlotLimit = sd->slotLimitActual();
    assert(dbEntryLimit <= dbSlotLimit);
    registerRunner();
}

Rock::Rebuild::~Rebuild()
{
    if (fd >= 0)
        file_close(fd);
    // normally, segments are used until the Squid instance quits,
    // but these indexing-only segments are no longer needed
    delete parts;
}

void
Rock::Rebuild::startShutdown()
{
    mustStop("startShutdown");
}

/// prepares and initiates entry loading sequence
void
Rock::Rebuild::start()
{
    assert(IsResponsible(*sd));

    if (!resuming) {
        debugs(47, Important(18), "Loading cache_dir #" << sd->index <<
               " from " << sd->filePath);
    } else {
        debugs(47, Important(63), "Resuming indexing cache_dir #" << sd->index <<
               " from " << sd->filePath << ':' << progressDescription());
    }

    fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
    if (fd < 0)
        failure("cannot open db", errno);

    char hdrBuf[SwapDir::HeaderSize];
    if (xread(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
        failure("cannot read db header", errno);

    // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
    assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
    buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);

    dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;

    assert(!parts);
    parts = new LoadingParts(*sd, resuming);

    counts.updateStartTime(current_time);

    checkpoint();
}

/// continues after a pause if not done
void
Rock::Rebuild::checkpoint()
{
    if (!done())
        eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
}

bool
Rock::Rebuild::doneLoading() const
{
    return DoneLoading(loadingPos, dbSlotLimit);
}

bool
Rock::Rebuild::doneValidating() const
{
    return DoneValidating(validationPos, dbSlotLimit, dbEntryLimit);
}

bool
Rock::Rebuild::doneAll() const
{
    return doneLoading() && doneValidating() && AsyncJob::doneAll();
}

void
Rock::Rebuild::Steps(void *data)
{
    // use async call to enable job call protection that time events lack
    CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
}

void
Rock::Rebuild::steps()
{
    if (!doneLoading())
        loadingSteps();
    else
        validationSteps();

    checkpoint();
}

void
Rock::Rebuild::loadingSteps()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // Balance our desire to maximize the number of entries processed at once
    // (and, hence, minimize overheads and total rebuild time) with a
    // requirement to also process Coordinator events, disk I/Os, etc.
    const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int64_t loaded = 0;
    while (!doneLoading()) {
        loadOneSlot();
        dbOffset += dbSlotSize;
        ++loadingPos;
        ++loaded;

        if (counts.scancount % 1000 == 0)
            storeRebuildProgress(sd->index, dbSlotLimit, counts.scancount);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << loaded << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
            break;
        }
    }
}

Rock::LoadingEntry
Rock::Rebuild::loadingEntry(const sfileno fileNo)
{
    Must(0 <= fileNo && fileNo < dbEntryLimit);
    return LoadingEntry(fileNo, *parts);
}

Rock::LoadingSlot
Rock::Rebuild::loadingSlot(const SlotId slotId)
{
    Must(0 <= slotId && slotId < dbSlotLimit);
    Must(slotId <= loadingPos); // cannot look ahead
    return LoadingSlot(slotId, *parts);
}

void
Rock::Rebuild::loadOneSlot()
{
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    // increment before loadingPos to avoid getting stuck at a slot
    // in case of a crash
    ++counts.scancount;

    if (lseek(fd, dbOffset, SEEK_SET) < 0)
        failure("cannot seek to db entry", errno);

    buf.reset();

    if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
        return;

    const SlotId slotId = loadingPos;

    // get our header
    DbCellHeader header;
    if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring truncated " << buf.contentSize() << "-byte " <<
               "cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    memcpy(&header, buf.content(), sizeof(header));
    if (header.empty()) {
        freeUnusedSlot(slotId, false);
        return;
    }
    if (!header.sane(dbSlotSize, dbSlotLimit)) {
        debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
               "Ignoring malformed cache entry meta data at " << dbOffset);
        freeUnusedSlot(slotId, true);
        return;
    }
    buf.consume(sizeof(header)); // optimize to avoid memmove()

    useNewSlot(slotId, header);
}

/// whether the buffer contains nothing but zeros, as is common to slots in
/// pre-initialized (with zeros) db files
static bool
ZeroedSlot(const MemBuf &buf)
{
    // We could memcmp the entire buffer, but it is probably safe enough to test
    // a few bytes because even if we do not detect a corrupted entry, it is not
    // a big deal: Store::UnpackPrefix() rejects all-0s metadata prefix.
    static const std::array<char, 10> zeros = {};

    if (static_cast<size_t>(buf.contentSize()) < zeros.size())
        return false; // cannot be sure enough

    return memcmp(buf.content(), zeros.data(), zeros.size()) == 0;
}

/// parse StoreEntry basics and add them to the map, returning true on success
bool
Rock::Rebuild::importEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    cache_key key[SQUID_MD5_DIGEST_LENGTH];
    StoreEntry loadedE;
    const uint64_t knownSize = header.entrySize > 0 ?
                               header.entrySize : anchor.basics.swap_file_sz.load();

    if (ZeroedSlot(buf))
        return false;

    if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
        return false;

    // the entry size may be unknown, but if it is known, it is authoritative

    debugs(47, 8, "importing basics for entry " << fileno <<
           " inode.entrySize: " << header.entrySize <<
           " swap_file_sz: " << loadedE.swap_file_sz);
    anchor.set(loadedE);

    // we have not validated whether all db cells for this entry were loaded
    EBIT_CLR(anchor.basics.flags, ENTRY_VALIDATED);

    // loadedE->dump(5);

    return true;
}

void
Rock::Rebuild::validationSteps()
{
    debugs(47, 5, sd->index << " validating from " << validationPos);

    // see loadingSteps() for the rationale; TODO: avoid duplication
    const int maxSpentMsec = 50; // keep small: validation does not do I/O
    const timeval loopStart = current_time;

    int64_t validated = 0;
    while (!doneValidating()) {
        // increment before validationPos to avoid getting stuck at a slot
        // in case of a crash
        ++counts.validations;
        if (validationPos < dbEntryLimit)
            validateOneEntry(validationPos);
        else
            validateOneSlot(validationPos - dbEntryLimit);
        ++validationPos;
        ++validated;

        if (validationPos % 1000 == 0)
            debugs(20, 2, "validated: " << validationPos);

        if (opt_foreground_rebuild)
            continue; // skip "few entries at a time" check below

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            debugs(47, 5, "pausing after " << validated << " entries in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
            break;
        }
    }
}

/// Makes the given fully loaded entry accessible to all, or throws.
/// Must be called only after all of the entry's slots have been loaded.
void
Rock::Rebuild::finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
{
    // walk all map-linked slots, starting from inode, and mark each
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
    Must(le.size > 0); // paranoid
    uint64_t mappedSize = 0;
    SlotId slotId = anchor.start;
    while (slotId >= 0 && mappedSize < le.size) {
        LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
        Must(!slot.finalized()); // no loops or stealing from other entries
        Must(slot.mapped()); // all our slots should be in the sd->map
        Must(!slot.freed()); // all our slots should still be present
        slot.finalized(true);

        Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
        Must(mapSlice.size > 0); // paranoid
        mappedSize += mapSlice.size;
        slotId = mapSlice.next;
    }
    /* no hodgepodge entries: one entry - one full chain and no leftovers */
    Must(slotId < 0);
    Must(mappedSize == le.size);

    if (!anchor.basics.swap_file_sz)
        anchor.basics.swap_file_sz = le.size;
    EBIT_SET(anchor.basics.flags, ENTRY_VALIDATED);
    le.state(LoadingEntry::leLoaded);
    sd->map->closeForWriting(fileNo);
    ++counts.objcount;
}

/// Finalizes the given fully loaded entry via finalizeOrThrow() or,
/// if that fails, frees the entry via freeBadEntry().
void
Rock::Rebuild::finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
{
    try {
        finalizeOrThrow(fileNo, le);
    } catch (const std::exception &ex) {
        freeBadEntry(fileNo, ex.what());
    }
}

void
Rock::Rebuild::validateOneEntry(const sfileno fileNo)
{
    LoadingEntry entry = loadingEntry(fileNo);
    switch (entry.state()) {

    case LoadingEntry::leLoading:
        finalizeOrFree(fileNo, entry);
        break;

    case LoadingEntry::leEmpty: // no entry hashed to this position
    case LoadingEntry::leLoaded: // we have already unlocked this entry
    case LoadingEntry::leCorrupted: // we have already removed this entry
    case LoadingEntry::leIgnored: // we have already discarded this entry
        break;
    }
}

void
Rock::Rebuild::validateOneSlot(const SlotId slotId)
{
    const LoadingSlot slot = loadingSlot(slotId);
    // there should not be any unprocessed slots left
    Must(slot.freed() || (slot.mapped() && slot.finalized()));
}

/// Marks remaining bad entry slots as free and unlocks the entry. The map
/// cannot do this because Loading entries may have holes in the slots chain.
void
Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
{
    debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
           " entry " << fileno << " is ignored during rebuild");

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leCorrupted);

    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
    assert(anchor.start < 0 || le.size > 0);
    for (SlotId slotId = anchor.start; slotId >= 0;) {
        const SlotId next = loadingSlot(slotId).more;
        freeSlot(slotId, true);
        slotId = next;
    }

    sd->map->forgetWritingEntry(fileno);
}

void
Rock::Rebuild::swanSong()
{
    debugs(47,3, "cache_dir #" << sd->index << " rebuild level: " <<
           StoreController::store_dirs_rebuilding);
    storeRebuildComplete(&counts);
}

void
Rock::Rebuild::callException(const std::exception &)
{
    // For now, treat all Rebuild exceptions as fatal errors rather than
    // letting the default callException() implementation silently stop this job.
    throw;
}

/// a helper to handle rebuild-killing I/O errors
void
Rock::Rebuild::failure(const char * const msg, const int errNo)
{
    assert(sd);
    debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
           dbOffset << " <= " << dbSize);

    SBufStream error;
    error << "Cannot rebuild rock cache_dir index for " << sd->filePath <<
          Debug::Extra << "problem: " << msg;
    if (errNo)
        error << Debug::Extra << "I/O error: " << xstrerr(errNo);
    error << Debug::Extra << "scan progress: " << Math::int64Percent(loadingPos, dbSlotLimit) << '%';

    throw TextException(error.buf(), Here());
}

/// adds slot to the free slot index
void
Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
{
    debugs(47,5, sd->index << " frees slot " << slotId);
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.freed());
    slot.freed(true);

    if (invalid) {
        ++counts.invalid;
        //sd->unlink(fileno); leave garbage on disk, it should not hurt
    }

    Ipc::Mem::PageId pageId;
    pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(sd->index);
    pageId.number = slotId+1;
    sd->freeSlots->push(pageId);
}
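
// Editorial illustration: PageId numbers are 1-based while SlotIds are
// 0-based, hence the slotId+1 conversion above; freeing slot 0 pushes page
// number 1 onto this cache_dir's free-slot stack.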

/// freeSlot() for never-been-mapped slots
void
Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
{
    LoadingSlot slot = loadingSlot(slotId);
    // mapped slots must be freed via freeBadEntry() to keep the map in sync
    assert(!slot.mapped());
    freeSlot(slotId, invalid);
}

/// adds slot to the entry chain in the map
void
Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
{
    LoadingSlot slot = loadingSlot(slotId);
    assert(!slot.mapped());
    assert(!slot.freed());
    slot.mapped(true);

    Ipc::StoreMapSlice slice;
    slice.next = header.nextSlot;
    slice.size = header.payloadSize;
    sd->map->importSlice(slotId, slice);
}

template <class SlotIdType> // accommodates atomic and simple SlotIds
void
Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
{
    LoadingSlot slot = loadingSlot(to);
    assert(slot.more < 0);
    slot.more = from; // may still be unset
    from = to;
}
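
// Worked example (editorial, not part of RockRebuild.cc): chainSlots()
// prepends. If an entry chain currently starts at slot 7 (from == 7) and
// slot 3 arrives, slot 3's `more` becomes 7 and `from` becomes 3, producing
// the (unordered, as advertised) chain 3 -> 7.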

/// adds the given slot to the entry chain; if this is the inode slot,
/// also imports the entry-level metadata and anchors the chain
void
Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    LoadingEntry le = loadingEntry(fileno);
    Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);

    debugs(47,9, "adding " << slotId << " to entry " << fileno);
    // we do not need to preserve the order
    if (le.anchored()) {
        LoadingSlot inode = loadingSlot(anchor.start);
        chainSlots(inode.more, slotId);
    } else {
        chainSlots(anchor.start, slotId);
    }

    le.size += header.payloadSize; // must precede freeBadEntry() calls

    if (header.firstSlot == slotId) {
        debugs(47,5, "added inode");

        if (le.anchored()) { // we have already added another inode slot
            freeBadEntry(fileno, "inode conflict");
            ++counts.clashcount;
            return;
        }

        le.anchored(true);

        if (!importEntry(anchor, fileno, header)) {
            freeBadEntry(fileno, "corrupted metainfo");
            return;
        }

        // set total entry size and/or check it for consistency
        if (const uint64_t totalSize = header.entrySize) {
            assert(totalSize != static_cast<uint64_t>(-1));
            if (!anchor.basics.swap_file_sz) {
                anchor.basics.swap_file_sz = totalSize;
                assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
            } else if (totalSize != anchor.basics.swap_file_sz) {
                freeBadEntry(fileno, "size mismatch");
                return;
            }
        }
    }

    const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown

    if (totalSize > 0 && le.size > totalSize) { // overflow
        debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
        freeBadEntry(fileno, "overflowing");
        return;
    }

    mapSlot(slotId, header);
    if (totalSize > 0 && le.size == totalSize)
        finalizeOrFree(fileno, le); // entry is probably fully loaded now
}
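
// Worked example (editorial, not part of RockRebuild.cc): suppose a 5000-byte
// entry is stored as an inode slot with 4096 payload bytes plus a second slot
// with 904 bytes. Whichever slot is scanned first, le.size reaches the
// totalSize of 5000 only after both slots were added, and only that final
// mapSlot() call triggers finalizeOrFree() above.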

/// initialize housekeeping information for a newly accepted entry
void
Rock::Rebuild::primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
{
    anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
    assert(header.firstSlot >= 0);
    anchor.start = -1; // addSlotToEntry() will set it

    assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));

    LoadingEntry le = loadingEntry(fileno);
    le.state(LoadingEntry::leLoading);
    le.version = header.version;
    le.size = 0;
}

/// handle a slot from an entry that we have not seen before
void
Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
{
    // A miss may have been stored at our fileno while we were loading other
    // slots from disk. We ought to preserve that entry because it is fresher.
    const bool overwriteExisting = false;
    if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
        primeNewEntry(*anchor, fileno, header);
        addSlotToEntry(fileno, slotId, header); // may fail
        assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
    } else {
        // A new from-network entry is occupying our map slot; let it be, but
        // save us from the trouble of going through the above motions again.
        LoadingEntry le = loadingEntry(fileno);
        le.state(LoadingEntry::leIgnored);
        freeUnusedSlot(slotId, false);
    }
}

/// does the header belong to the fileno entry being loaded?
bool
Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
{
    // Header updates always result in multi-start chains and often
    // result in multi-version chains so we can only compare the keys.
    const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
    return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
}

/// handle freshly loaded (and validated) db slot header
void
Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
{
    const cache_key *const key =
        reinterpret_cast<const cache_key*>(header.key);
    const sfileno fileno = sd->map->fileNoByKey(key);
    assert(0 <= fileno && fileno < dbEntryLimit);

    LoadingEntry le = loadingEntry(fileno);
    debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
           header.firstSlot << ", size: " << header.payloadSize);

    switch (le.state()) {

    case LoadingEntry::leEmpty: {
        startNewEntry(fileno, slotId, header);
        break;
    }

    case LoadingEntry::leLoading: {
        if (sameEntry(fileno, header)) {
            addSlotToEntry(fileno, slotId, header); // may fail
        } else {
            // either the loading chain or this slot is stale;
            // be conservative and ignore both (and any future ones)
            freeBadEntry(fileno, "duplicated");
            freeUnusedSlot(slotId, true);
            ++counts.dupcount;
        }
        break;
    }

    case LoadingEntry::leLoaded: {
        // either the previously loaded chain or this slot is stale;
        // be conservative and ignore both (and any future ones)
        le.state(LoadingEntry::leCorrupted);
        sd->map->freeEntry(fileno); // may not be immediately successful
        freeUnusedSlot(slotId, true);
        ++counts.dupcount;
        break;
    }

    case LoadingEntry::leCorrupted: {
        // previously seen slots messed things up so we must ignore this one
        freeUnusedSlot(slotId, true);
        break;
    }

    case LoadingEntry::leIgnored: {
        // already replaced by a fresher or colliding from-network entry
        freeUnusedSlot(slotId, false);
        break;
    }
    }
}

SBuf
Rock::Rebuild::progressDescription() const
{
    SBufStream str;

    str << Debug::Extra << "slots loaded: " << Progress(loadingPos, dbSlotLimit);

    const auto validatingEntries = validationPos < dbEntryLimit;
    const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
    str << Debug::Extra << "entries validated: " << Progress(entriesValidated, dbEntryLimit);
    if (opt_store_doublecheck) {
        const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
        str << Debug::Extra << "slots validated: " << Progress(slotsValidated, dbSlotLimit);
    }

    return str.buf();
}