BodyPipe.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "base/AsyncJobCalls.h"
11 #include "base/TextException.h"
12 #include "BodyPipe.h"
13 
14 // BodySink is a BodyConsumer class which just consume and drops
15 // data from a BodyPipe
16 class BodySink: public BodyConsumer
17 {
19 
20 public:
21  BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
22  ~BodySink() override { assert(!body_pipe); }
23 
25  size_t contentSize = bp->buf().contentSize();
26  bp->consume(contentSize);
27  }
30  }
33  }
34  bool doneAll() const override {return !body_pipe && AsyncJob::doneAll();}
35 
36 private:
38 };
39 
41 
42 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
43 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
44 // the BodyPipe passed as argument
45 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
46 {
47 public:
49 
51  Parent::Method aHandler, BodyPipe::Pointer bp):
52  Parent(aProducer, aHandler, bp) {}
53 
54  bool canDial(AsyncCall &call) override;
55 };
56 
57 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
58 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
59 // of the BodyPipe passed as argument
60 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
61 {
62 public:
64 
66  Parent::Method aHandler, BodyPipe::Pointer bp):
67  Parent(aConsumer, aHandler, bp) {}
68 
69  bool canDial(AsyncCall &call) override;
70 };
71 
72 bool
74 {
75  if (!Parent::canDial(call))
76  return false;
77 
78  const BodyProducer::Pointer &producer = job;
79  BodyPipe::Pointer aPipe = arg1;
80  if (!aPipe->stillProducing(producer)) {
81  debugs(call.debugSection, call.debugLevel, producer << " no longer producing for " << aPipe->status());
82  return call.cancel("no longer producing");
83  }
84 
85  return true;
86 }
87 
88 bool
90 {
91  if (!Parent::canDial(call))
92  return false;
93 
94  const BodyConsumer::Pointer &consumer = job;
95  BodyPipe::Pointer aPipe = arg1;
96  if (!aPipe->stillConsuming(consumer)) {
97  debugs(call.debugSection, call.debugLevel, consumer << " no longer consuming from " << aPipe->status());
98  return call.cancel("no longer consuming");
99  }
100 
101  return true;
102 }
103 
104 /* BodyProducer */
105 
106 // inform the pipe that we are done and clear the Pointer
108 {
109  debugs(91,7, this << " will not produce for " << p << "; atEof: " << atEof);
110  assert(p != nullptr); // be strict: the caller state may depend on this
111  p->clearProducer(atEof);
112  p = nullptr;
113 }
114 
115 /* BodyConsumer */
116 
117 // inform the pipe that we are done and clear the Pointer
119 {
120  debugs(91,7, this << " will not consume from " << p);
121  assert(p != nullptr); // be strict: the caller state may depend on this
122  p->clearConsumer();
123  p = nullptr;
124 }
125 
126 /* BodyPipe */
127 
128 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
129  theProducer(aProducer), theConsumer(nullptr),
130  thePutSize(0), theGetSize(0),
131  mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
132 {
133  // TODO: teach MemBuf to start with zero minSize
134  // TODO: limit maxSize by theBodySize, when known?
135  theBuf.init(2*1024, MaxCapacity);
136  debugs(91,7, "created BodyPipe" << status());
137 }
138 
140 {
141  debugs(91,7, "destroying BodyPipe" << status());
144  theBuf.clean();
145 }
146 
147 void BodyPipe::setBodySize(uint64_t aBodySize)
148 {
149  assert(!bodySizeKnown());
150  assert(thePutSize <= aBodySize);
151 
152  // If this assert fails, we need to add code to check for eof and inform
153  // the consumer about the eof condition via scheduleBodyEndNotification,
154  // because just setting a body size limit may trigger the eof condition.
156 
157  theBodySize = aBodySize;
158  debugs(91,7, "set body size" << status());
159 }
160 
161 uint64_t BodyPipe::bodySize() const
162 {
164  return static_cast<uint64_t>(theBodySize);
165 }
166 
167 bool BodyPipe::expectMoreAfter(uint64_t offset) const
168 {
169  assert(theGetSize <= offset);
170  return offset < thePutSize || // buffer has more now or
171  (!productionEnded() && mayNeedMoreData()); // buffer will have more
172 }
173 
175 {
176  return !expectMoreAfter(theGetSize);
177 }
178 
179 uint64_t BodyPipe::unproducedSize() const
180 {
181  return bodySize() - thePutSize; // bodySize() asserts that size is known
182 }
183 
185 {
186  const uint64_t expectedSize = thePutSize + size;
187  if (bodySizeKnown())
188  Must(bodySize() == expectedSize);
189  else
190  theBodySize = expectedSize;
191 }
192 
193 void
195 {
196  if (theProducer.set()) {
197  debugs(91,7, "clearing BodyPipe producer" << status());
198  theProducer.clear();
199  if (atEof) {
200  if (!bodySizeKnown())
202  else if (bodySize() != thePutSize)
203  debugs(91,3, "aborting on premature eof" << status());
204  } else {
205  // asserta that we can detect the abort if the consumer joins later
207  }
209  }
210 }
211 
212 size_t
213 BodyPipe::putMoreData(const char *aBuffer, size_t size)
214 {
215  if (bodySizeKnown())
216  size = min((uint64_t)size, unproducedSize());
217 
218  const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
219  if ((size = min(size, spaceSize))) {
220  theBuf.append(aBuffer, size);
221  postAppend(size);
222  return size;
223  }
224  return 0;
225 }
226 
227 bool
229 {
231  assert(aConsumer.set()); // but might be invalid
232 
233  // TODO: convert this into an exception and remove IfNotLate suffix
234  // If there is something consumed already, we are in an auto-consuming mode
235  // and it is too late to attach a real consumer to the pipe.
236  if (theGetSize > 0) {
238  return false;
239  }
240 
241  Must(!abortedConsumption); // did not promise to never consume
242 
243  theConsumer = aConsumer;
244  debugs(91,7, "set consumer" << status());
245  if (theBuf.hasContent())
247  if (!theProducer)
249 
250  return true;
251 }
252 
253 void
255 {
256  if (theConsumer.set()) {
257  debugs(91,7, "clearing consumer" << status());
258  theConsumer.clear();
259  // do not abort if we have not consumed so that HTTP or ICAP can retry
260  // benign xaction failures due to persistent connection race conditions
261  if (consumedSize())
263  }
264 }
265 
266 void
268 {
269  // We and enableAutoConsumption() may be called multiple times because
270  // multiple jobs on the consumption chain may realize that there will be no
271  // more setConsumer() calls (e.g., consuming code and retrying code). It is
272  // both difficult and unnecessary for them to coordinate their calls.
273 
274  // As a consequence, we may be called when already auto-consuming, including
275  // cases where abortedConsumption is still false. We could try to harden
276  // this by also aborting consumption from enableAutoConsumption() when there
277  // is no consumer, but see errorAppendEntry() TODO for a better plan.
278 
279  debugs(91, 7, status());
280  if (!abortedConsumption && !exhausted() && !theConsumer) {
281  AsyncCall::Pointer call= asyncCall(91, 7,
282  "BodyProducer::noteBodyConsumerAborted",
285  ScheduleCallHere(call);
286  abortedConsumption = true;
287 
288  // in case somebody enabled auto-consumption before regular one aborted
290  }
291 }
292 
293 size_t
295 {
296  if (!theBuf.hasContent())
297  return 0; // did not touch the possibly uninitialized buf
298 
299  if (aMemBuffer.isNull())
300  aMemBuffer.init();
301  const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
302  aMemBuffer.append(theBuf.content(), size);
304  postConsume(size);
305  return size; // cannot be zero if we called buf.init above
306 }
307 
308 void
310 {
312  postConsume(size);
313 }
314 
315 void
317 {
318  mustAutoConsume = true;
319  debugs(91,5, "enabled auto consumption" << status());
321 }
322 
326 void
328 {
329  const auto startNow =
330  mustAutoConsume && // was enabled
331  !theConsumer && // has not started yet
332  theProducer.valid() && // still useful (and will eventually stop)
333  theBuf.hasContent(); // has something to consume right now
334  if (!startNow)
335  return;
336 
337  theConsumer = new BodySink(this);
339  debugs(91,7, "starting auto consumption" << status());
341 }
342 
343 MemBuf &
345 {
347  isCheckedOut = true;
348  return theBuf;
349 }
350 
351 void
353 {
355  isCheckedOut = false;
356  const size_t currentSize = theBuf.contentSize();
357  if (checkout.checkedOutSize > currentSize)
358  postConsume(checkout.checkedOutSize - currentSize);
359  else if (checkout.checkedOutSize < currentSize)
360  postAppend(currentSize - checkout.checkedOutSize);
361 }
362 
363 void
365 {
367  const size_t currentSize = theBuf.contentSize();
368  // We can only undo if size did not change, and even that carries
369  // some risk. If this becomes a problem, the code checking out
370  // raw buffers should always check them in (possibly unchanged)
371  // instead of relying on the automated undo mechanism of Checkout.
372  // The code can always use a temporary buffer to accomplish that.
373  Must(checkout.checkedOutSize == currentSize);
374 }
375 
376 // TODO: Optimize: inform consumer/producer about more data/space only if
377 // they used the data/space since we notified them last time.
378 
379 void
381 {
383  theGetSize += size;
384  debugs(91,7, "consumed " << size << " bytes" << status());
385  if (mayNeedMoreData()) {
386  AsyncCall::Pointer call= asyncCall(91, 7,
387  "BodyProducer::noteMoreBodySpaceAvailable",
390  ScheduleCallHere(call);
391  }
392 }
393 
394 void
396 {
398  thePutSize += size;
399  debugs(91,7, "added " << size << " bytes" << status());
400 
401  // We should not consume here even if mustAutoConsume because the
402  // caller may not be ready for the data to be consumed during this call.
404 
405  // Do this check after scheduleBodyDataNotification() to ensure the
406  // natural order of "more body data" and "production ended" events.
407  if (!mayNeedMoreData())
408  clearProducer(true); // reached end-of-body
409 
411 }
412 
413 void
415 {
416  if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
417  AsyncCall::Pointer call = asyncCall(91, 7,
418  "BodyConsumer::noteMoreBodyDataAvailable",
421  ScheduleCallHere(call);
422  }
423 }
424 
425 void
427 {
428  if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
429  if (bodySizeKnown() && bodySize() == thePutSize) {
430  AsyncCall::Pointer call = asyncCall(91, 7,
431  "BodyConsumer::noteBodyProductionEnded",
434  ScheduleCallHere(call);
435  } else {
436  AsyncCall::Pointer call = asyncCall(91, 7,
437  "BodyConsumer::noteBodyProducerAborted",
440  ScheduleCallHere(call);
441  }
442  }
443 }
444 
445 // a short temporary string describing buffer status for debugging
446 const char *BodyPipe::status() const
447 {
448  static MemBuf outputBuffer;
449  outputBuffer.reset();
450 
451  outputBuffer.append(" [", 2);
452 
453  outputBuffer.appendf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
454  if (theBodySize >= 0)
455  outputBuffer.appendf("<=%" PRId64, theBodySize);
456  else
457  outputBuffer.append("<=?", 3);
458 
459  outputBuffer.appendf(" %" PRId64 "+%" PRId64, static_cast<int64_t>(theBuf.contentSize()), static_cast<int64_t>(theBuf.spaceSize()));
460 
461  outputBuffer.appendf(" pipe%p", this);
462  if (theProducer.set())
463  outputBuffer.appendf(" prod%p", theProducer.get());
464  if (theConsumer.set())
465  outputBuffer.appendf(" cons%p", theConsumer.get());
466 
467  if (mustAutoConsume)
468  outputBuffer.append(" A", 2);
469  if (abortedConsumption)
470  outputBuffer.append(" !C", 3);
471  if (isCheckedOut)
472  outputBuffer.append(" L", 2); // Locked
473 
474  outputBuffer.append("]", 1);
475 
476  outputBuffer.terminate();
477 
478  return outputBuffer.content();
479 }
480 
481 /* BodyPipeCheckout */
482 
484  buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
485  checkedOutSize(buf.contentSize()), checkedIn(false)
486 {
487 }
488 
490 {
491  if (!checkedIn) {
492  // Do not pipe.undoCheckOut(*this) because it asserts or throws
493  // TODO: consider implementing the long-term solution discussed at
494  // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
495  debugs(91,2, "Warning: cannot undo BodyPipeCheckout");
496  thePipe.checkIn(*this);
497  }
498 }
499 
500 void
502 {
503  assert(!checkedIn);
504  thePipe.checkIn(*this);
505  checkedIn = true;
506 }
507 
509  buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
510  checkedIn(c.checkedIn)
511 {
512  assert(false); // prevent copying
513 }
514 
517 {
518  assert(false); // prevent assignment
519  return *this;
520 }
521 
bool cancel(const char *reason)
Definition: AsyncCall.cc:56
const MemBuf & buf() const
Definition: BodyPipe.h:137
BodyProducerDialer(const BodyProducer::Pointer &aProducer, Parent::Method aHandler, BodyPipe::Pointer bp)
Definition: BodyPipe.cc:50
UnaryMemFunT< BodyProducer, BodyPipe::Pointer > Parent
Definition: BodyPipe.cc:48
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
Definition: CbcPointer.h:159
Producer::Pointer theProducer
Definition: BodyPipe.h:158
bool stillProducing(const Producer::Pointer &producer) const
Definition: BodyPipe.h:121
MemBuf theBuf
Definition: BodyPipe.h:164
void terminate()
Definition: MemBuf.cc:241
bool mustAutoConsume
keep theBuf empty when producing without consumer
Definition: BodyPipe.h:166
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
uint64_t unproducedSize() const
Definition: BodyPipe.cc:179
void setBodySize(uint64_t aSize)
Definition: BodyPipe.cc:147
virtual void noteBodyProducerAborted(RefCount< BodyPipe > bp)=0
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
bool hasContent() const
Definition: MemBuf.h:54
void init(mb_size_t szInit, mb_size_t szMax)
Definition: MemBuf.cc:93
int64_t theBodySize
Definition: BodyPipe.h:157
virtual void noteBodyConsumerAborted(RefCount< BodyPipe > bp)=0
void enableAutoConsumption()
start or continue consuming when producing without consumer
Definition: BodyPipe.cc:316
#define PRIu64
Definition: types.h:114
uint64_t theGetSize
Definition: BodyPipe.h:162
void postAppend(size_t size)
Definition: BodyPipe.cc:395
void scheduleBodyDataNotification()
Definition: BodyPipe.cc:414
~BodySink() override
Definition: BodyPipe.cc:22
void postConsume(size_t size)
Definition: BodyPipe.cc:380
BodyPipe & thePipe
Definition: BodyPipe.h:73
BodyPipe(Producer *aProducer)
Definition: BodyPipe.cc:128
void(BodyProducer ::* Method)(BodyPipe::Pointer)
uint64_t thePutSize
Definition: BodyPipe.h:161
bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
Definition: BodyPipe.cc:228
virtual bool canDial(AsyncCall &call)
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
size_t getMoreData(MemBuf &buf)
Definition: BodyPipe.cc:294
void undoCheckOut(Checkout &checkout)
Definition: BodyPipe.cc:364
BodyPipeCheckout & operator=(const BodyPipeCheckout &)
Definition: BodyPipe.cc:516
bool exhausted() const
Definition: BodyPipe.cc:174
void consume(size_t size)
Definition: BodyPipe.cc:309
void startAutoConsumptionIfNeeded()
Definition: BodyPipe.cc:327
const size_t checkedOutSize
Definition: BodyPipe.h:78
~BodyPipe() override
Definition: BodyPipe.cc:139
bool bodySizeKnown() const
Definition: BodyPipe.h:109
bool canDial(AsyncCall &call) override
Definition: BodyPipe.cc:73
virtual void noteBodyProductionEnded(RefCount< BodyPipe > bp)=0
mb_size_t contentSize() const
available data size
Definition: MemBuf.h:47
int isNull() const
Definition: MemBuf.cc:145
uint64_t bodySize() const
Definition: BodyPipe.cc:161
int size
Definition: ModDevPoll.cc:69
void append(const char *c, int sz) override
Definition: MemBuf.cc:209
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
BodyPipeCheckout(BodyPipe &)
Definition: BodyPipe.cc:483
bool doneAll() const override
whether positive goal has been reached
Definition: BodyPipe.cc:34
const int debugLevel
Definition: AsyncCall.h:77
MemBuf & checkOut()
Definition: BodyPipe.cc:344
void stopConsumingFrom(RefCount< BodyPipe > &)
Definition: BodyPipe.cc:118
virtual void noteMoreBodyDataAvailable(RefCount< BodyPipe > bp)=0
Definition: MemBuf.h:23
void clean()
Definition: MemBuf.cc:110
const int debugSection
Definition: AsyncCall.h:76
bool isCheckedOut
Definition: BodyPipe.h:168
bool stillConsuming(const Consumer::Pointer &consumer) const
Definition: BodyPipe.h:132
#define assert(EX)
Definition: assert.h:17
Consumer::Pointer theConsumer
Definition: BodyPipe.h:159
void clear()
make pointer not set; does not invalidate cbdata
Definition: CbcPointer.h:144
BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer, Parent::Method aHandler, BodyPipe::Pointer bp)
Definition: BodyPipe.cc:65
void noteBodyProducerAborted(BodyPipe::Pointer) override
Definition: BodyPipe.cc:31
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:325
virtual void noteMoreBodySpaceAvailable(RefCount< BodyPipe > bp)=0
void clearConsumer()
Definition: BodyPipe.cc:254
UnaryMemFunT< BodyConsumer, BodyPipe::Pointer > Parent
Definition: BodyPipe.cc:63
void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) override
Definition: BodyPipe.cc:24
BodyPipe::Pointer body_pipe
the pipe we are consuming from
Definition: BodyPipe.cc:37
size_t putMoreData(const char *buf, size_t size)
Definition: BodyPipe.cc:213
const char * status() const
Definition: BodyPipe.cc:446
bool productionEnded() const
Definition: BodyPipe.h:113
bool expectMoreAfter(uint64_t offset) const
Definition: BodyPipe.cc:167
static constexpr size_t MaxCapacity
Definition: BodyPipe.h:100
bool canDial(AsyncCall &call) override
Definition: BodyPipe.cc:89
mb_size_t potentialSpaceSize() const
Definition: MemBuf.cc:161
CBDATA_CHILD(BodySink)
void clearProducer(bool atEof)
Definition: BodyPipe.cc:194
char * content()
start of the added data
Definition: MemBuf.h:41
bool mayNeedMoreData() const
Definition: BodyPipe.h:118
mb_size_t spaceSize() const
Definition: MemBuf.cc:155
void expectNoConsumption()
there will be no more setConsumer() calls
Definition: BodyPipe.cc:267
#define Must(condition)
Definition: TextException.h:75
void checkIn(Checkout &checkout)
Definition: BodyPipe.cc:352
void stopProducingFor(RefCount< BodyPipe > &, bool atEof)
Definition: BodyPipe.cc:107
void noteBodyProductionEnded(BodyPipe::Pointer) override
Definition: BodyPipe.cc:28
void reset()
Definition: MemBuf.cc:129
#define PRId64
Definition: types.h:104
void expectProductionEndAfter(uint64_t extraSize)
sets or checks body size
Definition: BodyPipe.cc:184
BodySink(const BodyPipe::Pointer &bp)
Definition: BodyPipe.cc:21
void scheduleBodyEndNotification()
Definition: BodyPipe.cc:426
bool abortedConsumption
called BodyProducer::noteBodyConsumerAborted
Definition: BodyPipe.h:167
void consume(mb_size_t sz)
removes sz bytes and "packs" by moving content left
Definition: MemBuf.cc:168
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
const A & min(A const &lhs, A const &rhs)
uint64_t consumedSize() const
Definition: BodyPipe.h:111
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37

 

Introduction

Documentation

Support

Miscellaneous