RockIoState.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 79 Disk IO Routines */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "CollapsedForwarding.h"
14 #include "DiskIO/DiskIOModule.h"
15 #include "DiskIO/DiskIOStrategy.h"
16 #include "DiskIO/WriteRequest.h"
17 #include "fs/rock/RockIoRequests.h"
18 #include "fs/rock/RockIoState.h"
19 #include "fs/rock/RockSwapDir.h"
20 #include "globals.h"
21 #include "MemObject.h"
22 #include "Parsing.h"
23 #include "Transients.h"
24 
26  StoreEntry *anEntry,
28  void *data) :
29  StoreIOState(cbIo, data),
30  readableAnchor_(nullptr),
31  writeableAnchor_(nullptr),
32  splicingPoint(-1),
33  staleSplicingPointNext(-1),
34  dir(aDir),
35  slotSize(dir->slotSize),
36  objOffset(0),
37  sidFirst(-1),
38  sidPrevious(-1),
39  sidCurrent(-1),
40  sidNext(-1),
41  requestsSent(0),
42  repliesReceived(0),
43  theBuf(dir->slotSize)
44 {
45  e = anEntry;
46  e->lock("rock I/O");
47  // anchor, swap_filen, and swap_dirn are set by the caller
48  ++store_open_disk_fd; // TODO: use a dedicated counter?
49  //theFile is set by SwapDir because it depends on DiskIOStrategy
50 }
51 
53 {
55 
56  // The dir map entry may still be open for reading at the point because
57  // the map entry lock is associated with StoreEntry, not IoState.
58  // assert(!readableAnchor_);
59  assert(shutting_down || !writeableAnchor_);
60 
61  cbdataReferenceDone(callback_data);
62  theFile = nullptr;
63 
64  e->unlock("rock I/O");
65 }
66 
67 void
69 {
70  assert(!theFile);
71  assert(aFile != nullptr);
72  theFile = aFile;
73 }
74 
75 const Ipc::StoreMapAnchor &
77 {
78  assert(readableAnchor_);
79  return *readableAnchor_;
80 }
81 
84 {
85  assert(writeableAnchor_);
86  return *writeableAnchor_;
87 }
88 
90 const Ipc::StoreMapSlice &
92 {
93  return dir->map->readableSlice(swap_filen, sidCurrent);
94 }
95 
/// Starts reading up to len bytes of entry content at offset coreOff into buf.
/// The reader callback (cb, data) is reported to via callReaderBack():
/// immediately with -1 when the wanted offset is not in the slot chain and the
/// writer has halted, immediately with 0 when the offset is not (yet) in the
/// chain but the writer has not halted, or otherwise after theFile completes
/// the ReadRequest sent at the end of this method (presumably delivered via
/// handleReadCompletion() -- NOTE(review): dispatch path not visible here).
/// A single read never crosses a slot boundary, so len may be reduced.
96 void
97 Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
98 {
99  debugs(79, 7, swap_filen << " reads from " << coreOff);
100 
101  assert(theFile != nullptr);
102  assert(coreOff >= 0);
103 
  // NOTE(review): writerHalted is sampled before every sidCurrent change,
  // presumably so that a halted writer observed here implies the slot chain
  // walked afterwards is final; keep this ordering -- confirm the rationale.
104  bool writerLeft = readAnchor().writerHalted; // before the sidCurrent change
105 
106  // if we are dealing with the first read or
107  // if the offset went backwards, start searching from the beginning
108  if (sidCurrent < 0 || coreOff < objOffset) {
109  // readers do not need sidFirst but set it for consistency/triage sake
110  sidCurrent = sidFirst = readAnchor().start;
111  objOffset = 0;
112  }
113 
  // Walk the slot chain until sidCurrent holds coreOff (or the chain ends).
  // objOffset is the entry content offset where sidCurrent's payload starts.
114  while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
115  writerLeft = readAnchor().writerHalted; // before the sidCurrent change
116  objOffset += currentReadableSlice().size;
117  sidCurrent = currentReadableSlice().next;
118  }
119 
  // only one read may be pending at a time
