Inquirer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 16 Cache Manager API */
10 
11 #include "squid.h"
12 #include "AccessLogEntry.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "comm/Write.h"
18 #include "CommCalls.h"
19 #include "errorpage.h"
20 #include "HttpReply.h"
21 #include "HttpRequest.h"
22 #include "ipc/UdsOp.h"
23 #include "mgr/ActionWriter.h"
24 #include "mgr/Command.h"
25 #include "mgr/Inquirer.h"
26 #include "mgr/IntParam.h"
27 #include "mgr/Request.h"
28 #include "mgr/Response.h"
29 
30 #include <memory>
31 #include <algorithm>
32 
34 
36  const Request &aCause, const Ipc::StrandCoords &coords):
37  Ipc::Inquirer(aCause.clone(), applyQueryParams(coords, aCause.params.queryParams), anAction->atomic() ? 10 : 100),
38  aggrAction(anAction)
39 {
40  conn = aCause.conn;
41  Ipc::ImportFdIntoComm(conn, SOCK_STREAM, IPPROTO_TCP, Ipc::fdnHttpSocket);
42 
43  debugs(16, 5, conn << " action: " << aggrAction);
44 
45  closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
48 }
49 
51 void
53 {
54  if (Comm::IsConnOpen(conn)) {
55  removeCloseHandler();
56  conn->close();
57  }
58 }
59 
60 void
62 {
63  if (closer != nullptr) {
64  comm_remove_close_handler(conn->fd, closer);
65  closer = nullptr;
66  }
67 }
68 
69 void
71 {
72  debugs(16, 5, MYNAME);
74  Must(Comm::IsConnOpen(conn));
75  Must(aggrAction != nullptr);
76 
77  const auto &origin = aggrAction->command().params.httpOrigin;
78  const auto originOrNil = origin.size() ? origin.termedBuf() : nullptr;
79 
80  std::unique_ptr<MemBuf> replyBuf;
81  if (strands.empty()) {
82  const char *url = aggrAction->command().params.httpUri.termedBuf();
83  const auto mx = MasterXaction::MakePortless<XactionInitiator::initIpc>();
84  auto *req = HttpRequest::FromUrlXXX(url, mx);
85  ErrorState err(ERR_INVALID_URL, Http::scNotFound, req, nullptr);
86  std::unique_ptr<HttpReply> reply(err.BuildHttpReply());
87  CacheManager::PutCommonResponseHeaders(*reply, originOrNil);
88  replyBuf.reset(reply->pack());
89  } else {
90  std::unique_ptr<HttpReply> reply(new HttpReply);
91  reply->setHeaders(Http::scOkay, nullptr, aggrAction->contentType(), -1, squid_curtime, squid_curtime);
92  CacheManager::PutCommonResponseHeaders(*reply, originOrNil);
93  reply->header.putStr(Http::HdrType::CONNECTION, "close"); // until we chunk response
94  replyBuf.reset(reply->pack());
95  }
96  writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
98  Comm::Write(conn, replyBuf.get(), writer);
99 }
100 
102 void
104 {
105  debugs(16, 5, MYNAME);
106  writer = nullptr;
107  Must(params.flag == Comm::OK);
108  Must(params.conn.getRaw() == conn.getRaw());
109  Must(params.size != 0);
110  // start inquiries at the initial pos
111  inquire();
112 }
113 
115 void
117 {
118  debugs(16, 5, MYNAME);
119  closer = nullptr;
120  if (conn) {
121  conn->noteClosure();
122  conn = nullptr;
123  }
124  mustStop("commClosed");
125 }
126 
127 bool
129 {
130  Mgr::Response& response = static_cast<Response&>(*aResponse);
131  if (response.hasAction())
132  aggrAction->add(response.getAction());
133  return true;
134 }
135 
136 void
138 {
139  if (!strands.empty() && aggrAction->aggregatable()) {
140  removeCloseHandler();
141  AsyncJob::Start(new ActionWriter(aggrAction, conn));
142  conn = nullptr; // should not close because we passed it to ActionWriter
143  }
144 }
145 
146 bool
148 {
149  return !writer && Ipc::Inquirer::doneAll();
150 }
151 
154 {
156 
157  QueryParam::Pointer processesParam = aParams.get("processes");
158  QueryParam::Pointer workersParam = aParams.get("workers");
159 
160  if (processesParam == nullptr || workersParam == nullptr) {
161  if (processesParam != nullptr) {
162  IntParam* param = dynamic_cast<IntParam*>(processesParam.getRaw());
163  if (param != nullptr && param->type == QueryParam::ptInt) {
164  const std::vector<int>& processes = param->value();
165  for (Ipc::StrandCoords::const_iterator iter = aStrands.begin();
166  iter != aStrands.end(); ++iter) {
167  if (std::find(processes.begin(), processes.end(), iter->kidId) != processes.end())
168  sc.push_back(*iter);
169  }
170  }
171  } else if (workersParam != nullptr) {
172  IntParam* param = dynamic_cast<IntParam*>(workersParam.getRaw());
173  if (param != nullptr && param->type == QueryParam::ptInt) {
174  const std::vector<int>& workers = param->value();
175  for (int i = 0; i < (int)aStrands.size(); ++i) {
176  if (std::find(workers.begin(), workers.end(), i + 1) != workers.end())
177  sc.push_back(aStrands[i]);
178  }
179  }
180  } else {
181  sc = aStrands;
182  }
183  }
184 
185  debugs(16, 4, "strands kid IDs = ");
186  for (Ipc::StrandCoords::const_iterator iter = sc.begin(); iter != sc.end(); ++iter) {
187  debugs(16, 4, iter->kidId);
188  }
189 
190  return sc;
191 }
192 
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:952
void noteWroteHeader(const CommIoCbParams &params)
called when we wrote the response header
Definition: Inquirer.cc:103
@ ERR_INVALID_URL
Definition: forward.h:45
static void PutCommonResponseHeaders(HttpReply &, const char *httpOrigin)
HttpHeader header
Definition: Message.h:74
Inquirer(Action::Pointer anAction, const Request &aCause, const Ipc::StrandCoords &coords)
Definition: Inquirer.cc:35
HttpReply * BuildHttpReply(void)
Definition: errorpage.cc:1315
void sendResponse() override
send response to client
Definition: Inquirer.cc:137
bool doneAll() const override
whether positive goal has been reached
Definition: Inquirer.cc:146
QueryParam::Pointer get(const String &name) const
returns query parameter by name
Definition: QueryParams.cc:23
const std::vector< int > & value() const
Definition: IntParam.cc:48
C * getRaw() const
Definition: RefCount.h:89
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
@ OK
Definition: Flag.h:16
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
void noteCommClosed(const CommCloseCbParams &params)
called when the HTTP client or some external force closed our socket
Definition: Inquirer.cc:116
CBDATA_NAMESPACED_CLASS_INIT(Mgr, Inquirer)
void start() override
called by AsyncStart; do not call directly
Definition: Inquirer.cc:89
Action::Pointer aggrAction
Definition: Inquirer.h:51
Ipc::StrandCoords applyQueryParams(const Ipc::StrandCoords &aStrands, const QueryParams &aParams)
Definition: Inquirer.cc:153
MemBuf * pack() const
Definition: HttpReply.cc:112
const Action & getAction() const
returns action object
Definition: Response.cc:67
bool doneAll() const override
whether positive goal has been reached
Definition: Inquirer.cc:147
cache manager request
Definition: Request.h:23
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
bool hasAction() const
whether response contain action object
Definition: Response.cc:61
void cleanup() override
closes our copy of the client HTTP connection socket
Definition: Inquirer.cc:52
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:82
bool aggregate(Ipc::Response::Pointer aResponse) override
perform aggregating of responses and returns true if need to continue
Definition: Inquirer.cc:128
@ fdnHttpSocket
Definition: FdNotes.h:20
static int sc[16]
Definition: smbdes.c:121
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
time_t squid_curtime
Definition: stub_libtime.cc:20
@ scNotFound
Definition: StatusCode.h:49
void removeCloseHandler()
Definition: Inquirer.cc:61
void putStr(Http::HdrType id, const char *str)
Definition: HttpHeader.cc:995
void start() override
called by AsyncStart; do not call directly
Definition: Inquirer.cc:70
static HttpRequest * FromUrlXXX(const char *url, const MasterXaction::Pointer &, const HttpRequestMethod &method=Http::METHOD_GET)
Definition: HttpRequest.cc:528
#define Must(condition)
Definition: TextException.h:75
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
#define MYNAME
Definition: Stream.h:219
Comm::ConnectionPointer conn
HTTP client socket descriptor.
Definition: Inquirer.h:53
AsyncCall::Pointer closer
comm_close handler
Definition: Inquirer.h:56
const Comm::ConnectionPointer & ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, FdNoteId noteId)
import socket fd from another strand into our Comm state
Definition: UdsOp.cc:194
void setHeaders(Http::StatusCode status, const char *reason, const char *ctype, int64_t clen, time_t lmt, time_t expires)
Definition: HttpReply.cc:170
@ scOkay
Definition: StatusCode.h:27
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:981
Definition: IpcIoFile.h:23
int unsigned int
Definition: stub_fd.cc:19
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:35
Cache Manager API.
Definition: Action.h:20

 

Introduction

Documentation

Support

Miscellaneous