Inquirer.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "comm.h"
14 #include "comm/Write.h"
15 #include "ipc/Inquirer.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "mem/PoolingAllocator.h"
19 #include "MemBuf.h"
20 
21 #include <algorithm>
22 #include <unordered_map>
23 
25 
26 namespace Ipc {
27 
30 using WaitingInquiriesItem = std::pair<const RequestId::Index, InquirerPointer>;
31 using WaitingInquiries = std::unordered_map<
34  std::hash<RequestId::Index>,
35  std::equal_to<RequestId::Index>,
37 
40 
42 static InquirerPointer
44 {
45  debugs(54, 3, "requestId " << requestId);
46  Assure(requestId != 0);
47  const auto request = TheWaitingInquirers.find(requestId);
48  if (request != TheWaitingInquirers.end()) {
49  const auto inquirer = request->second;
50  TheWaitingInquirers.erase(request);
51  return inquirer; // may already be gone by now
52  }
53  return nullptr;
54 }
55 
56 } // namespace Ipc
57 
59 static bool
61 {
62  return c1.kidId < c2.kidId;
63 }
64 
66  double aTimeout):
67  AsyncJob("Ipc::Inquirer"),
68  codeContext(CodeContext::Current()),
69  request(aRequest), strands(coords), pos(strands.begin()), timeout(aTimeout)
70 {
71  debugs(54, 5, MYNAME);
72 
73  // order by ascending kid IDs; useful for non-aggregatable stats
74  std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
75 }
76 
78 {
79  debugs(54, 5, MYNAME);
80  cleanup();
81 }
82 
83 void
85 {
86 }
87 
88 void
90 {
91  request->requestId = 0;
92 }
93 
94 void
96 {
97  if (pos == strands.end()) {
98  Must(done());
99  return;
100  }
101 
102  Must(request->requestId == 0);
103  if (++LastRequestId == 0) // don't use zero value as request->requestId
104  ++LastRequestId;
105  request->requestId = LastRequestId;
106  const int kidId = pos->kidId;
107  debugs(54, 4, "inquire kid: " << kidId << status());
108  TheWaitingInquirers[request->requestId] = this;
109  TypedMsgHdr message;
110  request->pack(message);
111  SendMessage(Port::MakeAddr(strandAddrLabel, kidId), message);
112  eventAdd("Ipc::Inquirer::requestTimedOut", &Inquirer::RequestTimedOut,
113  this, timeout, 0, false);
114 }
115 
117 void
119 {
120  debugs(54, 4, status());
121  request->requestId = 0;
122  removeTimeoutEvent();
123  if (aggregate(response)) {
124  Must(!done()); // or we should not be called
125  ++pos; // advance after a successful inquiry
126  inquire();
127  } else {
128  mustStop("error");
129  }
130 }
131 
132 void
134 {
135  debugs(54, 5, MYNAME);
136  removeTimeoutEvent();
137  if (request->requestId > 0) {
138  DequeueRequest(request->requestId);
139  request->requestId = 0;
140  }
141  sendResponse();
142  cleanup();
143 }
144 
145 bool
147 {
148  return pos == strands.end();
149 }
150 
151 void
152 Ipc::Inquirer::handleException(const std::exception& e)
153 {
154  debugs(54, 3, e.what());
155  mustStop("exception");
156 }
157 
/// called when the job throws during an async call;
/// delegates to handleException() and logs (at DBG_CRITICAL level) any
/// std::exception thrown by that handler instead of propagating it
158 void
159 Ipc::Inquirer::callException(const std::exception& e)
160 {
161  debugs(54, 3, MYNAME);
162  try {
163  handleException(e);
164  } catch (const std::exception& ex) {
165  debugs(54, DBG_CRITICAL, ex.what());
166  }
// NOTE(review): listing line 167 is absent from this extract; presumably a
// statement was dropped here during extraction -- confirm against the original file
168 }
169 
170 void
172 {
173  Must(response.requestId != 0);
174  const auto inquirer = DequeueRequest(response.requestId);
175  if (inquirer.valid()) {
176  CallService(inquirer->codeContext, [&] {
177  const auto call = asyncCall(54, 5, "Ipc::Inquirer::handleRemoteAck",
178  JobMemFun(inquirer, &Inquirer::handleRemoteAck, response.clone()));
179  ScheduleCallHere(call);
180  });
181  }
182 }
183 
/// called when we are no longer waiting for the strand to respond
// NOTE(review): listing lines 186 (signature) and 188-189 (body) are absent
// from this extract; the definition index attributes Inquirer.cc:186 to
// removeTimeoutEvent() -- restore the missing lines from the original file
185 void
187 {
190 }
191 
193 void
195 {
196  debugs(54, 3, MYNAME);
197  Must(param != nullptr);
198  Inquirer* cmi = static_cast<Inquirer*>(param);
199  // use async call to enable job call protection that time events lack
200  CallBack(cmi->codeContext, [&cmi] {
201  CallJobHere(54, 5, cmi, Inquirer, requestTimedOut);
202  });
203 }
204 
206 void
208 {
209  debugs(54, 3, MYNAME);
210  if (request->requestId != 0) {
211  DequeueRequest(request->requestId);
212  request->requestId = 0;
213  Must(!done()); // or we should not be called
214  ++pos; // advance after a failed inquiry
215  inquire();
216  }
217 }
218 
219 const char*
221 {
222  static MemBuf buf;
223  buf.reset();
224  buf.appendf(" [requestId %u]", request->requestId.index());
225  buf.terminate();
226  return buf.content();
227 }
228 
static String MakeAddr(const char *proccessLabel, int id)
calculates IPC message address for strand id of processLabel type
Definition: Port.cc:52
int eventFind(EVH *func, void *arg)
Definition: event.cc:145
void terminate()
Definition: MemBuf.cc:241
std::unordered_map< RequestId::Index, InquirerPointer, std::hash< RequestId::Index >, std::equal_to< RequestId::Index >, PoolingAllocator< WaitingInquiriesItem > > WaitingInquiries
Definition: Inquirer.cc:36
void appendf(const char *fmt,...) PRINTF_FORMAT_ARG2
Append operation with printf-style arguments.
Definition: Packable.h:61
#define DBG_CRITICAL
Definition: Stream.h:37
CodeContextPointer codeContext
Definition: Inquirer.h:41
void eventDelete(EVH *func, void *arg)
Definition: event.cc:127
void swanSong() override
Definition: Inquirer.cc:133
static WaitingInquiries TheWaitingInquirers
pending Inquirer requests for this process
Definition: Inquirer.cc:39
int kidId
internal Squid process number
Definition: StrandCoord.h:31
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:23
bool doneAll() const override
whether positive goal has been reached
Definition: Inquirer.cc:146
static InquirerPointer DequeueRequest(const RequestId::Index requestId)
returns and forgets the Inquirer waiting for the given request
Definition: Inquirer.cc:43
void CallBack(const CodeContext::Pointer &callbackContext, Fun &&callback)
Definition: CodeContext.h:126
A response to Ipc::Request.
Definition: Response.h:23
void start() override
called by AsyncStart; do not call directly
Definition: Inquirer.cc:89
CbcPointer< Inquirer > InquirerPointer
maps request->id to the Inquirer waiting for the response to that request
Definition: Inquirer.cc:29
const char * status() const override
internal cleanup; do not call directly
Definition: Inquirer.cc:220
Inquirer(Request::Pointer aRequest, const Ipc::StrandCoords &coords, double aTimeout)
Definition: Inquirer.cc:65
void requestTimedOut()
called when the strand failed to respond (or finish responding) in time
Definition: Inquirer.cc:207
Definition: MemBuf.h:23
static void RequestTimedOut(void *param)
Ipc::Inquirer::requestTimedOut wrapper.
Definition: Inquirer.cc:194
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
virtual void cleanup()
perform cleanup actions on completion of job
Definition: Inquirer.cc:84
#define Assure(condition)
Definition: Assure.h:35
void callException(const std::exception &e) override
called when the job throws during an async call
Definition: Inquirer.cc:159
RequestId requestId
the ID of the request we are responding to
Definition: Response.h:36
virtual void handleException(const std::exception &e)
do specific exception handling
Definition: Inquirer.cc:152
Strand location details.
Definition: StrandCoord.h:21
static bool LesserStrandByKidId(const Ipc::StrandCoord &c1, const Ipc::StrandCoord &c2)
compare Ipc::StrandCoord using kidId, for std::sort() below
Definition: Inquirer.cc:60
char * content()
start of the added data
Definition: MemBuf.h:41
STL Allocator that uses Squid memory pools for memory management.
virtual void inquire()
inquire the next strand
Definition: Inquirer.cc:95
void removeTimeoutEvent()
called when we are no longer waiting for the strand to respond
Definition: Inquirer.cc:186
#define Must(condition)
Definition: TextException.h:75
unsigned int Index
Definition: RequestId.h:27
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:171
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:34
#define MYNAME
Definition: Stream.h:219
void reset()
Definition: MemBuf.cc:129
static RequestId::Index LastRequestId
last requestId used
Definition: Inquirer.h:76
~Inquirer() override
Definition: Inquirer.cc:77
void handleRemoteAck(Response::Pointer response)
called when a strand is done writing its output
Definition: Inquirer.cc:118
void CallService(const CodeContext::Pointer &serviceContext, Fun &&service)
Definition: CodeContext.h:151
Ipc::StrandCoords strands
all strands we want to query, in order
Definition: Inquirer.h:71
virtual void callException(const std::exception &e)
called when the job throws during an async call
Definition: AsyncJob.cc:143
std::pair< const RequestId::Index, InquirerPointer > WaitingInquiriesItem
Definition: Inquirer.cc:30
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:107
Definition: IpcIoFile.h:23

 

Introduction

Documentation

Support

Miscellaneous