120  assert(read.callback == nullptr);
121  assert(read.callback_data == nullptr);
122  read.callback = cb;
123  read.callback_data = cbdataReference(data);
124 
125  // quit if we cannot read what they want, and the writer cannot add more
126  if (sidCurrent < 0 && writerLeft) {
127  debugs(79, 5, "quitting at " << coreOff << " in " << *e);
128  callReaderBack(buf, -1);
129  return;
130  }
131 
132  // punt if read offset is too big (because of client bugs or collapsing)
133  if (sidCurrent < 0) {
134  debugs(79, 5, "no " << coreOff << " in " << *e);
135  callReaderBack(buf, 0);
136  return;
137  }
138 
  // Read from the current slot only: clip len at the end of its payload.
139  offset_ = coreOff;
140  len = min(len,
141  static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
  // the payload starts right after the slot's DbCellHeader on disk
142  const uint64_t diskOffset = dir->diskOffset(sidCurrent);
143  const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
  // request IDs let expectedReply() detect unexpected/out-of-order replies
144  const auto id = ++requestsSent;
145  const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
146  theFile->read(request);
147 }
148 
149 void
150 Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
151 {
152  if (errFlag != DISK_OK || rlen < 0) {
153  debugs(79, 3, errFlag << " failure for " << *e);
154  return callReaderBack(request.buf, -1);
155  }
156 
157  if (!expectedReply(request.id))
158  return callReaderBack(request.buf, -1);
159 
160  debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
161  offset_ += rlen;
162  callReaderBack(request.buf, rlen);
163 }
164 
166 void
167 Rock::IoState::callReaderBack(const char *buf, int rlen)
168 {
169  splicingPoint = rlen >= 0 ? sidCurrent : -1;
170  if (splicingPoint < 0)
171  staleSplicingPointNext = -1;
172  else
173  staleSplicingPointNext = currentReadableSlice().next;
174  StoreIOState::STRCB *callb = read.callback;
175  assert(callb);
176  read.callback = nullptr;
177  void *cbdata;
178  if (cbdataReferenceValidDone(read.callback_data, &cbdata))
179  callb(cbdata, buf, rlen, this);
180 }
181 
183 bool
184 Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
185 {
186  bool success = false;
187  try {
188  tryWrite(buf, size, coreOff);
189  success = true;
190  } catch (const std::exception &ex) { // TODO: should we catch ... as well?
191  debugs(79, 2, "db write error: " << ex.what());
192  dir->writeError(*this);
193  finishedWriting(DISK_ERROR);
194  // 'this' might be gone beyond this point; fall through to free buf
195  }
196 
197  // careful: 'this' might be gone here
198 
199  if (dtor)
200  (dtor)(const_cast<char*>(buf)); // cast due to a broken API?
201 
202  return success;
203 }
204 
211 void
212 Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
213 {
214  debugs(79, 7, swap_filen << " writes " << size << " more");
215 
216  // either this is the first write or append;
217  // we do not support write gaps or rewrites
218  assert(!coreOff || coreOff == -1);
219 
220  // throw if an accepted unknown-size entry grew too big or max-size changed
221  Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));
222 
223  // buffer incoming data in slot buffer and write overflowing or final slots
224  // quit when no data left or we stopped writing on reentrant error
225  while (size > 0 && theFile != nullptr) {
226  const size_t processed = writeToBuffer(buf, size);
227  buf += processed;
228  size -= processed;
229  const bool overflow = size > 0;
230 
231  // We do not write a full buffer without overflow because
232  // we do not want to risk writing a payload-free slot on EOF.
233  if (overflow) {
234  Must(sidNext < 0);
235  sidNext = dir->reserveSlotForWriting();
236  assert(sidNext >= 0);
237  writeToDisk();
238  Must(sidNext < 0); // short sidNext lifetime simplifies code logic
239  }
240  }
241 
242 }
243 
246 size_t
247 Rock::IoState::writeToBuffer(char const *buf, size_t size)
248 {
249  // do not buffer a cell header for nothing
250  if (!size)
251  return 0;
252 
253  if (!theBuf.size) {
254  // eventually, writeToDisk() will fill this header space
255  theBuf.appended(sizeof(DbCellHeader));
256  }
257 
258  size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
259  theBuf.append(buf, forCurrentSlot);
260  offset_ += forCurrentSlot; // so that Core thinks we wrote it
261  return forCurrentSlot;
262 }
263 
265 void
267 {
268  assert(theFile != nullptr);
269  assert(theBuf.size >= sizeof(DbCellHeader));
270 
271  assert((sidFirst < 0) == (sidCurrent < 0));
272  if (sidFirst < 0) // this is the first disk write
273  sidCurrent = sidFirst = dir->reserveSlotForWriting();
274 
275  // negative sidNext means this is the last write request for this entry
276  const bool lastWrite = sidNext < 0;
277  // here, eof means that we are writing the right-most entry slot
278  const bool eof = lastWrite &&
279  // either not updating or the updating reader has loaded everything
280  (touchingStoreEntry() || staleSplicingPointNext < 0);
281  debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);
282 
283  // TODO: if DiskIO module is mmap-based, we should be writing whole pages
284  // to avoid triggering read-page;new_head+old_tail;write-page overheads
285 
286  assert(!eof || sidNext < 0); // no slots after eof
287 
288  // finalize db cell header
289  DbCellHeader header;
290  memcpy(header.key, e->key, sizeof(header.key));
291  header.firstSlot = sidFirst;
292 
293  const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
294  assert(!lastUpdatingWrite || sidNext < 0);
295  header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;
296 
297  header.payloadSize = theBuf.size - sizeof(DbCellHeader);
298  header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
299  header.version = writeAnchor().basics.timestamp;
300 
301  // copy finalized db cell header into buffer
302  memcpy(theBuf.mem, &header, sizeof(DbCellHeader));
303 
304  // and now allocate another buffer for the WriteRequest so that
305  // we can support concurrent WriteRequests (and to ease cleaning)
306  // TODO: should we limit the number of outstanding requests?
307  size_t wBufCap = 0;
308  void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
309  memcpy(wBuf, theBuf.mem, theBuf.size);
310 
311  const uint64_t diskOffset = dir->diskOffset(sidCurrent);
312  debugs(79, 5, swap_filen << " at " << diskOffset << '+' <<
313  theBuf.size);
314  const auto id = ++requestsSent;
315  WriteRequest *const r = new WriteRequest(
316  ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
317  memFreeBufFunc(wBufCap)), this, id);
318  r->sidCurrent = sidCurrent;
319  r->sidPrevious = sidPrevious;
320  r->eof = lastWrite;
321 
322  sidPrevious = sidCurrent;
323  sidCurrent = sidNext; // sidNext may be cleared/negative already
324  sidNext = -1;
325 
326  theBuf.clear();
327 
328  // theFile->write may call writeCompleted immediately
329  theFile->write(r);
330 }
331 
332 bool
334 {
335  Must(requestsSent); // paranoid: we sent some requests
336  Must(receivedId); // paranoid: the request was sent by some sio
337  Must(receivedId <= requestsSent); // paranoid: within our range
338  ++repliesReceived;
339  const auto expectedId = repliesReceived;
340  if (receivedId == expectedId)
341  return true;
342 
343  debugs(79, 3, "no; expected reply #" << expectedId <<
344  ", but got #" << receivedId);
345  return false;
346 }
347 
348 void
350 {
351  if (sidCurrent >= 0) {
352  dir->noteFreeMapSlice(sidCurrent);
353  sidCurrent = -1;
354  }
355  if (sidNext >= 0) {
356  dir->noteFreeMapSlice(sidNext);
357  sidNext = -1;
358  }
359 
360  // we incremented offset_ while accumulating data in write()
361  // we do not reset writeableAnchor_ here because we still keep the lock
362  if (touchingStoreEntry())
364  callBack(errFlag);
365 }
366 
367 void
369 {
370  debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
371  " leftovers: " << theBuf.size <<
372  " after " << requestsSent << '/' << repliesReceived <<
373  " callback: " << callback);
374 
375  if (!theFile) {
376  debugs(79, 3, "I/O already canceled");
377  assert(!callback);
378  // We keep writeableAnchor_ after callBack() on I/O errors.
379  assert(!readableAnchor_);
380  return;
381  }
382 
383  switch (how) {
384  case wroteAll:
385  assert(theBuf.size > 0); // we never flush last bytes on our own
386  try {
387  writeToDisk(); // flush last, yet unwritten slot to disk
388  return; // writeCompleted() will callBack()
389  }
390  catch (...) {
391  debugs(79, 2, "db flush error: " << CurrentException);
392  // TODO: Move finishedWriting() into SwapDir::writeError().
393  dir->writeError(*this);
394  finishedWriting(DISK_ERROR);
395  }
396  return;
397 
398  case writerGone:
399  dir->writeError(*this); // abort a partially stored entry
400  finishedWriting(DISK_ERROR);
401  return;
402 
403  case readerDone:
404  callBack(0);
405  return;
406  }
407 }
408 
412 {
413 public:
414  StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
415  callback(nullptr),
416  callback_data(nullptr),
417  errflag(err),
418  sio(anSio) {
419 
420  callback = cb;
421  callback_data = cbdataReference(data);
422  }
423 
425  callback(nullptr),
426  callback_data(nullptr),
427  errflag(cb.errflag),
428  sio(cb.sio) {
429 
430  callback = cb.callback;
431  callback_data = cbdataReference(cb.callback_data);
432  }
433 
434  ~StoreIOStateCb() override {
435  cbdataReferenceDone(callback_data); // may be nil already
436  }
437 
438  void dial(AsyncCall &) {
439  void *cbd;
440  if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
441  callback(cbd, errflag, sio.getRaw());
442  }
443 
444  bool canDial(AsyncCall &) const {
445  return cbdataReferenceValid(callback_data) && callback;
446  }
447 
448  void print(std::ostream &os) const override {
449  os << '(' << callback_data << ", err=" << errflag << ')';
450  }
451 
452 private:
453  StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined
454 
457  int errflag;
459 };
460 
461 void
463 {
464  debugs(79,3, "errflag=" << errflag);
465  theFile = nullptr;
466 
467  AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb",
468  StoreIOStateCb(callback, callback_data, errflag, this));
469  ScheduleCallHere(call);
470 
471  callback = nullptr;
472  cbdataReferenceDone(callback_data);
473 }
474 
uint64_t entrySize
total entry content size or zero if still unknown
Definition: RockDbCell.h:42
void STRCB(void *their_data, const char *buf, ssize_t len, StoreIOState::Pointer self)
Definition: StoreIOState.h:29
StoreIOState::STIOCB * callback
Definition: RockIoState.cc:455
StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio)
Definition: RockIoState.cc:414
#define cbdataReferenceValidDone(var, ptr)
Definition: cbdata.h:239
void FREE(void *)
Definition: forward.h:37
FREE * memFreeBufFunc(size_t size)
Definition: minimal.cc:79
void * memAllocBuf(size_t net_size, size_t *gross_size)
Definition: minimal.cc:46
bool canDial(AsyncCall &) const
Definition: RockIoState.cc:444
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
void lock(const char *context)
Definition: store.cc:445
SlotId sidCurrent
slot being written using this write request
Definition: cbdata.cc:37
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
#define cbdataReference(var)
Definition: cbdata.h:348
int store_open_disk_fd
void callReaderBack(const char *buf, int rlen)
report (already sanitized/checked) I/O results to the read initiator
Definition: RockIoState.cc:167
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
#define DISK_OK
Definition: defines.h:27
StoreIOStateCb(const StoreIOStateCb &cb)
Definition: RockIoState.cc:424
Rock::IoState::Pointer sio
Definition: RockIoState.cc:458
int size
Definition: ModDevPoll.cc:69
IoXactionId id
identifies this read transaction for the requesting IoState
SlotId sidPrevious
slot that will point to sidCurrent in the cache_dir map
uint64_t key[2]
StoreEntry key.
Definition: RockDbCell.h:41
Ipc::StoreMapAnchor & writeAnchor()
Definition: RockIoState.cc:83
uint32_t version
detects conflicts among same-key entries
Definition: RockDbCell.h:44
const Ipc::StoreMapSlice & currentReadableSlice() const
convenience wrapper returning the map slot we are reading now
Definition: RockIoState.cc:91
#define assert(EX)
Definition: assert.h:17
void callBack(int errflag)
Definition: RockIoState.cc:462
void writeToDisk()
write what was buffered during write() calls
Definition: RockIoState.cc:266
bool expectedReply(const IoXactionId receivedId)
Definition: RockIoState.cc:333
std::ostream & CurrentException(std::ostream &os)
prints active (i.e., thrown but not yet handled) exception
#define cbdataReferenceDone(var)
Definition: cbdata.h:357
void read_(char *buf, size_t size, off_t offset, STRCB *callback, void *callback_data) override
Definition: RockIoState.cc:97
void finishedWriting(const int errFlag)
called by SwapDir::writeCompleted() after the last write and on error
Definition: RockIoState.cc:349
bool eof
whether this is the last request for the entry
uint64_t IoXactionId
unique (within a given IoState object scope) I/O transaction identifier
Definition: forward.h:36
void file(const RefCount< DiskFile > &aFile)
Definition: RockIoState.cc:68
void handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
forwards read data (or an error) to the reader that initiated this I/O
Definition: RockIoState.cc:150
~StoreIOStateCb() override
Definition: RockIoState.cc:434
char * buf
Definition: ReadRequest.h:24
void dial(AsyncCall &)
Definition: RockIoState.cc:438
void STIOCB(void *their_data, int errflag, StoreIOState::Pointer self)
Definition: StoreIOState.h:39
IoState(Rock::SwapDir::Pointer &, StoreEntry *, StoreIOState::STIOCB *, void *cbData)
Definition: RockIoState.cc:25
void print(std::ostream &os) const override
Definition: RockIoState.cc:448
#define Must(condition)
Definition: TextException.h:75
bool write(char const *buf, size_t size, off_t offset, FREE *free_func) override
wraps tryWrite() to handle deep write failures centrally and safely
Definition: RockIoState.cc:184
void * callback_data
Definition: RockIoState.cc:456
StoreEntry * e
Definition: StoreIOState.h:73
const Ipc::StoreMapAnchor & readAnchor() const
Definition: RockIoState.cc:76
~IoState() override
Definition: RockIoState.cc:52
uint32_t payloadSize
slot contents size, always positive
Definition: RockDbCell.h:43
int shutting_down
#define DISK_ERROR
Definition: defines.h:28
size_t writeToBuffer(char const *buf, size_t size)
Definition: RockIoState.cc:247
static void Broadcast(const StoreEntry &e, const bool includingThisWorker=false)
notify other workers about changes in entry state (e.g., new data)
sfileno nextSlot
slot ID of the next slot occupied by the entry
Definition: RockDbCell.h:46
void tryWrite(char const *buf, size_t size, off_t offset)
Definition: RockIoState.cc:212
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
const A & min(A const &lhs, A const &rhs)
void close(int how) override
finish or abort swapping per CloseHow
Definition: RockIoState.cc:368
sfileno firstSlot
slot ID of the first slot occupied by the entry
Definition: RockDbCell.h:45

 

Introduction

Documentation

Support

Miscellaneous