RockRebuild.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Disk IO Routines */
10 
11 #include "squid.h"
12 #include "base/AsyncJobCalls.h"
13 #include "debug/Messages.h"
14 #include "fs/rock/RockDbCell.h"
15 #include "fs/rock/RockRebuild.h"
16 #include "fs/rock/RockSwapDir.h"
17 #include "fs_io.h"
18 #include "globals.h"
19 #include "md5.h"
20 #include "sbuf/Stream.h"
21 #include "SquidMath.h"
22 #include "Store.h"
23 #include "tools.h"
24 
25 #include <array>
26 #include <cerrno>
27 #include <cstring>
28 
30 
76 namespace Rock
77 {
78 
/// whether the loading stage has scanned every db slot
static bool
DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
{
    return !(loadingPos < dbSlotLimit);
}
84 
85 static bool
86 DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
87 {
88  // paranoid slot checking is only enabled with squid -S
89  const auto extraWork = opt_store_doublecheck ? dbSlotLimit : 0;
90  return validationPos >= (dbEntryLimit + extraWork);
91 }
92 
95 {
96 public:
97  LoadingFlags(): state(0), anchored(0), mapped(0), finalized(0), freed(0) {}
98 
99  /* for LoadingEntry */
100  uint8_t state:3;
101  uint8_t anchored:1;
102 
103  /* for LoadingSlot */
104  uint8_t mapped:1;
105  uint8_t finalized:1;
106  uint8_t freed:1;
107 };
108 
111 {
112 public:
113  LoadingEntry(const sfileno fileNo, LoadingParts &source);
114 
115  uint64_t &size;
116  uint32_t &version;
117 
120 
121  /* LoadingFlags::state */
122  State state() const { return static_cast<State>(flags.state); }
123  void state(State aState) const { flags.state = aState; }
124 
125  /* LoadingFlags::anchored */
126  bool anchored() const { return flags.anchored; }
127  void anchored(const bool beAnchored) { flags.anchored = beAnchored; }
128 
129 private:
131 };
132 
135 {
136 public:
137  LoadingSlot(const SlotId slotId, LoadingParts &source);
138 
141 
142  /* LoadingFlags::mapped */
143  bool mapped() const { return flags.mapped; }
144  void mapped(const bool beMapped) { flags.mapped = beMapped; }
145 
146  /* LoadingFlags::finalized */
147  bool finalized() const { return flags.finalized; }
148  void finalized(const bool beFinalized) { flags.finalized = beFinalized; }
149 
150  /* LoadingFlags::freed */
151  bool freed() const { return flags.freed; }
152  void freed(const bool beFreed) { flags.freed = beFreed; }
153 
154  bool used() const { return freed() || mapped() || more != -1; }
155 
156 private:
158 };
159 
163 {
164 public:
169 
170  LoadingParts(const SwapDir &dir, const bool resuming);
171  ~LoadingParts();
172 
173  // lacking copying/moving code and often too huge to copy
174  LoadingParts(LoadingParts&&) = delete;
175 
176  Sizes &sizes() const { return *sizesOwner->object(); }
177  Versions &versions() const { return *versionsOwner->object(); }
178  Mores &mores() const { return *moresOwner->object(); }
179  Flags &flags() const { return *flagsOwner->object(); }
180 
181 private:
182  /* Anti-padding storage. With millions of entries, padding matters! */
183 
184  /* indexed by sfileno */
187 
188  /* indexed by SlotId */
190 
191  /* entry flags are indexed by sfileno; slot flags -- by SlotId */
193 };
194 
195 } /* namespace Rock */
196 
197 /* LoadingEntry */
198 
200  size(source.sizes().at(fileNo)),
201  version(source.versions().at(fileNo)),
202  flags(source.flags().at(fileNo))
203 {
204 }
205 
206 /* LoadingSlot */
207 
209  more(source.mores().at(slotId)),
210  flags(source.flags().at(slotId))
211 {
212 }
213 
214 /* LoadingParts */
215 
216 template <class T>
217 inline typename T::Owner *
218 createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
219 {
220  auto id = Ipc::Mem::Segment::Name(SBuf(dirPath), sfx);
221  return resuming ? Ipc::Mem::Owner<T>::Old(id.c_str()) : shm_new(T)(id.c_str(), limit);
222 }
223 
224 Rock::LoadingParts::LoadingParts(const SwapDir &dir, const bool resuming):
225  sizesOwner(createOwner<Sizes>(dir.path, "rebuild_sizes", dir.entryLimitActual(), resuming)),
226  versionsOwner(createOwner<Versions>(dir.path, "rebuild_versions", dir.entryLimitActual(), resuming)),
227  moresOwner(createOwner<Mores>(dir.path, "rebuild_mores", dir.slotLimitActual(), resuming)),
228  flagsOwner(createOwner<Flags>(dir.path, "rebuild_flags", dir.slotLimitActual(), resuming))
229 {
230  assert(sizes().capacity == versions().capacity); // every entry has both fields
231  assert(sizes().capacity <= mores().capacity); // every entry needs slot(s)
232  assert(mores().capacity == flags().capacity); // every slot needs a set of flags
233 
234  if (!resuming) {
235  // other parts rely on shared memory segments being zero-initialized
236  // TODO: refactor the next slot pointer to use 0 for nil values
237  mores().fill(-1);
238  }
239 }
240 
242 {
243  delete sizesOwner;
244  delete versionsOwner;
245  delete moresOwner;
246  delete flagsOwner;
247 }
248 
249 /* Rock::Rebuild::Stats */
250 
251 SBuf
252 Rock::Rebuild::Stats::Path(const char *dirPath)
253 {
254  return Ipc::Mem::Segment::Name(SBuf(dirPath), "rebuild_stats");
255 }
256 
259 {
260  return shm_new(Stats)(Path(dir.path).c_str());
261 }
262 
263 bool
265 {
266  return DoneLoading(counts.scancount, dir.slotLimitActual()) &&
268 }
269 
270 /* Rebuild */
271 
272 bool
274 {
275  // in SMP mode, only the disker is responsible for populating the map
276  return !UsingSmp() || IamDiskProcess();
277 }
278 
279 bool
281 {
282  if (!IsResponsible(dir)) {
283  debugs(47, 2, "not responsible for indexing cache_dir #" <<
284  dir.index << " from " << dir.filePath);
285  return false;
286  }
287 
288  const auto stats = shm_old(Rebuild::Stats)(Stats::Path(dir.path).c_str());
289  if (stats->completed(dir)) {
290  debugs(47, 2, "already indexed cache_dir #" <<
291  dir.index << " from " << dir.filePath);
292  return false;
293  }
294 
295  AsyncJob::Start(new Rebuild(&dir, stats));
296  return true;
297 }
298 
300  sd(dir),
301  parts(nullptr),
302  stats(s),
303  dbSize(0),
304  dbSlotSize(0),
305  dbSlotLimit(0),
306  dbEntryLimit(0),
307  fd(-1),
308  dbOffset(0),
309  loadingPos(stats->counts.scancount),
310  validationPos(stats->counts.validations),
311  counts(stats->counts),
312  resuming(stats->counts.started())
313 {
314  assert(sd);
315  dbSize = sd->diskOffsetLimit(); // we do not care about the trailer waste
320  registerRunner();
321 }
322 
324 {
325  if (fd >= 0)
326  file_close(fd);
327  // normally, segments are used until the Squid instance quits,
328  // but these indexing-only segments are no longer needed
329  delete parts;
330 }
331 
332 void
334 {
335  mustStop("startShutdown");
336 }
337 
339 void
341 {
342  assert(IsResponsible(*sd));
343 
344  if (!resuming) {
345  debugs(47, Important(18), "Loading cache_dir #" << sd->index <<
346  " from " << sd->filePath);
347  } else {
348  debugs(47, Important(63), "Resuming indexing cache_dir #" << sd->index <<
349  " from " << sd->filePath << ':' << progressDescription());
350  }
351 
352  fd = file_open(sd->filePath, O_RDONLY | O_BINARY);
353  if (fd < 0)
354  failure("cannot open db", errno);
355 
356  char hdrBuf[SwapDir::HeaderSize];
357  if (read(fd, hdrBuf, sizeof(hdrBuf)) != SwapDir::HeaderSize)
358  failure("cannot read db header", errno);
359 
360  // slot prefix of SM_PAGE_SIZE should fit both core entry header and ours
361  assert(sizeof(DbCellHeader) < SM_PAGE_SIZE);
362  buf.init(SM_PAGE_SIZE, SM_PAGE_SIZE);
363 
364  dbOffset = SwapDir::HeaderSize + loadingPos * dbSlotSize;
365 
366  assert(!parts);
367  parts = new LoadingParts(*sd, resuming);
368 
370 
371  checkpoint();
372 }
373 
375 void
377 {
378  if (!done())
379  eventAdd("Rock::Rebuild", Rock::Rebuild::Steps, this, 0.01, 1, true);
380 }
381 
382 bool
384 {
385  return DoneLoading(loadingPos, dbSlotLimit);
386 }
387 
388 bool
390 {
391  return DoneValidating(validationPos, dbSlotLimit, dbEntryLimit);
392 }
393 
394 bool
396 {
397  return doneLoading() && doneValidating() && AsyncJob::doneAll();
398 }
399 
400 void
402 {
403  // use async call to enable job call protection that time events lack
404  CallJobHere(47, 5, static_cast<Rebuild*>(data), Rock::Rebuild, steps);
405 }
406 
407 void
409 {
410  if (!doneLoading())
411  loadingSteps();
412  else
413  validationSteps();
414 
415  checkpoint();
416 }
417 
418 void
420 {
421  debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
422  dbOffset << " <= " << dbSize);
423 
424  // Balance our desire to maximize the number of entries processed at once
425  // (and, hence, minimize overheads and total rebuild time) with a
426  // requirement to also process Coordinator events, disk I/Os, etc.
427  const int maxSpentMsec = 50; // keep small: most RAM I/Os are under 1ms
428  const timeval loopStart = current_time;
429 
430  int64_t loaded = 0;
431  while (!doneLoading()) {
432  loadOneSlot();
433  dbOffset += dbSlotSize;
434  ++loadingPos;
435  ++loaded;
436 
437  if (counts.scancount % 1000 == 0)
438  storeRebuildProgress(sd->index, dbSlotLimit, counts.scancount);
439 
441  continue; // skip "few entries at a time" check below
442 
443  getCurrentTime();
444  const double elapsedMsec = tvSubMsec(loopStart, current_time);
445  if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
446  debugs(47, 5, "pausing after " << loaded << " entries in " <<
447  elapsedMsec << "ms; " << (elapsedMsec/loaded) << "ms per entry");
448  break;
449  }
450  }
451 }
452 
455 {
456  Must(0 <= fileNo && fileNo < dbEntryLimit);
457  return LoadingEntry(fileNo, *parts);
458 }
459 
462 {
463  Must(0 <= slotId && slotId < dbSlotLimit);
464  Must(slotId <= loadingPos); // cannot look ahead
465  return LoadingSlot(slotId, *parts);
466 }
467 
468 void
470 {
471  debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
472  dbOffset << " <= " << dbSize);
473 
474  // increment before loadingPos to avoid getting stuck at a slot
475  // in a case of crash
476  ++counts.scancount;
477 
478  if (lseek(fd, dbOffset, SEEK_SET) < 0)
479  failure("cannot seek to db entry", errno);
480 
481  buf.reset();
482 
483  if (!storeRebuildLoadEntry(fd, sd->index, buf, counts))
484  return;
485 
486  const SlotId slotId = loadingPos;
487 
488  // get our header
489  DbCellHeader header;
490  if (buf.contentSize() < static_cast<mb_size_t>(sizeof(header))) {
491  debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
492  "Ignoring truncated " << buf.contentSize() << "-byte " <<
493  "cache entry meta data at " << dbOffset);
494  freeUnusedSlot(slotId, true);
495  return;
496  }
497  memcpy(&header, buf.content(), sizeof(header));
498  if (header.empty()) {
499  freeUnusedSlot(slotId, false);
500  return;
501  }
502  if (!header.sane(dbSlotSize, dbSlotLimit)) {
503  debugs(47, DBG_IMPORTANT, "WARNING: cache_dir[" << sd->index << "]: " <<
504  "Ignoring malformed cache entry meta data at " << dbOffset);
505  freeUnusedSlot(slotId, true);
506  return;
507  }
508  buf.consume(sizeof(header)); // optimize to avoid memmove()
509 
510  useNewSlot(slotId, header);
511 }
512 
515 static bool
516 ZeroedSlot(const MemBuf &buf)
517 {
518  // We could memcmp the entire buffer, but it is probably safe enough to test
519  // a few bytes because even if we do not detect a corrupted entry, it is not
520  // a big deal: Store::UnpackPrefix() rejects all-0s metadata prefix.
521  static const std::array<char, 10> zeros = {};
522 
523  if (static_cast<size_t>(buf.contentSize()) < zeros.size())
524  return false; // cannot be sure enough
525 
526  return memcmp(buf.content(), zeros.data(), zeros.size()) == 0;
527 }
528 
530 bool
532 {
534  StoreEntry loadedE;
535  const uint64_t knownSize = header.entrySize > 0 ?
536  header.entrySize : anchor.basics.swap_file_sz.load();
537 
538  if (ZeroedSlot(buf))
539  return false;
540 
541  if (!storeRebuildParseEntry(buf, loadedE, key, counts, knownSize))
542  return false;
543 
544  // the entry size may be unknown, but if it is known, it is authoritative
545 
546  debugs(47, 8, "importing basics for entry " << fileno <<
547  " inode.entrySize: " << header.entrySize <<
548  " swap_file_sz: " << loadedE.swap_file_sz);
549  anchor.set(loadedE);
550 
551  // we have not validated whether all db cells for this entry were loaded
553 
554  // loadedE->dump(5);
555 
556  return true;
557 }
558 
559 void
561 {
562  debugs(47, 5, sd->index << " validating from " << validationPos);
563 
564  // see loadingSteps() for the rationale; TODO: avoid duplication
565  const int maxSpentMsec = 50; // keep small: validation does not do I/O
566  const timeval loopStart = current_time;
567 
568  int64_t validated = 0;
569  while (!doneValidating()) {
570  // increment before validationPos to avoid getting stuck at a slot
571  // in a case of crash
573  if (validationPos < dbEntryLimit)
574  validateOneEntry(validationPos);
575  else
576  validateOneSlot(validationPos - dbEntryLimit);
577  ++validationPos;
578  ++validated;
579 
580  if (validationPos % 1000 == 0)
581  debugs(20, 2, "validated: " << validationPos);
582 
584  continue; // skip "few entries at a time" check below
585 
586  getCurrentTime();
587  const double elapsedMsec = tvSubMsec(loopStart, current_time);
588  if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
589  debugs(47, 5, "pausing after " << validated << " entries in " <<
590  elapsedMsec << "ms; " << (elapsedMsec/validated) << "ms per entry");
591  break;
592  }
593  }
594 }
595 
598 void
600 {
601  // walk all map-linked slots, starting from inode, and mark each
602  Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileNo);
603  Must(le.size > 0); // paranoid
604  uint64_t mappedSize = 0;
605  SlotId slotId = anchor.start;
606  while (slotId >= 0 && mappedSize < le.size) {
607  LoadingSlot slot = loadingSlot(slotId); // throws if we have not loaded that slot
608  Must(!slot.finalized()); // no loops or stealing from other entries
609  Must(slot.mapped()); // all our slots should be in the sd->map
610  Must(!slot.freed()); // all our slots should still be present
611  slot.finalized(true);
612 
613  Ipc::StoreMapSlice &mapSlice = sd->map->writeableSlice(fileNo, slotId);
614  Must(mapSlice.size > 0); // paranoid
615  mappedSize += mapSlice.size;
616  slotId = mapSlice.next;
617  }
618  /* no hodgepodge entries: one entry - one full chain and no leftovers */
619  Must(slotId < 0);
620  Must(mappedSize == le.size);
621 
622  if (!anchor.basics.swap_file_sz)
623  anchor.basics.swap_file_sz = le.size;
626  sd->map->closeForWriting(fileNo);
627  ++counts.objcount;
628 }
629 
632 void
634 {
635  try {
636  finalizeOrThrow(fileNo, le);
637  } catch (const std::exception &ex) {
638  freeBadEntry(fileNo, ex.what());
639  }
640 }
641 
642 void
644 {
645  LoadingEntry entry = loadingEntry(fileNo);
646  switch (entry.state()) {
647 
649  finalizeOrFree(fileNo, entry);
650  break;
651 
652  case LoadingEntry::leEmpty: // no entry hashed to this position
653  case LoadingEntry::leLoaded: // we have already unlocked this entry
654  case LoadingEntry::leCorrupted: // we have already removed this entry
655  case LoadingEntry::leIgnored: // we have already discarded this entry
656  break;
657  }
658 }
659 
660 void
662 {
663  const LoadingSlot slot = loadingSlot(slotId);
664  // there should not be any unprocessed slots left
665  Must(slot.freed() || (slot.mapped() && slot.finalized()));
666 }
667 
670 void
671 Rock::Rebuild::freeBadEntry(const sfileno fileno, const char *eDescription)
672 {
673  debugs(47, 2, "cache_dir #" << sd->index << ' ' << eDescription <<
674  " entry " << fileno << " is ignored during rebuild");
675 
676  LoadingEntry le = loadingEntry(fileno);
678 
679  Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
680  assert(anchor.start < 0 || le.size > 0);
681  for (SlotId slotId = anchor.start; slotId >= 0;) {
682  const SlotId next = loadingSlot(slotId).more;
683  freeSlot(slotId, true);
684  slotId = next;
685  }
686 
687  sd->map->forgetWritingEntry(fileno);
688 }
689 
690 void
692 {
693  debugs(47,3, "cache_dir #" << sd->index << " rebuild level: " <<
696 }
697 
/// AsyncJob error-handling hook: rethrows so that any Rebuild exception
/// terminates Squid instead of being swallowed.
698 void
699 Rock::Rebuild::callException(const std::exception &)
700 {
701  // For now, treat all Rebuild exceptions as fatal errors rather than letting
702  // the default callException() implementation to silently stop this job.
703  throw;
704 }
705 
707 void
708 Rock::Rebuild::failure(const char * const msg, const int errNo)
709 {
710  assert(sd);
711  debugs(47,5, sd->index << " slot " << loadingPos << " at " <<
712  dbOffset << " <= " << dbSize);
713 
715  error << "Cannot rebuild rock cache_dir index for " << sd->filePath <<
716  Debug::Extra << "problem: " << msg;
717  if (errNo)
718  error << Debug::Extra << "I/O error: " << xstrerr(errNo);
719  error << Debug::Extra << "scan progress: " << Math::int64Percent(loadingPos, dbSlotLimit) << '%';
720 
721  throw TextException(error.buf(), Here());
722 }
723 
725 void
726 Rock::Rebuild::freeSlot(const SlotId slotId, const bool invalid)
727 {
728  debugs(47,5, sd->index << " frees slot " << slotId);
729  LoadingSlot slot = loadingSlot(slotId);
730  assert(!slot.freed());
731  slot.freed(true);
732 
733  if (invalid) {
734  ++counts.invalid;
735  //sd->unlink(fileno); leave garbage on disk, it should not hurt
736  }
737 
738  Ipc::Mem::PageId pageId;
739  pageId.pool = Ipc::Mem::PageStack::IdForSwapDirSpace(sd->index);
740  pageId.number = slotId+1;
741  sd->freeSlots->push(pageId);
742 }
743 
745 void
746 Rock::Rebuild::freeUnusedSlot(const SlotId slotId, const bool invalid)
747 {
748  LoadingSlot slot = loadingSlot(slotId);
749  // mapped slots must be freed via freeBadEntry() to keep the map in sync
750  assert(!slot.mapped());
751  freeSlot(slotId, invalid);
752 }
753 
755 void
756 Rock::Rebuild::mapSlot(const SlotId slotId, const DbCellHeader &header)
757 {
758  LoadingSlot slot = loadingSlot(slotId);
759  assert(!slot.mapped());
760  assert(!slot.freed());
761  slot.mapped(true);
762 
763  Ipc::StoreMapSlice slice;
764  slice.next = header.nextSlot;
765  slice.size = header.payloadSize;
766  sd->map->importSlice(slotId, slice);
767 }
768 
/// Prepends slot `to` to the chain whose head pointer is `from`: the new
/// slot inherits the old head (which may still be unset/-1) as its `more`
/// link, and `from` is updated to name the new head.
769 template <class SlotIdType> // accommodates atomic and simple SlotIds.
770 void
771 Rock::Rebuild::chainSlots(SlotIdType &from, const SlotId to)
772 {
773  LoadingSlot slot = loadingSlot(to);
774  assert(slot.more < 0); // the slot must not already be chained
775  slot.more = from; // may still be unset
776  from = to;
777 }
778 
/// Adds the loaded slot to the chain of the entry hashed to `fileno`.
/// Inode (first) slots additionally import the entry basics and set or
/// cross-check the total entry size. On any conflict (duplicate inode,
/// corrupted metainfo, size mismatch, or size overflow), the whole entry
/// is discarded via freeBadEntry(). A fully accumulated entry is finalized.
781 void
782 Rock::Rebuild::addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
783 {
784  LoadingEntry le = loadingEntry(fileno);
785  Ipc::StoreMapAnchor &anchor = sd->map->writeableEntry(fileno);
786 
787  debugs(47,9, "adding " << slotId << " to entry " << fileno);
788  // we do not need to preserve the order
789  if (le.anchored()) {
790  LoadingSlot inode = loadingSlot(anchor.start);
791  chainSlots(inode.more, slotId);
792  } else {
793  chainSlots(anchor.start, slotId);
794  }
795 
796  le.size += header.payloadSize; // must precede freeBadEntry() calls
797 
798  if (header.firstSlot == slotId) {
799  debugs(47,5, "added inode");
800 
801  if (le.anchored()) { // we have already added another inode slot
802  freeBadEntry(fileno, "inode conflict");
803  ++counts.clashcount;
804  return;
805  }
806 
807  le.anchored(true);
808 
809  if (!importEntry(anchor, fileno, header)) {
810  freeBadEntry(fileno, "corrupted metainfo");
811  return;
812  }
813 
814  // set total entry size and/or check it for consistency
815  if (const uint64_t totalSize = header.entrySize) {
816  assert(totalSize != static_cast<uint64_t>(-1));
817  if (!anchor.basics.swap_file_sz) {
818  anchor.basics.swap_file_sz = totalSize;
819  assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
820  } else if (totalSize != anchor.basics.swap_file_sz) {
821  freeBadEntry(fileno, "size mismatch");
822  return;
823  }
824  }
825  }
826 
827  const uint64_t totalSize = anchor.basics.swap_file_sz; // may be 0/unknown
828 
829  if (totalSize > 0 && le.size > totalSize) { // overflow
830  debugs(47, 8, "overflow: " << le.size << " > " << totalSize);
831  freeBadEntry(fileno, "overflowing");
832  return;
833  }
834 
835  mapSlot(slotId, header);
836  if (totalSize > 0 && le.size == totalSize)
837  finalizeOrFree(fileno, le); // entry is probably fully loaded now
838 }
839 
841 void
843 {
844  anchor.setKey(reinterpret_cast<const cache_key*>(header.key));
845  assert(header.firstSlot >= 0);
846  anchor.start = -1; // addSlotToEntry() will set it
847 
848  assert(anchor.basics.swap_file_sz != static_cast<uint64_t>(-1));
849 
850  LoadingEntry le = loadingEntry(fileno);
852  le.version = header.version;
853  le.size = 0;
854 }
855 
857 void
858 Rock::Rebuild::startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
859 {
860  // A miss may have been stored at our fileno while we were loading other
861  // slots from disk. We ought to preserve that entry because it is fresher.
862  const bool overwriteExisting = false;
863  if (Ipc::StoreMap::Anchor *anchor = sd->map->openForWritingAt(fileno, overwriteExisting)) {
864  primeNewEntry(*anchor, fileno, header);
865  addSlotToEntry(fileno, slotId, header); // may fail
866  assert(anchor->basics.swap_file_sz != static_cast<uint64_t>(-1));
867  } else {
868  // A new from-network entry is occupying our map slot; let it be, but
869  // save us from the trouble of going through the above motions again.
870  LoadingEntry le = loadingEntry(fileno);
872  freeUnusedSlot(slotId, false);
873  }
874 }
875 
877 bool
878 Rock::Rebuild::sameEntry(const sfileno fileno, const DbCellHeader &header) const
879 {
880  // Header updates always result in multi-start chains and often
881  // result in multi-version chains so we can only compare the keys.
882  const Ipc::StoreMap::Anchor &anchor = sd->map->writeableEntry(fileno);
883  return anchor.sameKey(reinterpret_cast<const cache_key*>(header.key));
884 }
885 
887 void
888 Rock::Rebuild::useNewSlot(const SlotId slotId, const DbCellHeader &header)
889 {
890  const cache_key *const key =
891  reinterpret_cast<const cache_key*>(header.key);
892  const sfileno fileno = sd->map->fileNoByKey(key);
893  assert(0 <= fileno && fileno < dbEntryLimit);
894 
895  LoadingEntry le = loadingEntry(fileno);
896  debugs(47,9, "entry " << fileno << " state: " << le.state() << ", inode: " <<
897  header.firstSlot << ", size: " << header.payloadSize);
898 
899  switch (le.state()) {
900 
901  case LoadingEntry::leEmpty: {
902  startNewEntry(fileno, slotId, header);
903  break;
904  }
905 
907  if (sameEntry(fileno, header)) {
908  addSlotToEntry(fileno, slotId, header); // may fail
909  } else {
910  // either the loading chain or this slot is stale;
911  // be conservative and ignore both (and any future ones)
912  freeBadEntry(fileno, "duplicated");
913  freeUnusedSlot(slotId, true);
914  ++counts.dupcount;
915  }
916  break;
917  }
918 
919  case LoadingEntry::leLoaded: {
920  // either the previously loaded chain or this slot is stale;
921  // be conservative and ignore both (and any future ones)
923  sd->map->freeEntry(fileno); // may not be immediately successful
924  freeUnusedSlot(slotId, true);
925  ++counts.dupcount;
926  break;
927  }
928 
930  // previously seen slots messed things up so we must ignore this one
931  freeUnusedSlot(slotId, true);
932  break;
933  }
934 
936  // already replaced by a fresher or colliding from-network entry
937  freeUnusedSlot(slotId, false);
938  break;
939  }
940  }
941 }
942 
943 SBuf
945 {
946  SBufStream str;
947 
948  str << Debug::Extra << "slots loaded: " << Progress(loadingPos, dbSlotLimit);
949 
950  const auto validatingEntries = validationPos < dbEntryLimit;
951  const auto entriesValidated = validatingEntries ? validationPos : dbEntryLimit;
952  str << Debug::Extra << "entries validated: " << Progress(entriesValidated, dbEntryLimit);
953  if (opt_store_doublecheck) {
954  const auto slotsValidated = validatingEntries ? 0 : (validationPos - dbEntryLimit);
955  str << Debug::Extra << "slots validated: " << Progress(slotsValidated, dbSlotLimit);
956  }
957 
958  return str.buf();
959 }
960 
uint64_t entrySize
total entry content size or zero if still unknown
Definition: RockDbCell.h:42
#define EBIT_CLR(flag, bit)
Definition: defines.h:66
const char * xstrerr(int error)
Definition: xstrerror.cc:83
void storeRebuildProgress(int sd_index, int total, int sofar)
#define Here()
source code location of the caller
Definition: Here.h:15
#define SM_PAGE_SIZE
Definition: defines.h:63
void startNewEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
handle a slot from an entry that we have not seen before
Definition: RockRebuild.cc:858
uint8_t state
current entry state (one of the LoadingEntry::State values)
Definition: RockRebuild.cc:100
SBuf progressDescription() const
Definition: RockRebuild.cc:944
bool sane(const size_t slotSize, int slotLimit) const
whether this slot is not corrupted
Definition: RockDbCell.h:33
bool used() const
Definition: RockRebuild.cc:154
LoadingSlot loadingSlot(const SlotId slotId)
Definition: RockRebuild.cc:461
smart db slot-level info pointer (hides anti-padding LoadingParts arrays)
Definition: RockRebuild.cc:134
int64_t dbEntryLimit
maximum number of entries that can be stored in db
Definition: RockRebuild.h:117
Definition: forward.h:27
unsigned char cache_key
Store key.
Definition: forward.h:29
smart StoreEntry-level info pointer (hides anti-padding LoadingParts arrays)
Definition: RockRebuild.cc:110
#define EBIT_SET(flag, bit)
Definition: defines.h:65
bool importEntry(Ipc::StoreMapAnchor &anchor, const sfileno slotId, const DbCellHeader &header)
parse StoreEntry basics and add them to the map, returning true on success
Definition: RockRebuild.cc:531
int opt_store_doublecheck
bool storeRebuildParseEntry(MemBuf &buf, StoreEntry &tmpe, cache_key *key, StoreRebuildData &stats, uint64_t expectedSize)
Rebuild(SwapDir *dir, const Ipc::Mem::Pointer< Stats > &)
Definition: RockRebuild.cc:299
PoolId pool
Definition: Page.h:39
SwapDir * sd
Definition: RockRebuild.h:109
void chainSlots(SlotIdType &from, const SlotId to)
Definition: RockRebuild.cc:771
Flags & flags() const
Definition: RockRebuild.cc:179
Shared memory page identifier, address, or handler.
Definition: Page.h:23
CBDATA_NAMESPACED_CLASS_INIT(Rock, Rebuild)
bool sameKey(const cache_key *const aKey) const
Definition: StoreMap.cc:952
void error(char *format,...)
@ ENTRY_VALIDATED
Definition: enums.h:108
static void Steps(void *data)
Definition: RockRebuild.cc:401
bool sameEntry(const sfileno fileno, const DbCellHeader &header) const
does the header belong to the fileno entry being loaded?
Definition: RockRebuild.cc:878
Definition: SBuf.h:93
Mores & mores() const
Definition: RockRebuild.cc:178
void set(const StoreEntry &anEntry, const cache_key *aKey=nullptr)
store StoreEntry key and basics for an inode slot
Definition: StoreMap.cc:959
void updateStartTime(const timeval &dirStartTime)
maintain earliest initiation time across multiple indexing cache_dirs
uint8_t anchored
whether we loaded the inode slot for this entry
Definition: RockRebuild.cc:101
void mapped(const bool beMapped)
Definition: RockRebuild.cc:144
static bool DoneLoading(const int64_t loadingPos, const int64_t dbSlotLimit)
Definition: RockRebuild.cc:80
LoadingParts(const SwapDir &dir, const bool resuming)
Definition: RockRebuild.cc:224
bool finalized() const
Definition: RockRebuild.cc:147
bool doneValidating() const
Definition: RockRebuild.cc:389
#define shm_new(Class)
Definition: Pointer.h:200
bool anchored() const
Definition: RockRebuild.cc:126
LoadingEntry loadingEntry(const sfileno fileNo)
Definition: RockRebuild.cc:454
void file_close(int fd)
Definition: fs_io.cc:93
bool empty() const
true iff no entry occupies this slot
Definition: RockDbCell.h:28
State
possible store entry states during index rebuild
Definition: RockRebuild.cc:119
void primeNewEntry(Ipc::StoreMapAnchor &anchor, const sfileno fileno, const DbCellHeader &header)
initialize housekeeping information for a newly accepted entry
Definition: RockRebuild.cc:842
uint8_t mapped
whether the slot was added to a mapped entry
Definition: RockRebuild.cc:104
State state() const
Definition: RockRebuild.cc:122
int tvSubMsec(struct timeval t1, struct timeval t2)
Definition: gadgets.cc:51
void storeRebuildComplete(StoreRebuildData *dc)
void freeBadEntry(const sfileno fileno, const char *eDescription)
Definition: RockRebuild.cc:671
void freeUnusedSlot(const SlotId slotId, const bool invalid)
freeSlot() for never-been-mapped slots
Definition: RockRebuild.cc:746
std::atomic< StoreMapSliceId > start
where the chain of StoreEntry slices begins [app]
Definition: StoreMap.h:111
int64_t dbSize
Definition: RockRebuild.h:114
void finalizeOrThrow(const sfileno fileNo, LoadingEntry &le)
Definition: RockRebuild.cc:599
uint64_t slotSize
all db slots are of this size
Definition: RockSwapDir.h:83
int dbSlotSize
the size of a db cell, including the cell header
Definition: RockRebuild.h:115
void anchored(const bool beAnchored)
Definition: RockRebuild.cc:127
#define SQUID_MD5_DIGEST_LENGTH
Definition: md5.h:66
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
std::atomic< StoreMapSliceId > next
ID of the next entry slice.
Definition: StoreMap.h:49
#define O_BINARY
Definition: defines.h:134
struct Ipc::StoreMapAnchor::Basics basics
int size
Definition: ModDevPoll.cc:69
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format
Definition: gadgets.cc:18
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
Versions & versions() const
Definition: RockRebuild.cc:177
time_t getCurrentTime() STUB_RETVAL(0) int tvSubUsec(struct timeval
bool IamDiskProcess() STUB_RETVAL_NOP(false) bool InDaemonMode() STUB_RETVAL_NOP(false) bool UsingSmp() STUB_RETVAL_NOP(false) bool IamCoordinatorProcess() STUB_RETVAL(false) bool IamPrimaryProcess() STUB_RETVAL(false) int NumberOfKids() STUB_RETVAL(0) void setMaxFD(void) STUB void setSystemLimits(void) STUB void squid_signal(int
whether the current process is dedicated to managing a cache_dir
void failure(const char *msg, int errNo=0)
a helper to handle rebuild-killing I/O errors
Definition: RockRebuild.cc:708
int32_t StoreMapSliceId
Definition: StoreMap.h:24
LoadingFlags & flags
entry flags (see the above accessors) are ours
Definition: RockRebuild.cc:130
int64_t entryLimitActual() const
max number of possible entries in db
Definition: RockSwapDir.cc:203
void startShutdown() override
Definition: RockRebuild.cc:333
Definition: MemBuf.h:23
char * path
Definition: Disk.h:102
uint64_t key[2]
StoreEntry key.
Definition: RockDbCell.h:41
void addSlotToEntry(const sfileno fileno, const SlotId slotId, const DbCellHeader &header)
Definition: RockRebuild.cc:782
void validationSteps()
Definition: RockRebuild.cc:560
void state(State aState) const
Definition: RockRebuild.cc:123
int64_t diskOffsetLimit() const
Definition: RockSwapDir.cc:690
static bool ZeroedSlot(const MemBuf &buf)
Definition: RockRebuild.cc:516
Mores::Owner * moresOwner
LoadingSlot::more for all slots.
Definition: RockRebuild.cc:189
void setKey(const cache_key *const aKey)
Definition: StoreMap.cc:945
#define CallJobHere(debugSection, debugLevel, job, Class, method)
Definition: AsyncJobCalls.h:59
#define shm_old(Class)
Definition: Pointer.h:201
uint32_t version
detects conflicts among same-key entries
Definition: RockDbCell.h:44
LoadingSlot(const SlotId slotId, LoadingParts &source)
Definition: RockRebuild.cc:208
bool doneLoading() const
Definition: RockRebuild.cc:383
#define assert(EX)
Definition: assert.h:17
bool storeRebuildLoadEntry(int fd, int diskIndex, MemBuf &buf, StoreRebuildData &)
loads entry from disk; fills supplied memory buffer on success
void loadOneSlot()
Definition: RockRebuild.cc:469
static Owner * Old(const char *const id)
attaches to the existing shared memory segment, becoming its owner
Definition: Pointer.h:123
std::atomic< uint64_t > swap_file_sz
Definition: StoreMap.h:105
void fill(const Item &value)
reset all items to the same value
Definition: StoreMap.h:145
int64_t dbSlotLimit
total number of db cells
Definition: RockRebuild.h:116
static int version
SBuf buf()
bytes written so far
Definition: Stream.h:41
static Ipc::Mem::Owner< Stats > * Init(const SwapDir &)
Definition: RockRebuild.cc:258
int64_t int64Percent(const int64_t a, const int64_t b)
Definition: SquidMath.cc:19
bool completed(const SwapDir &) const
whether the rebuild is finished already
Definition: RockRebuild.cc:264
uint8_t finalized
whether finalizeOrThrow() has scanned the slot
Definition: RockRebuild.cc:105
void validateOneSlot(const SlotId slotId)
Definition: RockRebuild.cc:661
T::Owner * createOwner(const char *dirPath, const char *sfx, const int64_t limit, const bool resuming)
Definition: RockRebuild.cc:218
static int store_dirs_rebuilding
the number of cache_dirs being rebuilt; TODO: move to Disks::Rebuilding
Definition: Controller.h:133
static std::ostream & Extra(std::ostream &)
Definition: debug.cc:1316
void checkpoint()
continues after a pause if not done
Definition: RockRebuild.cc:376
signed_int32_t sfileno
Definition: forward.h:22
void useNewSlot(const SlotId slotId, const DbCellHeader &header)
handle freshly loaded (and validated) db slot header
Definition: RockRebuild.cc:888
bool freed() const
Definition: RockRebuild.cc:151
static SBuf Name(const SBuf &prefix, const char *suffix)
concatenates parts of a name to form a complete name (or its prefix)
Definition: Segment.cc:52
uint64_t & size
payload seen so far
Definition: RockRebuild.cc:115
const char * filePath
location of cache storage file inside path/
Definition: RockSwapDir.h:135
static bool IsResponsible(const SwapDir &)
whether the current kid is responsible for rebuilding the given cache_dir
Definition: RockRebuild.cc:273
virtual void callException(const std::exception &) override
called when the job throws during an async call
Definition: RockRebuild.cc:699
uint32_t & version
DbCellHeader::version to distinguish same-URL chains.
Definition: RockRebuild.cc:116
~Rebuild() override
Definition: RockRebuild.cc:323
Versions::Owner * versionsOwner
LoadingEntry::version for all entries.
Definition: RockRebuild.cc:186
advancement of work that consists of (usually known number) of similar steps
Definition: store_rebuild.h:46
uint32_t number
page number within the segment
Definition: Page.h:42
void finalized(const bool beFinalized)
Definition: RockRebuild.cc:148
void validateOneEntry(const sfileno fileNo)
Definition: RockRebuild.cc:643
sfileno SlotId
db cell number, starting with cell 0 (always occupied by the db header)
Definition: forward.h:30
int64_t validations
the number of validated cache entries, slots
Definition: store_rebuild.h:41
an std::runtime_error with thrower location info
Definition: TextException.h:20
char * content()
start of the added data
Definition: MemBuf.h:41
Ipc::StoreMapSliceId & more
another slot in some chain belonging to the same entry (unordered!)
Definition: RockRebuild.cc:140
static StoreRebuildData counts
Sizes::Owner * sizesOwner
LoadingEntry::size for all entries.
Definition: RockRebuild.cc:185
bool mapped() const
Definition: RockRebuild.cc:143
static bool Start(SwapDir &dir)
Definition: RockRebuild.cc:280
#define Must(condition)
Definition: TextException.h:75
int64_t slotLimitActual() const
total number of slots in this db
Definition: RockSwapDir.cc:194
#define Important(id)
Definition: Messages.h:93
uint64_t swap_file_sz
Definition: Store.h:229
#define DBG_IMPORTANT
Definition: Stream.h:38
void freeSlot(const SlotId slotId, const bool invalid)
adds slot to the free slot index
Definition: RockRebuild.cc:726
int index
Definition: Disk.h:103
static const int64_t HeaderSize
on-disk db header size
Definition: RockSwapDir.h:151
LoadingFlags & flags
slot flags (see the above accessors) are ours
Definition: RockRebuild.cc:157
ssize_t mb_size_t
Definition: MemBuf.h:17
Flags::Owner * flagsOwner
all LoadingEntry and LoadingSlot flags
Definition: RockRebuild.cc:192
uint32_t payloadSize
slot contents size, always positive
Definition: RockDbCell.h:43
void start() override
prepares and initiates entry loading sequence
Definition: RockRebuild.cc:340
void loadingSteps()
Definition: RockRebuild.cc:419
void swanSong() override
Definition: RockRebuild.cc:691
int opt_foreground_rebuild
void mapSlot(const SlotId slotId, const DbCellHeader &header)
adds slot to the entry chain in the map
Definition: RockRebuild.cc:756
std::atomic< Size > size
slice contents size
Definition: StoreMap.h:48
sfileno nextSlot
slot ID of the next slot occupied by the entry
Definition: RockDbCell.h:46
int file_open(const char *path, int mode)
Definition: fs_io.cc:65
bool doneAll() const override
whether positive goal has been reached
Definition: RockRebuild.cc:395
bool UsingSmp()
Whether there should be more than one worker process running.
Definition: tools.cc:696
LoadingEntry(const sfileno fileNo, LoadingParts &source)
Definition: RockRebuild.cc:199
low-level anti-padding storage class for LoadingEntry and LoadingSlot flags
Definition: RockRebuild.cc:94
static bool DoneValidating(const int64_t validationPos, const int64_t dbSlotLimit, const int64_t dbEntryLimit)
Definition: RockRebuild.cc:86
cache_dir indexing statistics shared across same-kid process restarts
Definition: RockRebuild.h:36
Class * object()
Raw access; handy to finalize initiatization, but avoid if possible.
Definition: Pointer.h:43
void freed(const bool beFreed)
Definition: RockRebuild.cc:152
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
Sizes & sizes() const
Definition: RockRebuild.cc:176
uint8_t freed
whether the slot was given to the map as free space
Definition: RockRebuild.cc:106
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:107
static SBuf Path(const char *dirPath)
Definition: RockRebuild.cc:252
static PoolId IdForSwapDirSpace(const int dirIdx)
stack of free rock cache_dir slot numbers
Definition: PageStack.h:171
void finalizeOrFree(const sfileno fileNo, LoadingEntry &le)
Definition: RockRebuild.cc:633
sfileno firstSlot
slot ID of the first slot occupied by the entry
Definition: RockDbCell.h:45
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37

 

Introduction

Documentation

Support

Miscellaneous