RockHeaderUpdater.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
#include "squid.h"
#include "base/AsyncJobCalls.h"
#include "debug/Stream.h"
#include "fs/rock/RockHeaderUpdater.h"
#include "fs/rock/RockIoState.h"
#include "mime_header.h"
#include "Store.h"
#include "store/SwapMetaIn.h"
17 
18 CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
19 
21  AsyncJob("Rock::HeaderUpdater"),
22  store(aStore),
23  update(anUpdate),
24  reader(),
25  writer(),
26  bytesRead(0),
27  staleSwapHeaderSize(0),
28  staleSplicingPointNext(-1)
29 {
30  // TODO: Consider limiting the number of concurrent store updates.
31 }
32 
33 bool
35 {
36  return !reader && !writer && AsyncJob::doneAll();
37 }
38 
39 void
41 {
42  if (update.stale || update.fresh)
43  store->map->abortUpdating(update);
44 
45  if (reader) {
46  reader->close(StoreIOState::readerDone);
47  reader = nullptr;
48  }
49 
50  if (writer) {
51  writer->close(StoreIOState::writerGone);
52  // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53  // Also required to avoid IoState destructor assertions.
54  // We can do this because we closed update earlier or aborted it above.
55  dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56  writer = nullptr;
57  }
58 
60 }
61 
62 void
64 {
65  Must(update.entry);
66  Must(update.stale);
67  Must(update.fresh);
68  startReading();
69 }
70 
71 void
73 {
74  reader = store->openStoreIO(
75  *update.entry,
76  &NoteDoneReading,
77  this);
78  readMore("need swap entry metadata");
79 }
80 
81 void
83 {
84  debugs(47, 7, why);
85 
86  Must(reader);
87  const IoState &rockReader = dynamic_cast<IoState&>(*reader);
88  update.stale.splicingPoint = rockReader.splicingPoint;
89  staleSplicingPointNext = rockReader.staleSplicingPointNext;
90  debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
91  " body continues at " << staleSplicingPointNext);
92 
93  reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
94  reader = nullptr; // so that swanSong() does not try to close again
95 }
96 
97 void
98 Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
99 {
100  IoCbParams io(buf, result);
101  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
102  CallJobHere1(47, 7,
103  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
105  noteRead,
106  io);
107 }
108 
109 void
111 {
112  debugs(47, 7, result.size);
113  if (!result.size) { // EOF
114  stopReading("eof");
115  } else {
116  Must(result.size > 0);
117  bytesRead += result.size;
118  readerBuffer.rawAppendFinish(result.buf, result.size);
119  exchangeBuffer.append(readerBuffer);
120  debugs(47, 7, "accumulated " << exchangeBuffer.length());
121  }
122 
123  parseReadBytes();
124 }
125 
126 void
128 {
129  debugs(47, 7, "from " << bytesRead << " because " << why);
130  Must(reader);
131  readerBuffer.clear();
132  storeRead(reader,
133  readerBuffer.rawAppendStart(store->slotSize),
134  store->slotSize,
135  bytesRead,
136  &NoteRead,
137  this);
138 }
139 
140 void
142 {
143  // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
144  CallJobHere1(47, 7,
145  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
147  noteDoneReading,
148  errflag);
149 }
150 
151 void
153 {
154  debugs(47, 5, errflag << " writer=" << writer);
155  if (!reader) {
156  Must(!errflag); // we only initiate successful closures
157  Must(writer); // otherwise we would be done() and would not be called
158  } else {
159  reader = nullptr; // we are done reading
160  Must(errflag); // any external closures ought to be errors
161  mustStop("read error");
162  }
163 }
164 
165 void
167 {
168  writer = store->createUpdateIO(
169  update,
170  &NoteDoneWriting,
171  this);
172  Must(writer);
173 
174  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
175  rockWriter.staleSplicingPointNext = staleSplicingPointNext;
176 
177  // here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
178  uint64_t stalePrefixSz = 0;
179  uint64_t freshPrefixSz = 0;
180 
181  off_t offset = 0; // current writing offset (for debugging)
182 
183  const auto &mem = update.entry->mem();
184 
185  {
186  debugs(20, 7, "fresh store meta for " << *update.entry);
187  size_t freshSwapHeaderSize = 0; // set by getSerialisedMetaData() below
188 
189  // There is a circular dependency between the correct/fresh value of
190  // entry->swap_file_sz and freshSwapHeaderSize. We break that loop by
191  // serializing zero swap_file_sz, just like the regular first-time
192  // swapout code may do. De-serializing code will re-calculate it in
193  // storeRebuildParseEntry(). TODO: Stop serializing entry->swap_file_sz.
194  const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
195  update.entry->swap_file_sz = 0;
196  const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
197  update.entry->swap_file_sz = savedEntrySwapFileSize;
198 
199  Must(freshSwapHeader);
200  writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
201  stalePrefixSz += mem.swap_hdr_sz;
202  freshPrefixSz += freshSwapHeaderSize;
203  offset += freshSwapHeaderSize;
204  xfree(freshSwapHeader);
205  }
206 
207  {
208  debugs(20, 7, "fresh HTTP header @ " << offset);
209  const auto httpHeader = mem.freshestReply().pack();
210  writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
211  const auto &staleReply = mem.baseReply();
212  Must(staleReply.hdr_sz >= 0); // for int-to-uint64_t conversion below
213  Must(staleReply.hdr_sz > 0); // already initialized
214  stalePrefixSz += staleReply.hdr_sz;
215  freshPrefixSz += httpHeader->contentSize();
216  offset += httpHeader->contentSize();
217  delete httpHeader;
218  }
219 
220  {
221  debugs(20, 7, "moved HTTP body prefix @ " << offset);
222  writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
223  offset += exchangeBuffer.length();
224  exchangeBuffer.clear();
225  }
226 
227  debugs(20, 7, "wrote " << offset <<
228  "; swap_file_sz delta: -" << stalePrefixSz << " +" << freshPrefixSz);
229 
230  // Optimistic early update OK: Our write lock blocks access to swap_file_sz.
231  auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
232  Must(swap_file_sz >= stalePrefixSz);
233  swap_file_sz -= stalePrefixSz;
234  swap_file_sz += freshPrefixSz;
235 
236  writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
237 }
238 
239 void
241 {
242  CallJobHere1(47, 7,
243  CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
245  noteDoneWriting,
246  errflag);
247 }
248 
249 void
251 {
252  debugs(47, 5, errflag << " reader=" << reader);
253  Must(!errflag);
254  Must(!reader); // if we wrote everything, then we must have read everything
255 
256  Must(writer);
257  IoState &rockWriter = dynamic_cast<IoState&>(*writer);
258  update.fresh.splicingPoint = rockWriter.splicingPoint;
259  debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
260  store->map->closeForUpdating(update);
261  rockWriter.writeableAnchor_ = nullptr;
262  writer = nullptr; // we are done writing
263 
264  Must(doneAll());
265 }
266 
267 void
269 {
270  if (!staleSwapHeaderSize) {
271  staleSwapHeaderSize = Store::UnpackSwapMetaSize(exchangeBuffer);
272  // Squid assumes that metadata always fits into a single db slot
273  debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
274  Must(staleSwapHeaderSize > 0);
275  exchangeBuffer.consume(staleSwapHeaderSize);
276  }
277 
278  const size_t staleHttpHeaderSize = headersEnd(
279  exchangeBuffer.rawContent(),
280  exchangeBuffer.length());
281  debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
282  if (!staleHttpHeaderSize) {
283  readMore("need more stale HTTP reply header data");
284  return;
285  }
286 
287  exchangeBuffer.consume(staleHttpHeaderSize);
288  debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
289 
290  stopReading("read the last HTTP header slot");
291  startWriting();
292 }
293 
SlotId splicingPoint
the last db slot successfully read or written
Definition: RockIoState.h:60
SlotId staleSplicingPointNext
Definition: RockIoState.h:63
Definition: forward.h:27
virtual void swanSong()
Definition: AsyncJob.h:61
void noteDoneWriting(int errflag)
size_t UnpackSwapMetaSize(const SBuf &)
Definition: SwapMetaIn.cc:237
bool doneAll() const override
whether positive goal has been reached
@ wroteAll
success: caller supplied all data it wanted to swap out
Definition: StoreIOState.h:58
@ readerDone
success or failure: either way, stop swapping in
Definition: StoreIOState.h:60
static StoreIOState::STIOCB NoteDoneReading
void swanSong() override
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)
Definition: store_io.cc:79
void noteRead(const IoCbParams result)
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
void noteDoneReading(int errflag)
void stopReading(const char *why)
void readMore(const char *why)
Aggregates information required for updating entry metadata and headers.
Definition: StoreMap.h:181
#define xfree
static StoreIOState::STRCB NoteRead
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
Definition: RockIoState.h:57
HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &update)
#define Must(condition)
Definition: TextException.h:75
void start() override
called by AsyncStart; do not call directly
@ writerGone
failure: caller left before swapping out everything
Definition: StoreIOState.h:59
CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater)
size_t headersEnd(const char *mime, size_t l, bool &containsObsFold)
Definition: mime_header.cc:17
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
static StoreIOState::STIOCB NoteDoneWriting
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
Definition: AsyncJobCalls.h:64

 

Introduction

Documentation

Support

Miscellaneous