51 memset(
this, 0,
sizeof(*
this));
56 AsyncJob(
"Adaptation::Icap::ModXact"),
95 adaptHistoryId = ah->
recordXactStart(service().cfg().key, icap_tr_start, attempts > 1);
99 canStartBypass = service().cfg().bypass;
103 if (service().up() && service().availableForNew())
112 Must(!state.serviceWaiting);
114 if (!service().up()) {
118 service().callWhenReady(call);
119 comment =
"to be up";
123 if (service().cfg().onOverload !=
srvWait) {
126 if (service().cfg().onOverload ==
srvBlock)
127 disableBypass(
"not available",
true);
129 canStartBypass =
true;
132 disableRepeats(
"ICAP service is not available");
134 debugs(93, 7,
"will not wait for the service to be available" <<
137 throw TexcHere(
"ICAP service is not available");
142 service().callWhenAvailable(call, state.waitedForService);
143 comment =
"to be available";
146 debugs(93, 7,
"will wait for the service " << comment << status());
147 state.serviceWaiting =
true;
148 state.waitedForService =
true;
153 Must(state.serviceWaiting);
154 state.serviceWaiting =
false;
156 if (!service().up()) {
158 disableRepeats(
"ICAP service is unusable");
159 throw TexcHere(
"ICAP service is unusable");
162 if (service().availableForOld())
170 Must(state.serviceWaiting);
171 state.serviceWaiting =
false;
173 if (service().up() && service().availableForOld())
181 state.writing = State::writingConnect;
191 Must(state.writing == State::writingConnect);
198 makeRequestHeaders(requestBuf);
199 debugs(93, 9,
"will write" << status() <<
":\n" <<
203 state.writing = State::writingHeaders;
205 scheduleWrite(requestBuf);
210 debugs(93, 5,
"Wrote " << sz <<
" bytes");
212 if (state.writing == State::writingHeaders)
213 handleCommWroteHeaders();
215 handleCommWroteBody();
220 Must(state.writing == State::writingHeaders);
223 if (preview.enabled()) {
225 decideWritingAfterPreview(
"zero-size");
227 state.writing = State::writingPreview;
228 }
else if (virginBody.expected()) {
229 state.writing = State::writingPrime;
240 debugs(93, 5,
"checking whether to write more" << status());
242 if (writer !=
nullptr)
245 switch (state.writing) {
247 case State::writingInit:
248 Must(state.serviceWaiting);
251 case State::writingConnect:
252 case State::writingHeaders:
253 case State::writingPaused:
254 case State::writingReallyDone:
257 case State::writingAlmostDone:
261 case State::writingPreview:
265 case State::writingPrime:
270 throw TexcHere(
"Adaptation::Icap::ModXact in bad writing state");
276 debugs(93, 8,
"will write Preview body from " <<
277 virgin.body_pipe << status());
278 Must(state.writing == State::writingPreview);
279 Must(virgin.body_pipe !=
nullptr);
281 const size_t sizeMax = (
size_t)virgin.body_pipe->buf().contentSize();
282 const size_t size =
min(preview.debt(), sizeMax);
283 writeSomeBody(
"preview body",
size);
288 decideWritingAfterPreview(
"body");
296 else if (state.parsing == State::psIcapHeader)
297 state.writing = State::writingPaused;
301 debugs(93, 6,
"decided on writing after " << kind <<
" preview" <<
307 Must(state.writing == State::writingPrime);
308 Must(virginBodyWriting.active());
310 const size_t size = (
size_t)virgin.body_pipe->buf().contentSize();
311 writeSomeBody(
"prime virgin body",
size);
313 if (virginBodyEndReached(virginBodyWriting)) {
314 debugs(93, 5,
"wrote entire body");
321 Must(!writer && state.writing < state.writingAlmostDone);
322 Must(virgin.body_pipe !=
nullptr);
323 debugs(93, 8,
"will write up to " <<
size <<
" bytes of " <<
330 const size_t writableSize = virginContentSize(virginBodyWriting);
331 const size_t chunkSize =
min(writableSize,
size);
334 debugs(93, 7,
"will write " << chunkSize <<
335 "-byte chunk of " << label);
337 openChunk(writeBuf, chunkSize,
false);
338 writeBuf.
append(virginContentData(virginBodyWriting), chunkSize);
339 closeChunk(writeBuf);
341 virginBodyWriting.progress(chunkSize);
344 debugs(93, 7,
"has no writable " << label <<
" content");
347 const bool wroteEof = virginBodyEndReached(virginBodyWriting);
348 bool lastChunk = wroteEof;
349 if (state.writing == State::writingPreview) {
350 preview.wrote(chunkSize, wroteEof);
351 lastChunk = lastChunk || preview.done();
355 debugs(93, 8,
"will write last-chunk of " << label);
356 addLastRequestChunk(writeBuf);
360 <<
" raw bytes of " << label);
363 scheduleWrite(writeBuf);
371 const bool ieof = state.writing == State::writingPreview && preview.ieof();
372 openChunk(buf, 0, ieof);
378 buf.
appendf((ieof ?
"%x; ieof\r\n" :
"%x\r\n"), (
int) chunkSize);
389 virgin.cause :
dynamic_cast<const HttpRequest*
>(virgin.header);
399 !virgin.body_pipe->expectMoreAfter(act.
offset());
408 const uint64_t dataStart = act.
offset();
410 const uint64_t dataEnd = virginConsumed + virgin.body_pipe->buf().contentSize();
411 Must(virginConsumed <= dataStart && dataStart <= dataEnd);
412 return static_cast<size_t>(dataEnd - dataStart);
419 const uint64_t dataStart = act.
offset();
420 Must(virginConsumed <= dataStart);
421 return virgin.body_pipe->buf().content() +
static_cast<size_t>(dataStart-virginConsumed);
426 debugs(93, 9,
"consumption guards: " << !virgin.body_pipe << isRetriable <<
427 isRepeatable << canStartBypass << protectGroupBypass);
429 if (!virgin.body_pipe)
436 const bool wantToPostpone = isRepeatable || canStartBypass || protectGroupBypass;
446 debugs(93, 8,
"postponing consumption from " << bp.
status());
451 const uint64_t end = virginConsumed + have;
452 uint64_t offset = end;
454 debugs(93, 9,
"max virgin consumption offset=" << offset <<
455 " acts " << virginBodyWriting.active() << virginBodySending.active() <<
456 " consumed=" << virginConsumed <<
457 " from " << virgin.body_pipe->status());
459 if (virginBodyWriting.active())
460 offset =
min(virginBodyWriting.offset(), offset);
462 if (virginBodySending.active())
463 offset =
min(virginBodySending.offset(), offset);
465 Must(virginConsumed <= offset && offset <= end);
467 if (
const size_t size =
static_cast<size_t>(offset - virginConsumed)) {
468 debugs(93, 8,
"consuming " <<
size <<
" out of " << have <<
469 " virgin body bytes");
471 virginConsumed +=
size;
473 disableRepeats(
"consumed content");
474 disableBypass(
"consumed content",
true);
488 if (state.writing == State::writingReallyDone)
491 if (writer !=
nullptr) {
493 debugs(93, 7,
"will wait for the last write" << status());
494 state.writing = State::writingAlmostDone;
498 debugs(93, 3,
"will NOT wait for the last write" << status());
505 reuseConnection =
false;
506 ignoreLastWrite =
true;
509 debugs(93, 7,
"will no longer write" << status());
510 if (virginBodyWriting.active()) {
511 virginBodyWriting.disable();
514 state.writing = State::writingReallyDone;
520 if (!virginBodySending.active())
523 debugs(93, 7,
"will no longer backup" << status());
524 virginBodySending.disable();
532 doneReading() && state.doneWriting();
537 Must(haveConnection());
539 Must(!adapted.header);
540 Must(!adapted.body_pipe);
548 if (reader !=
nullptr || doneReading()) {
549 debugs(93,3,
"returning from readMore because reader or doneReading()");
554 if (adapted.body_pipe !=
nullptr &&
555 !adapted.body_pipe->buf().hasPotentialSpace()) {
556 debugs(93,3,
"not reading because ICAP reply pipe is full");
560 if (readBuf.length() < SQUID_TCP_SO_RCVBUF)
563 debugs(93,3,
"cannot read with a full buffer");
569 Must(!state.doneParsing());
577 Must(state.sending == State::sendingVirgin);
578 Must(adapted.body_pipe !=
nullptr);
579 Must(virginBodySending.active());
581 const size_t sizeMax = virginContentSize(virginBodySending);
582 debugs(93,5,
"will echo up to " << sizeMax <<
" bytes from " <<
583 virgin.body_pipe->status());
584 debugs(93,5,
"will echo up to " << sizeMax <<
" bytes to " <<
585 adapted.body_pipe->status());
588 const size_t size = adapted.body_pipe->putMoreData(virginContentData(virginBodySending), sizeMax);
589 debugs(93,5,
"echoed " <<
size <<
" out of " << sizeMax <<
591 virginBodySending.progress(
size);
592 disableRepeats(
"echoed content");
593 disableBypass(
"echoed content",
true);
597 if (virginBodyEndReached(virginBodySending)) {
598 debugs(93, 5,
"echoed all" << status());
602 virgin.body_pipe->buf().contentSize() <<
" bytes " <<
603 "and expects more to echo" << status());
610 return state.sending == State::sendingDone;
616 debugs(93, 7,
"Enter stop sending ");
619 debugs(93, 7,
"Proceed with stop sending ");
621 if (state.sending != State::sendingUndecided) {
622 debugs(93, 7,
"will no longer send" << status());
623 if (adapted.body_pipe !=
nullptr) {
624 virginBodySending.disable();
627 const bool leftDebts = adapted.body_pipe->needsMoreData();
628 stopProducingFor(adapted.body_pipe, nicely && !leftDebts);
631 debugs(93, 7,
"will not start sending" << status());
632 Must(!adapted.body_pipe);
635 state.sending = State::sendingDone;
643 if (!virgin.body_pipe || !state.doneConsumingVirgin())
646 debugs(93, 7,
"will stop consuming" << status());
647 stopConsumingFrom(virgin.body_pipe);
652 debugs(93, 5,
"have " << readBuf.length() <<
" bytes to parse" << status());
653 debugs(93, 5,
"\n" << readBuf);
655 if (state.parsingHeaders())
658 if (state.parsing == State::psBody)
661 if (state.parsing == State::psIcapTrailer)
667 if (!canStartBypass || isRetriable) {
679 debugs(93, 3,
"bypassing " << inCall <<
" exception: " <<
680 e.what() <<
' ' << status());
685 }
catch (
const std::exception &bypassE) {
693 disableBypass(
"already started to bypass",
false);
707 if (haveConnection()) {
708 reuseConnection =
false;
711 debugs(93, 7,
"Warning: bypass failed to stop I/O" << status());
714 service().noteFailure();
719 if (canStartBypass) {
720 debugs(93,7,
"will never start bypass because " << reason);
721 canStartBypass =
false;
723 if (protectGroupBypass && includingGroupBypass) {
724 debugs(93,7,
"not protecting group bypass because " << reason);
725 protectGroupBypass =
false;
735 if (gotEncapsulated(
"res-hdr")) {
739 }
else if (gotEncapsulated(
"req-hdr")) {
740 adapted.setHeader(
new HttpRequest(virginRequest().masterXaction));
743 throw TexcHere(
"Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
748 Must(state.parsingHeaders());
750 if (state.parsing == State::psIcapHeader) {
751 debugs(93, 5,
"parse ICAP headers");
755 if (state.parsing == State::psHttpHeader) {
756 debugs(93, 5,
"parse HTTP headers");
760 if (state.parsingHeaders()) {
771 disableRepeats(
"sent headers");
772 disableBypass(
"sent headers",
true);
775 if (state.sending == State::sendingVirgin)
791 Must(state.sending == State::sendingUndecided);
793 if (!parseHead(icapReply.getRaw()))
796 if (expectIcapTrailers()) {
797 Must(!trailerParser);
801 static SBuf close(
"close", 5);
803 debugs(93, 5,
"found connection close");
804 reuseConnection =
false;
807 switch (icapReply->sline.status()) {
816 if (!validate200Ok()) {
817 throw TexcHere(
"Invalid ICAP Response");
825 handle204NoContent();
829 handle206PartialContent();
833 debugs(93, 5,
"ICAP status " << icapReply->sline.status());
834 handleUnknownScode();
840 request = &virginRequest();
846 const String val = icapReply->header.getByName(xxName);
853 if (service().cfg().routing) {
872 if (state.writing == State::writingPaused)
880 if (parsePart(trailerParser,
"trailer")) {
881 for (
const auto &e: trailerParser->trailer.entries)
882 debugs(93, 5,
"ICAP trailer: " << e->name <<
": " << e->value);
890 return gotEncapsulated(
"res-hdr");
898 Must(state.writing == State::writingPaused);
900 Must(preview.enabled() && preview.done() && !preview.ieof());
904 if (!state.allowedPostview204 && !state.allowedPostview206)
907 state.parsing = State::psIcapHeader;
910 state.writing = State::writingPrime;
917 state.parsing = State::psHttpHeader;
918 state.sending = State::sendingAdapted;
931 if (state.writing == State::writingPaused) {
932 Must(preview.enabled());
933 Must(state.allowedPreview206);
934 debugs(93, 7,
"206 inside preview");
936 Must(state.writing > State::writingPaused);
937 Must(state.allowedPostview206);
938 debugs(93, 7,
"206 outside preview");
940 state.parsing = State::psHttpHeader;
941 state.sending = State::sendingAdapted;
942 state.readyForUob =
true;
951 disableRepeats(
"preparing to echo content");
952 disableBypass(
"preparing to echo content",
true);
962 debugs(93, 7,
"cloning virgin message " << oldHead);
968 packHead(httpBuf, oldHead);
971 Must(!adapted.header);
976 }
else if (
dynamic_cast<const HttpReply*
>(oldHead)) {
983 adapted.setHeader(newHead.
getRaw());
995 debugs(93, 7,
"cloned virgin message " << oldHead <<
" to " <<
1000 debugs(93, 7,
"will echo virgin body from " <<
1002 if (!virginBodySending.active())
1003 virginBodySending.plan();
1004 state.sending = State::sendingVirgin;
1009 makeAdaptedBodyPipe(
"echoed virgin response");
1012 debugs(93, 7,
"will echo virgin body to " <<
1015 debugs(93, 7,
"no virgin body to echo");
1024 Must(virginBodySending.active());
1025 Must(virgin.header->body_pipe !=
nullptr);
1029 debugs(93, 7,
"will echo virgin body suffix from " <<
1030 virgin.header->body_pipe <<
" offset " << pos );
1033 const uint64_t virginDataEnd = virginConsumed +
1034 virgin.body_pipe->buf().contentSize();
1035 Must(pos <= virginDataEnd);
1036 virginBodySending.progress(
static_cast<size_t>(pos));
1038 state.sending = State::sendingVirgin;
1041 if (virgin.header->body_pipe->bodySizeKnown())
1042 adapted.body_pipe->expectProductionEndAfter(virgin.header->body_pipe->bodySize() - pos);
1044 debugs(93, 7,
"will echo virgin body suffix to " <<
1058 throw TexcHere(
"Unsupported ICAP status code");
1063 if (expectHttpHeader()) {
1064 replyHttpHeaderSize = 0;
1065 maybeAllocateHttpMsg();
1067 if (!parseHead(adapted.header))
1071 replyHttpHeaderSize = adapted.header->hdr_sz;
1084 adapted.header->inheritProperties(virgin.header);
1087 decideOnParsingBody();
1094 debugs(93, 5,
"have " << readBuf.length() <<
' ' << description <<
" bytes to parse; state: " << state.parsing);
1098 const char *tmpBuf = readBuf.c_str();
1099 const bool parsed = part->parse(tmpBuf, readBuf.length(), commEof, &
error);
1100 debugs(93, (!parsed &&
error) ? 2 : 5, description <<
" parsing result: " << parsed <<
" detail: " <<
error);
1103 readBuf.consume(part->hdr_sz);
1111 if (!parsePart(
head,
"head")) {
1120 return gotEncapsulated(
"res-hdr") || gotEncapsulated(
"req-hdr");
1125 return gotEncapsulated(
"res-body") || gotEncapsulated(
"req-body");
1131 const bool promisesToSendTrailer = icapReply->header.getByIdIfPresent(
Http::HdrType::TRAILER, &trailers);
1132 const bool supportsTrailers = icapReply->header.hasListMember(
Http::HdrType::ALLOW,
"trailers",
',');
1135 Must((promisesToSendTrailer == supportsTrailers) || (!promisesToSendTrailer && supportsTrailers));
1136 if (promisesToSendTrailer && !trailers.
size())
1137 debugs(93,
DBG_IMPORTANT,
"ERROR: ICAP Trailer response header field must not be empty (salvaged)");
1138 return promisesToSendTrailer;
1143 if (expectHttpBody()) {
1144 debugs(93, 5,
"expecting a body");
1145 state.parsing = State::psBody;
1146 replyHttpBodySize = 0;
1149 makeAdaptedBodyPipe(
"adapted response from the ICAP server");
1150 Must(state.sending == State::sendingAdapted);
1152 debugs(93, 5,
"not expecting a body");
1154 state.parsing = State::psIcapTrailer;
1163 Must(state.parsing == State::psBody);
1166 debugs(93, 5,
"have " << readBuf.length() <<
" body bytes to parse");
1170 bodyParser->setPayloadBuffer(&bpc.
buf);
1171 const bool parsed = bodyParser->parse(readBuf);
1172 readBuf = bodyParser->remaining();
1175 debugs(93, 5,
"have " << readBuf.length() <<
" body bytes after parsed all: " << parsed);
1176 replyHttpBodySize += adapted.body_pipe->buf().contentSize();
1180 if (adapted.body_pipe->buf().contentSize() > 0) {
1181 disableRepeats(
"sent adapted content");
1182 disableBypass(
"sent adapted content",
true);
1186 if (state.readyForUob && extensionParser.sawUseOriginalBody())
1187 prepPartialBodyEchoing(extensionParser.useOriginalBody());
1191 state.parsing = State::psIcapTrailer;
1197 debugs(93,3,
this <<
" needsMoreData = " << bodyParser->needsMoreData());
1199 if (bodyParser->needsMoreData()) {
1201 Must(mayReadMore());
1205 if (bodyParser->needsMoreSpace()) {
1206 Must(!doneSending());
1207 Must(adapted.body_pipe->buf().contentSize() > 0);
1215 if (state.parsing == State::psDone)
1218 if (checkUnparsedData)
1219 Must(readBuf.isEmpty());
1221 debugs(93, 7,
"will no longer parse" << status());
1224 bodyParser =
nullptr;
1226 delete trailerParser;
1227 trailerParser =
nullptr;
1229 state.parsing = State::psDone;
1237 if (state.sending == State::sendingVirgin)
1244 Must(virgin.body_pipe->productionEnded());
1249 if (state.sending == State::sendingVirgin)
1257 Must(virgin.body_pipe->productionEnded());
1262 if (state.sending == State::sendingVirgin)
1270 if (state.sending == State::sendingVirgin)
1272 else if (state.sending == State::sendingAdapted)
1275 Must(state.sending == State::sendingUndecided);
1283 mustStop(
"adapted body consumer aborted");
1289 delete trailerParser;
1295 debugs(93, 5,
"swan sings" << status());
1300 if (theInitiator.set()) {
1307 if (ah !=
nullptr && adaptHistoryId >= 0)
1320 if (!(adapted_request_ =
dynamic_cast<HttpRequest*
>(adapted.header))) {
1323 adapted_request_ = virgin_request_;
1324 adapted_reply_ =
dynamic_cast<HttpReply*
>(adapted.header);
1336 al.request = virgin_request_;
1338 al.adapted_request = adapted_request_;
1342 al.reply = adapted_reply_;
1355 virgin_msg = virgin_request_;
1356 assert(virgin_msg != virgin.cause);
1357 al.http.clientRequestSz.header = virgin_msg->
hdr_sz;
1362 if (replyHttpHeaderSize >= 0 || replyHttpBodySize >= 0) {
1363 const int64_t
zero = 0;
1364 const uint64_t headerSize =
max(
zero, replyHttpHeaderSize);
1365 const uint64_t bodySize =
max(
zero, replyHttpBodySize);
1366 al.icap.bodyBytesRead = headerSize + bodySize;
1367 al.http.clientReplySz.header = headerSize;
1368 al.http.clientReplySz.payloadData = bodySize;
1371 if (adapted_reply_) {
1374 if (replyHttpBodySize >= 0)
1375 al.cache.highOffset = replyHttpBodySize;
1394 buf.
appendf(
"Connection: close\r\n");
1413 resultLen +=
base64_encode_update(&ctx, base64buf+resultLen, 1,
reinterpret_cast<const uint8_t*
>(
":"));
1416 buf.
appendf(
"Proxy-Authorization: Basic %.*s\r\n", (
int)resultLen, base64buf);
1422 if (ah !=
nullptr) {
1430 buf.
append(
"Encapsulated: ", 14);
1443 encapsulateHead(buf,
"req-hdr", httpBuf, request);
1445 encapsulateHead(buf,
"req-hdr", httpBuf, virgin.header);
1450 encapsulateHead(buf,
"res-hdr", httpBuf, prime);
1452 if (!virginBody.expected())
1461 if (preview.enabled()) {
1462 buf.
appendf(
"Preview: %d\r\n", (
int)preview.ad());
1463 if (!virginBody.expected())
1464 finishNullOrEmptyBodyPreview(httpBuf);
1467 makeAllowHeader(buf);
1471#if FOLLOW_X_FORWARDED_FOR
1482 makeUsernameHeader(request, buf);
1487 virgin.cause :
dynamic_cast<HttpRequest*
>(virgin.header);
1493 if (h->match(r, reply, alMaster, matched)) {
1494 buf.
append(h->key().rawContent(), h->key().length());
1499 if (ah !=
nullptr) {
1524 const bool allow204in = preview.enabled();
1525 const bool allow204out = state.allowedPostview204 = shouldAllow204();
1526 const bool allow206in = state.allowedPreview206 = shouldAllow206in();
1527 const bool allow206out = state.allowedPostview206 = shouldAllow206out();
1528 const bool allowTrailers =
true;
1530 debugs(93, 9,
"Allows: " << allow204in << allow204out <<
1531 allow206in << allow206out << allowTrailers);
1533 const bool allow204 = allow204in || allow204out;
1534 const bool allow206 = allow206in || allow206out;
1536 if ((allow204 || allow206) && virginBody.expected())
1537 virginBodySending.plan();
1543 if (allow204 || allow206 || allowTrailers) {
1561 const char *value =
nullptr;
1571 size_t resultLen =
base64_encode_update(&ctx, base64buf, strlen(value),
reinterpret_cast<const uint8_t*
>(value));
1595 new_request->
method = old_request->method;
1596 new_request->
url = old_request->url;
1597 new_request->
http_ver = old_request->http_ver;
1598 headClone = new_request.
getRaw();
1601 new_reply->
sline = old_reply->sline;
1602 headClone = new_reply.
getRaw();
1625 packHead(httpBuf, headClone.
getRaw());
1633 head->packInto(&httpBuf,
true);
1640 debugs(93, 5,
"preview disabled by squid.conf");
1644 const SBuf urlPath(virginRequest().url.path());
1646 if (!service().wantsPreview(urlPath, wantedSize)) {
1647 debugs(93, 5,
"should not offer preview for " << urlPath);
1656 if (!virginBody.expected())
1658 else if (virginBody.knownSize())
1659 ad =
min(
static_cast<uint64_t
>(ad), virginBody.size());
1661 debugs(93, 5,
"should offer " << ad <<
"-byte preview " <<
1662 "(service wanted " << wantedSize <<
")");
1665 Must(preview.enabled());
1671 if (!service().allows204())
1674 return canBackupEverything();
1681 virginBody.expected();
1687 return shouldAllow206any() && preview.enabled();
1693 return shouldAllow206any() && canBackupEverything();
1699 if (!virginBody.expected())
1704 if (!virginBody.knownSize())
1719 if (preview.enabled())
1722 if (canBackupEverything())
1736 Must(!virginBodyWriting.active());
1737 Must(!virgin.body_pipe);
1738 Must(!preview.ad());
1742 preview.wrote(0,
true);
1744 Must(preview.done());
1745 Must(preview.ieof());
1752 if (state.serviceWaiting)
1755 if (virgin.body_pipe !=
nullptr)
1758 if (haveConnection() && !doneReading())
1761 if (!state.doneWriting() && state.writing != State::writingInit)
1762 buf.
appendf(
"w(%d)", state.writing);
1764 if (preview.enabled()) {
1765 if (!preview.done())
1766 buf.
appendf(
"P(%d)", (
int) preview.debt());
1769 if (virginBodySending.active())
1772 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1773 buf.
appendf(
"p(%d)", state.parsing);
1775 if (!doneSending() && state.sending != State::sendingUndecided)
1776 buf.
appendf(
"S(%d)", state.sending);
1778 if (state.readyForUob)
1784 if (protectGroupBypass)
1792 if (!virgin.body_pipe)
1795 if (state.doneWriting())
1798 if (preview.enabled()) {
1800 buf.
appendf(
"P%s", preview.ieof() ?
"(ieof)" :
"");
1806 if (state.doneParsing())
1815 return !icapReply->header.getByNameListMember(
"Encapsulated",
1816 section,
',').isEmpty();
1832 method = virgin.cause->method;
1834 method = req->method;
1842 debugs(93, 6,
"expects virgin body from " <<
1843 virgin.body_pipe <<
"; size: " <<
size);
1845 virginBody.expect(
size);
1846 virginBodyWriting.plan();
1851 Must(virgin.body_pipe->setConsumerIfNotLate(
this));
1856 debugs(93, 6,
"does not expect virgin body");
1864 Must(!adapted.body_pipe);
1865 Must(!adapted.header->body_pipe);
1866 adapted.header->body_pipe =
new BodyPipe(
this);
1867 adapted.body_pipe = adapted.header->body_pipe;
1868 debugs(93, 7,
"will supply " << what <<
" via " <<
1869 adapted.body_pipe <<
" pipe");
1875 : theData(dtUnexpected)
1880 theData = (aSize >= 0) ? aSize : (int64_t)dtUnknown;
1885 return theData != dtUnexpected;
1891 return theData != dtUnknown;
1897 return static_cast<uint64_t
>(theData);
1907 theState = stActive;
1912 theState = stDisabled;
1918#if SIZEOF_SIZE_T > 4
1920 Must(
static_cast<int64_t
>(
size) >= 0);
1922 theStart +=
static_cast<int64_t
>(
size);
1928 return static_cast<uint64_t
>(theStart);
1939 theState = stWriting;
1944 return theState != stDisabled;
1956 return theState >= stIeof;
1962 return theState == stIeof;
1968 return done() ? 0 : (theAd - theWritten);
1977 Must(theWritten <= theAd);
1981 else if (theWritten >= theAd)
1987 if (virgin.header ==
nullptr)
1990 virgin.header->firstLineBuf(mb);
2001 request =
const_cast<HttpRequest*
>(&virginRequest());
2012 request =
const_cast<HttpRequest*
>(&virginRequest());
2020 Must(adapted.header);
2027 AsyncJob(
"Adaptation::Icap::ModXactLauncher"),
2046 debugs(93, 5,
"swan sings");
2047 updateHistory(
false);
2054 virgin.cause :
dynamic_cast<HttpRequest*
>(virgin.header);
2061 h->
start(
"ICAPModXactLauncher");
2063 h->
stop(
"ICAPModXactLauncher");
2073 const int parsed = trailer.parse(buf, len, atEnd, hdr_sz, clen);
2082 if (extName == UseOriginalBodyName) {
2083 useOriginalBody_ =
tok.udec64(
"use-original-body");
2084 assert(useOriginalBody_ >= 0);
2086 Ignore(
tok, extName);
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
ErrorDetail::Pointer MakeNamedErrorDetail(const char *name)
#define Here()
source code location of the caller
void prepareLogWithRequestDetails(HttpRequest *, const AccessLogEntryPointer &)
static const size_t TheBackupLimit
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Icap, ModXact)
#define SQUIDSTRINGPRINT(s)
#define TexcHere(msg)
legacy convenience macro; it is not difficult to type Here() now
void error(char *format,...)
squidaio_request_t * head
void base64_encode_init(struct base64_encode_ctx *ctx)
size_t base64_encode_update(struct base64_encode_ctx *ctx, char *dst, size_t length, const uint8_t *src)
size_t base64_encode_final(struct base64_encode_ctx *ctx, char *dst)
#define base64_encode_len(length)
static Answer Forward(Http::Message *aMsg)
create an akForward answer
static int send_client_ip
static int use_indirect_client
static char * masterx_shared_name
static Notes metaHeaders
The list of configured meta headers.
int recordXactStart(const String &serviceId, const timeval &when, bool retrying)
record the start of a xact, return xact history ID
void updateXxRecord(const char *name, const String &value)
sets or resets a cross-transactional database record
NotePairs::Pointer metaHeaders
bool getXxRecord(String &name, String &value) const
returns true and fills the record fields iff there is a db record
void updateNextServices(const String &services)
sets or resets next services for the Adaptation::Iterator to notice
void recordMeta(const HttpHeader *lm)
store the last meta header fields received from the adaptation service
void recordXactFinish(int hid)
record the end of a xact identified by its history ID
static const SBuf UseOriginalBodyName
void parse(Tokenizer &tok, const SBuf &extName) override
extracts and then interprets (or ignores) the extension value
int client_username_encode
char * client_username_header
String log_uri
the request uri
void start(const char *context)
record the start of an ICAP processing interval
void stop(const char *context)
note the end of an ICAP processing interval
String rfc931
the username from ident
String ssluser
the username from SSL
LogTags logType
the squid request status (TCP_MISS etc)
void setHeader(Header *h)
void setCause(HttpRequest *r)
void updateHistory(bool start)
starts or stops transaction accounting in ICAP history
Xaction * createXaction() override
ModXactLauncher(Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp, Adaptation::ServicePointer s)
const char * virginContentData(const VirginBodyAct &act) const
void openChunk(MemBuf &buf, size_t chunkSize, bool ieof)
void noteMoreBodyDataAvailable(BodyPipe::Pointer) override
int64_t replyHttpHeaderSize
void start() override
called by AsyncStart; do not call directly
void packHead(MemBuf &httpBuf, const Http::Message *head)
TrailerParser * trailerParser
void stopWriting(bool nicely)
void handleCommWroteHeaders()
void decideOnParsingBody()
void noteBodyProductionEnded(BodyPipe::Pointer) override
Http1::TeChunkedParser * bodyParser
bool doneAll() const override
whether positive goal has been reached
void makeAllowHeader(MemBuf &buf)
void clearError() override
clear stored error details, if any; used for retries/repeats
void maybeAllocateHttpMsg()
void writeSomeBody(const char *label, size_t size)
void callException(const std::exception &e) override
called when the job throws during an async call
bool expectHttpBody() const
whether ICAP response header indicates HTTP body presence
bool parseHead(Http::Message *head)
bool fillVirginHttpHeader(MemBuf &) const override
void stopSending(bool nicely)
bool virginBodyEndReached(const VirginBodyAct &act) const
bool gotEncapsulated(const char *section) const
AccessLogEntry::Pointer alMaster
Master transaction AccessLogEntry.
void updateSources()
Update the Http::Message sources.
void stopParsing(const bool checkUnparsedData=true)
bool parsePart(Part *part, const char *description)
void startShoveling() override
starts sending/receiving ICAP messages
void estimateVirginBody()
void detailError(const ErrorDetail::Pointer &errDetail) override
record error detail in the virgin request if possible
void handleUnknownScode()
int adaptHistoryId
adaptation history slot reservation
void closeChunk(MemBuf &buf)
void addLastRequestChunk(MemBuf &buf)
void disableBypass(const char *reason, bool includeGroupBypass)
bool canBackupEverything() const
bool expectHttpHeader() const
whether ICAP response header indicates HTTP header presence
void noteBodyConsumerAborted(BodyPipe::Pointer) override
void handle206PartialContent()
ModXact(Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp, ServiceRep::Pointer &s)
size_t virginContentSize(const VirginBodyAct &act) const
void handleCommRead(size_t size) override
void fillDoneStatus(MemBuf &buf) const override
void prepPartialBodyEchoing(uint64_t pos)
int64_t replyHttpBodySize
bool expectIcapTrailers() const
whether ICAP response header indicates ICAP trailers presence
void fillPendingStatus(MemBuf &buf) const override
void noteMoreBodySpaceAvailable(BodyPipe::Pointer) override
void makeRequestHeaders(MemBuf &buf)
void handleCommWroteBody()
void encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const Http::Message *head)
void finishNullOrEmptyBodyPreview(MemBuf &buf)
void noteBodyProducerAborted(BodyPipe::Pointer) override
void makeAdaptedBodyPipe(const char *what)
void makeUsernameHeader(const HttpRequest *request, MemBuf &buf)
void handleCommWrote(size_t size) override
void finalizeLogInfo() override
void handle204NoContent()
void decideWritingAfterPreview(const char *previewKind)
determine state.writing after we wrote the entire preview
void noteServiceAvailable()
const HttpRequest & virginRequest() const
locates the request, either as a cause or as a virgin message itself
void wrote(size_t size, bool wroteEof)
void expect(int64_t aSize)
Parses and stores ICAP trailer header block.
bool parse(const char *buf, int len, int atEnd, Http::StatusCode *error)
void progress(size_t size)
void start() override
called by AsyncStart; do not call directly
void callException(const std::exception &e) override
called when the job throws during an async call
HttpReply::Pointer icapReply
received ICAP reply, if any
bool doneAll() const override
whether positive goal has been reached
virtual void fillDoneStatus(MemBuf &buf) const
const char * status() const override
internal cleanup; do not call directly
virtual void fillPendingStatus(MemBuf &buf) const
virtual void finalizeLogInfo()
const char * methodStr() const
const ServiceConfig & cfg() const
char const * username() const
uint64_t producedSize() const
const MemBuf & buf() const
bool bodySizeKnown() const
void consume(size_t size)
const char * status() const
uint64_t bodySize() const
Adaptation::History::Pointer adaptHistory(bool createIfNone=false) const
Returns possibly nil history, creating it if requested.
void clearError()
clear error details, useful for retries/repeats
Ip::Address indirect_client_addr
void detailError(const err_type c, const ErrorDetail::Pointer &d)
sets error detail if no earlier detail was available
Adaptation::History::Pointer adaptLogHistory() const
Returns possibly nil history, creating it if adapt. logging is enabled.
Auth::UserRequest::Pointer auth_user_request
Adaptation::Icap::History::Pointer icapHistory() const
Returns possibly nil history, creating it if icap logging is enabled.
AnyP::Uri url
the request URI
void applyTrailerRules()
prohibits Content-Length in GET/HEAD requests
common parts of HttpRequest and HttpReply
@ srcIcaps
Secure ICAP service.
@ srcIcap
traditional ICAP service without encryption
virtual bool expectingBody(const HttpRequestMethod &, int64_t &) const =0
virtual bool inheritProperties(const Http::Message *)=0
BodyPipe::Pointer body_pipe
optional pipeline to receive message body
AnyP::ProtocolVersion http_ver
::Parser::Tokenizer Tokenizer
void parseExtensionValuesWith(ChunkExtensionValueParser *parser)
Http::StatusCode status() const
retrieve the status code for this status line
char * toStr(char *buf, const unsigned int blen, int force=AF_UNSPEC) const
mb_size_t spaceSize() const
void append(const char *c, int sz) override
void init(mb_size_t szInit, mb_size_t szMax)
char * content()
start of the added data
mb_size_t contentSize() const
available data size
void add(const SBuf &key, const SBuf &value)
bool hasPair(const SBuf &key, const SBuf &value) const
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
const char * rawContent() const
size_type length() const
Returns the number of bytes stored in SBuf.
char const * rawBuf() const
char const * termedBuf() const
an std::runtime_error with thrower location info
SourceLocationId id() const
same-location exceptions have the same ID
A const & max(A const &lhs, A const &rhs)
A const & min(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
void HTTPMSGLOCK(Http::Message *a)
#define MAX_IPSTRLEN
Length of buffer that needs to be allocated to hold a null-terminated IP-string.
const XactOutcome xoPartEcho
preserved virgin msg part (ICAP 206)
const XactOutcome xoModified
replaced virgin msg with adapted
const XactOutcome xoSatisfied
request satisfaction
const XactOutcome xoEcho
preserved virgin message (ICAP 204)
const char * FormatRfc1123(time_t)
const CharacterSet crlf("crlf","\r\n")
struct timeval current_time
the current UNIX time in timeval {seconds, microseconds} format