Go to the documentation of this file.
25 clientConnection(aConn),
29 mayUseConnection_(false),
30 connRegistered_(false)
42 if (
auto node = getTail()) {
57 connRegistered_ =
true;
64 return http->out.size == 0;
71 debugs(33, 5, clientConnection <<
", sz " <<
size <<
72 ", off " << (http->out.size +
size) <<
", len " <<
75 http->out.size +=
size;
77 switch (socketState()) {
84 debugs(33, 5, clientConnection <<
" Stream complete, keepalive is " <<
85 http->request->flags.proxyKeepalive);
89 if (!http->request->flags.proxyKeepalive)
90 clientConnection->close();
97 initiateClose(
"STREAM_UNPLANNED_COMPLETE");
101 initiateClose(
"STREAM_FAILED");
105 fatal(
"Hit unreachable code in Http::Stream::writeComplete\n");
112 debugs(33, 5, reply <<
" written " << http->out.size <<
" into " << clientConnection);
118 readBuffer.
offset = getNextRangeOffset();
120 readBuffer.
data = reqbuf;
128 return http->multipartRangeRequest();
134 debugs (33, 5,
"range: " << http->request->range <<
135 "; http offset " << http->out.offset <<
136 "; reply " << reply);
142 if (http->request->range) {
145 assert(http->range_iter.valid);
147 assert(canPackMoreRanges());
149 assert(http->range_iter.currentSpec());
151 int64_t start = http->range_iter.currentSpec()->offset +
152 http->range_iter.currentSpec()->length -
153 http->range_iter.debt();
154 debugs(33, 3,
"clientPackMoreRanges: in: offset: " << http->out.offset);
155 debugs(33, 3,
"clientPackMoreRanges: out:"
156 " start: " << start <<
157 " spec[" << http->range_iter.pos - http->request->range->begin() <<
"]:" <<
158 " [" << http->range_iter.currentSpec()->offset <<
159 ", " << http->range_iter.currentSpec()->offset +
160 http->range_iter.currentSpec()->length <<
"),"
161 " len: " << http->range_iter.currentSpec()->length <<
162 " debt: " << http->range_iter.debt());
163 if (http->range_iter.currentSpec()->length != -1)
164 assert(http->out.offset <= start);
169 }
else if (
const auto cr = reply ? reply->contentRange() :
nullptr) {
175 return http->out.offset + cr->spec.offset;
178 return http->out.offset;
192 if (!http->range_iter.debt()) {
193 debugs(33, 5,
"At end of current range spec for " << clientConnection);
195 if (http->range_iter.pos != http->range_iter.end)
196 ++http->range_iter.pos;
198 http->range_iter.updateSpec();
201 assert(!http->range_iter.debt() == !http->range_iter.currentSpec());
205 debugs(33, 5,
"returning " << (http->range_iter.currentSpec() ?
true :
false));
206 return http->range_iter.currentSpec() ? true :
false;
217 if (http->request->range) {
219 assert(http->range_iter.valid);
222 if (!canPackMoreRanges()) {
223 debugs(33, 5,
"Range request at end of returnable " <<
224 "range sequence on " << clientConnection);
228 }
else if (reply && reply->contentRange()) {
230 const int64_t &bytesSent = http->out.offset;
231 const int64_t &bytesExpected = reply->contentRange()->spec.length;
233 debugs(33, 7,
"body bytes sent vs. expected: " <<
234 bytesSent <<
" ? " << bytesExpected <<
" (+" <<
235 reply->contentRange()->spec.offset <<
")");
243 if (bytesSent == bytesExpected)
246 if (bytesSent > bytesExpected)
262 fatal (
"unreachable code\n");
274 debugs(11, 2,
"HTTP Client " << clientConnection);
275 debugs(11, 2,
"HTTP Client REPLY:\n---------\n" << mb->
buf <<
"\n----------");
281 if (multipartRangeRequest())
282 packRange(bodyData, mb);
283 else if (http->request->flags.chunkedReply) {
284 packChunk(bodyData, *mb);
286 size_t length = lengthToSend(bodyData.
range());
287 noteSentBodyBytes(length);
298 if (answer.allowed()) {
299 writeQuotaHandler = pool->createBucket();
300 fd_table[clientConnection->fd].writeQuotaHandler = writeQuotaHandler;
303 debugs(83, 4,
"Response delay pool " << pool->poolName <<
304 " skipped because ACL " << answer);
310 getConn()->write(mb);
317 if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
318 size_t length = lengthToSend(bodyData.
range());
319 noteSentBodyBytes(length);
320 getConn()->write(bodyData.
data, length);
326 if (multipartRangeRequest())
327 packRange(bodyData, &mb);
329 packChunk(bodyData, mb);
332 getConn()->write(&mb);
341 size_t maximum = available.
size();
343 if (!http->request->range)
346 assert(canPackMoreRanges());
348 if (http->range_iter.debt() == -1)
351 assert(http->range_iter.debt() > 0);
354 if (available.
start < http->range_iter.currentSpec()->offset)
357 return min(http->range_iter.debt(),
static_cast<int64_t
>(maximum));
363 debugs(33, 7, bytes <<
" body bytes");
364 http->out.offset += bytes;
366 if (!http->request->range)
369 if (http->range_iter.debt() != -1) {
370 http->range_iter.debt(http->range_iter.debt() - bytes);
371 assert (http->range_iter.debt() >= 0);
375 assert(http->range_iter.debt() >= -1);
392 (rep_tag.
str ? rep_tag.
str :
"<none>"));
420 const char *range_err =
nullptr;
425 auto contentRange = rep ? rep->
contentRange() :
nullptr;
428 range_err =
"no [parse-able] reply";
430 range_err =
"wrong status code";
432 range_err =
"too complex response";
434 range_err =
"wrong status code";
437 range_err =
"meaningless response";
440 range_err =
"unknown length";
441 else if (rep->
content_length != http->storeEntry()->mem().baseReply().content_length)
442 range_err =
"INCONSISTENT length";
447 else if (http->loggingTags().isTcpHit() &&
450 range_err =
"If-Range match failed";
452 else if (!http->request->range->canonize(rep))
453 range_err =
"canonization failed";
454 else if (http->request->range->isComplex())
455 range_err =
"too complex range header";
456 else if (!http->loggingTags().isTcpHit() && http->request->range->offsetLimitExceeded(roffLimit))
457 range_err =
"range outside range_offset_limit";
466 http->request->ignoreRange(range_err);
472 const auto actual_clen = http->prepPartialResponseGeneration();
474 const int spec_count = http->request->range->specs.size();
476 debugs(33, 3,
"range spec count: " << spec_count <<
480 if (spec_count == 1) {
481 const auto singleSpec = *http->request->range->begin();
497 debugs(33, 3,
"actual content length: " << actual_clen);
504 if (http->client_stream.tail)
513 return static_cast<clientStreamNode *
>(http->client_stream.tail->prev->data);
519 assert(http && http->getConn());
520 return http->getConn();
528 http->updateError(
error);
529 http->al->cache.code.err.update(lte);
543 connRegistered_ =
false;
551 debugs(33, 4, clientConnection <<
" because " << reason);
552 getConn()->stopSending(reason);
558 debugs(33, 2,
"Deferring request " << http->uri);
559 assert(flags.deferred == 0);
561 deferredparams.node =
node;
562 deferredparams.rep = rep;
563 deferredparams.queuedBuffer = receivedData;
570 if (http->request->range)
571 buildRangeHeader(rep);
581 const uint64_t length =
582 static_cast<uint64_t
>(lengthToSend(bodyData.
range()));
583 noteSentBodyBytes(length);
599 char const *buf = source.
data;
602 const size_t copy_sz = lengthToSend(available);
614 &http->storeEntry()->mem().freshestReply(),
621 debugs(33, 3,
"appending " << copy_sz <<
" bytes");
622 noteSentBodyBytes(copy_sz);
626 available.
start += copy_sz;
630 if (!canPackMoreRanges()) {
631 debugs(33, 3,
"Returning because !canPackMoreRanges.");
638 int64_t nextOffset = getNextRangeOffset();
639 assert(nextOffset >= http->out.offset);
640 int64_t skip = nextOffset - http->out.offset;
642 http->out.offset = nextOffset;
644 if (available.
size() <= (uint64_t)skip)
647 available.
start += skip;
658 clientConnection->close();
void fatal(const char *message)
ConnStateData * getConn() const
clientStream_status_t clientStreamStatus(clientStreamNode *thisObject, ClientHttpRequest *http)
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Range< int64_t > range() const
void sendStartOfMessage(HttpReply *, StoreIOBuffer bodyData)
send an HTTP reply message headers and maybe some initial payload
void registerWithConn()
register this stream with the Server
void buildRangeHeader(HttpReply *)
add Range headers (if any) to the given HTTP reply message
void clientStreamRead(clientStreamNode *thisObject, ClientHttpRequest *http, StoreIOBuffer readBuffer)
void finished()
cleanup when the transaction has finished. may destroy 'this'
int64_t getRangeOffsetLimit()
char reqbuf[HTTP_REQBUF_SZ]
void error(char *format,...)
#define SQUIDSTRINGPRINT(s)
void httpRequestFree(void *data)
void init(mb_size_t szInit, mb_size_t szMax)
clientStream_status_t socketState()
Adapt stream status to account for Range cases.
const char * str
quoted-string
bool etagIsStrongEqual(const ETag &tag1, const ETag &tag2)
whether etags are strong-equal
int64_t objectLen() const
int64_t getNextRangeOffset() const
bool multipartRangeRequest() const
Pipeline pipeline
set of requests waiting to be serviced
void writeComplete(size_t size)
update stream state after a write, may initiate more I/O
void popMe(const Http::StreamPointer &)
deregister the front request from the pipeline
void updateReply(const HttpReply::Pointer &)
void clientStreamDetach(clientStreamNode *thisObject, ClientHttpRequest *http)
static void Reset()
forgets the current context, setting it to nil/unknown
void noteIoError(const Error &, const LogTagsErrors &)
update state to reflect I/O error
size_t lengthToSend(Range< int64_t > const &available) const
mb_size_t contentSize() const
available data size
bool canPackMoreRanges() const
void append(const char *c, int sz) override
Http::StatusCode status() const
retrieve the status code for this status line
void prepareReply(HttpReply *)
const Acl::Answer & fastCheck()
@ STREAM_UNPLANNED_COMPLETE
clientStreamNode * getClientReplyContext() const
void set(const AnyP::ProtocolVersion &newVersion, Http::StatusCode newStatus, const char *newReason=nullptr)
Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq)
construct with HTTP/1.x details
int weak
true if it is a weak validator
const HttpHdrRangeSpec * currentSpec() const
void packChunk(const StoreIOBuffer &bodyData, MemBuf &)
static bool clientIfRangeMatch(ClientHttpRequest *http, HttpReply *rep)
static const int64_t UnknownPosition
void clientAclChecklistFill(ACLFilledChecklist &checklist, ClientHttpRequest *http)
clientStreamNode * getTail() const
AnyP::ProtocolVersion version
breakdown of protocol version label: (HTTP/ICY) and (0.9/1.0/1.1)
void httpHeaderAddContRange(HttpHeader *, HttpHdrRangeSpec, int64_t)
const HttpHdrContRange * contentRange() const
static MessageDelayPools * Instance()
void clientPackRangeHdr(const HttpReplyPointer &rep, const HttpHdrRangeSpec *spec, String boundary, MemBuf *mb)
append a "part" HTTP header (as in a multi-part/range reply) to the buffer
bool startOfOutput() const
whether the reply has started being sent
void pullData()
get more data to send
struct Http::Stream::@65 flags
void deferRecipientForLater(clientStreamNode *, HttpReply *, StoreIOBuffer receivedData)
void packRange(StoreIOBuffer const &, MemBuf *)
void kick()
try to make progress on a transaction or read more I/O
StoreEntry * storeEntry() const
void sendBody(StoreIOBuffer bodyData)
send some HTTP reply message payload
bool modifiedSince(const time_t ims, const int imslen=-1) const
DeferredParams deferredparams
void noteSentBodyBytes(size_t)
#define debugs(SECTION, LEVEL, CONTENT)
const A & min(A const &lhs, A const &rhs)
void clientPackTermBound(String boundary, MemBuf *mb)
put terminating boundary for multiparts to the buffer
void initiateClose(const char *reason)
terminate due to a send/write error (may continue reading)
HttpRequest *const request