Coordinator.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/Subscription.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "compat/unistd.h"
18 #include "ipc/Coordinator.h"
19 #include "ipc/SharedListen.h"
20 #include "mgr/Inquirer.h"
21 #include "mgr/Request.h"
22 #include "mgr/Response.h"
23 #include "tools.h"
24 #if SQUID_SNMP
25 #include "snmp/Inquirer.h"
26 #include "snmp/Request.h"
27 #include "snmp/Response.h"
28 #endif
29 
30 #include <cerrno>
31 
32 #if HAVE_SYS_UNISTD_H
33 #include <sys/unistd.h>
34 #endif
35 
36 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
38 
// Coordinator constructor: passes the address returned by
// Ipc::Port::CoordinatorAddr() to the Port initializer; the body is empty.
// NOTE(review): the declarator line that precedes this member-initializer
// (listing line 39) is missing from this extract — restore it from the
// original file before compiling.
40  Port(Ipc::Port::CoordinatorAddr())
41 {
42 }
43 
// start(): job start hook; it only delegates to Port::start().
// NOTE(review): the signature line (listing line 44) is missing from this
// extract; presumably `void Ipc::Coordinator::start()` — confirm.
45 {
46  Port::start();
47 }
48 
// findStrand(kidId): linear search of strands_ for the entry whose kidId
// equals the given kidId. Returns a pointer to that stored entry, or nullptr
// when no registered strand matches.
// NOTE(review): the signature line (listing line 49) is missing from this
// extract; presumably `StrandCoord *findStrand(int kidId)` — confirm.
50 {
51  typedef StrandCoords::iterator SI;
52  for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
53  if (iter->kidId == kidId)
54  return &(*iter);
55  }
56  return nullptr;
57 }
58 
// registerStrand(strand): adds the strand to strands_ or updates the entry
// that already has the same kidId, then answers every queued searcher whose
// tag equals the strand's tag.
// NOTE(review): the signature line (listing line 59) is missing from this
// extract; presumably `void registerStrand(const StrandCoord &strand)` —
// confirm.
60 {
61  debugs(54, 3, "registering kid" << strand.kidId <<
62  ' ' << strand.tag);
63  if (StrandCoord* found = findStrand(strand.kidId)) {
    // Overwrite the stored entry, but do not let an empty incoming tag
    // erase a non-empty tag that was registered earlier.
64  const String oldTag = found->tag;
65  *found = strand;
66  if (oldTag.size() && !strand.tag.size())
67  found->tag = oldTag; // keep more detailed info (XXX?)
68  } else {
69  strands_.push_back(strand);
70  }
71 
72  // notify searchers waiting for this new strand, if any
    // NOTE(review): the match uses the incoming strand.tag, not the tag that
    // may have been preserved in strands_ above — confirm this is intended.
73  typedef Searchers::iterator SRI;
74  for (SRI i = searchers.begin(); i != searchers.end();) {
75  if (i->tag == strand.tag) {
76  notifySearcher(*i, strand);
    // erase() yields the iterator to continue from, so this branch does not
    // increment i itself.
77  i = searchers.erase(i);
78  } else {
79  ++i;
80  }
81  }
82 }
83 
// receive(message): dispatches an incoming IPC message on message.rawType().
// Each known type is unpacked into its typed request/response object and
// handed to the matching handle*() method; any other type is forwarded to
// Port::receive().
// NOTE(review): the signature line (listing line 84) is missing from this
// extract; presumably `void receive(const TypedMsgHdr &message)` — confirm.
85 {
86  switch (message.rawType()) {
87  case mtRegisterStrand:
88  debugs(54, 6, "Registration request");
89  handleRegistrationRequest(StrandMessage(message));
90  break;
91 
92  case mtFindStrand: {
93  const StrandSearchRequest sr(message);
94  debugs(54, 6, "Strand search request: " << sr.requestorId <<
95  " tag: " << sr.tag);
96  handleSearchRequest(sr);
97  break;
98  }
99 
    // NOTE(review): the case label for the statements below (listing line
    // 100) is missing from this extract; presumably
    // `case mtSharedListenRequest:` — confirm. Without it these statements
    // are unreachable after the preceding break.
101  debugs(54, 6, "Shared listen request");
102  handleSharedListenRequest(SharedListenRequest(message));
103  break;
104 
105  case mtCacheMgrRequest: {
106  debugs(54, 6, "Cache manager request");
107  const Mgr::Request req(message);
108  handleCacheMgrRequest(req);
109  }
110  break;
111 
112  case mtCacheMgrResponse: {
113  debugs(54, 6, "Cache manager response");
114  const Mgr::Response resp(message);
    // The response is passed through Mine() before being handled.
115  handleCacheMgrResponse(Mine(resp));
116  }
117  break;
118 
119 #if SQUID_SNMP
120  case mtSnmpRequest: {
121  debugs(54, 6, "SNMP request");
122  const Snmp::Request req(message);
123  handleSnmpRequest(req);
124  }
125  break;
126 
127  case mtSnmpResponse: {
128  debugs(54, 6, "SNMP response");
129  const Snmp::Response resp(message);
    // The response is passed through Mine() before being handled.
130  handleSnmpResponse(Mine(resp));
131  }
132  break;
133 #endif
134 
135  default:
    // Message types not handled here are left to Port::receive().
136  Port::receive(message);
137  break;
138  }
139 }
140 
// handleRegistrationRequest(msg): registers msg.strand and then sends an
// mtStrandRegistered message, packed from the same msg, to the address built
// from strandAddrLabel and the strand's kidId.
// NOTE(review): the signature line (listing line 141) is missing from this
// extract; presumably
// `void handleRegistrationRequest(const StrandMessage &msg)` — confirm.
142 {
143  registerStrand(msg.strand);
144 
145  // send back an acknowledgement; TODO: remove as not needed?
146  TypedMsgHdr message;
147  msg.pack(mtStrandRegistered, message);
148  SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
149 }
150 
// handleSharedListenRequest(request): looks up request.params in the
// listeners cache and, on a miss, calls openListenSocket(). It then sends a
// SharedListenResponse carrying the connection's fd, the error number, and
// request.mapId to the requesting kid.
// NOTE(review): the declarator line (listing line 152) is missing from this
// extract; presumably
// `Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest &request)`
// — confirm.
151 void
153 {
154  debugs(54, 4, "kid" << request.requestorId <<
155  " needs shared listen FD for " << request.params.addr);
156  Listeners::const_iterator i = listeners.find(request.params);
    // errNo stays 0 when the connection comes from the cache; only
    // openListenSocket() receives it by reference and may change it.
157  int errNo = 0;
158  const Comm::ConnectionPointer c = (i != listeners.end()) ?
159  i->second : openListenSocket(request, errNo);
160 
161  debugs(54, 3, "sending shared listen " << c << " for " <<
162  request.params.addr << " to kid" << request.requestorId <<
163  " mapId=" << request.mapId);
164 
    // NOTE(review): c is dereferenced unconditionally; this relies on both
    // sources above yielding a non-null pointer even when opening failed —
    // TODO confirm.
165  SharedListenResponse response(c->fd, errNo, request.mapId);
166  TypedMsgHdr message;
167  response.pack(message);
168  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
169 }
170 
// handleCacheMgrRequest(request): starts a Mgr::Inquirer job for the request
// over strands_ and, if that succeeds, sends a Mgr::Response carrying
// request.requestId back to the requesting kid. If a std::exception is
// thrown while setting up the job, the error is logged, request.conn's fd is
// closed, and no response is sent.
// NOTE(review): the declarator line (listing line 172) is missing from this
// extract; presumably
// `Ipc::Coordinator::handleCacheMgrRequest(const Mgr::Request &request)` —
// confirm.
171 void
173 {
174  debugs(54, 4, MYNAME);
175 
176  try {
177  Mgr::Action::Pointer action =
    // NOTE(review): the initializer expression for `action` (listing line
    // 178) is missing from this extract; presumably
    // `CacheManager::GetInstance()->createRequestedAction(request.params);`
    // — confirm. As extracted, the next statement would be parsed as the
    // initializer.
179  AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
180  } catch (const std::exception &ex) {
181  debugs(54, DBG_IMPORTANT, "ERROR: Squid BUG: cannot aggregate mgr:" <<
182  request.params.actionName << ": " << ex.what());
183  // TODO: Avoid half-baked Connections or teach them how to close.
    // Close the raw descriptor and mark it invalid (-1) on the connection.
184  xclose(request.conn->fd);
185  request.conn->fd = -1;
186  return; // the worker will timeout and close
187  }
188 
189  // Let the strand know that we are now responsible for handling the request
190  Mgr::Response response(request.requestId);
191  TypedMsgHdr message;
192  response.pack(message);
193  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
194 
195 }
196 
// handleCacheMgrResponse(response): handles a cache manager response that
// receive() has already passed through Mine().
// NOTE(review): both the declarator line (listing line 198) and the only
// body statement (listing line 200) are missing from this extract. The body
// presumably forwards to `Mgr::Inquirer::HandleRemoteAck(response);` —
// confirm against the original file.
197 void
199 {
201 }
202 
// handleSearchRequest(request): answers the search immediately when a
// registered strand has a tag equal to request.tag; otherwise queues the
// request in searchers so that registerStrand() can answer it later.
// NOTE(review): the declarator line (listing line 204) is missing from this
// extract; presumably
// `Ipc::Coordinator::handleSearchRequest(const StrandSearchRequest &request)`
// — confirm.
203 void
205 {
206  // do we know of a strand with the given search tag?
207  const StrandCoord *strand = nullptr;
208  typedef StrandCoords::const_iterator SCCI;
    // The `!strand` loop condition stops the scan at the first match.
209  for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
210  if (i->tag == request.tag)
211  strand = &(*i);
212  }
213 
214  if (strand) {
215  notifySearcher(request, *strand);
216  return;
217  }
218 
    // No match yet: remember the request; the requestor gets no reply now.
219  searchers.push_back(request);
220  debugs(54, 3, "cannot yet tell kid" << request.requestorId <<
221  " who " << request.tag << " is");
222 }
223 
// notifySearcher(request, strand): sends the requesting kid an mtStrandReady
// message built from the found strand's coordinates and request.qid.
// NOTE(review): the first declarator line (listing line 225) is missing from
// this extract; presumably
// `Ipc::Coordinator::notifySearcher(const StrandSearchRequest &request,` —
// confirm.
224 void
226  const StrandCoord& strand)
227 {
228  debugs(54, 3, "tell kid" << request.requestorId << " that " <<
229  request.tag << " is kid" << strand.kidId);
230  const StrandMessage response(strand, request.qid);
231  TypedMsgHdr message;
232  response.pack(mtStrandReady, message);
233  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
234 }
235 
236 #if SQUID_SNMP
// handleSnmpRequest(request): sends a Snmp::Response carrying
// request.requestId back to the requesting kid, then starts a Snmp::Inquirer
// job for the request over strands_. Unlike handleCacheMgrRequest(), the
// response is sent before the job is started and there is no try/catch here.
// NOTE(review): the declarator line (listing line 238) is missing from this
// extract; presumably
// `Ipc::Coordinator::handleSnmpRequest(const Snmp::Request &request)` —
// confirm.
237 void
239 {
240  debugs(54, 4, MYNAME);
241 
242  Snmp::Response response(request.requestId);
243  TypedMsgHdr message;
244  response.pack(message);
245  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
246 
247  AsyncJob::Start(new Snmp::Inquirer(request, strands_));
248 }
249 
// handleSnmpResponse(response): handles an SNMP response that receive() has
// already passed through Mine(). Only the debug statement survives in this
// extract.
// NOTE(review): both the declarator line (listing line 251) and a body
// statement (listing line 254) are missing. The body presumably forwards to
// `Snmp::Inquirer::HandleRemoteAck(response);` — confirm against the
// original file.
250 void
252 {
253  debugs(54, 4, MYNAME);
255 }
256 #endif
257 
// openListenSocket(request, errNo): opens a listening socket described by
// request.params via comm_open_listener(). Sets errNo to 0 when the
// connection is open afterwards and to errno otherwise, stores an open
// connection in listeners keyed by request.params, and returns newConn
// whether or not it is open.
// NOTE(review): the return type and first declarator line (listing lines
// 258-259) are missing from this extract; presumably
// `Comm::ConnectionPointer Ipc::Coordinator::openListenSocket(const SharedListenRequest &request,`
// — confirm.
260  int &errNo)
261 {
262  const OpenListenerParams &p = request.params;
263 
264  debugs(54, 6, "opening listen FD at " << p.addr << " for kid" <<
265  request.requestorId);
266 
    // NOTE(review): the declaration of newConn (listing line 267) is missing
    // from this extract; presumably
    // `Comm::ConnectionPointer newConn = new Comm::Connection;` — confirm.
268  newConn->local = p.addr; // comm_open_listener may modify it
269  newConn->flags = p.flags;
270 
    // The open call is bracketed by enter_suid()/leave_suid(); errno is read
    // right after comm_open_listener(), before leave_suid() runs.
271  enter_suid();
272  comm_open_listener(p.sock_type, p.proto, newConn, FdNote(p.fdNote));
273  errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
274  leave_suid();
275 
276  debugs(54, 6, "tried listening on " << newConn << " for kid" <<
277  request.requestorId);
278 
279  // cache positive results
280  if (Comm::IsConnOpen(newConn))
281  listeners[request.params] = newConn;
282 
283  return newConn;
284 }
285 
// broadcastSignal(sig): calls kill() with sig on the pid of every strand in
// strands_, logging each one first. The return value of kill() is not
// checked.
// NOTE(review): the signature line (listing line 286) is missing from this
// extract; presumably `void broadcastSignal(int sig) const` — confirm.
287 {
288  typedef StrandCoords::const_iterator SCI;
289  for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
290  debugs(54, 5, "signal " << sig << " to kid" << iter->kidId <<
291  ", PID=" << iter->pid);
292  kill(iter->pid, sig);
293  }
294 }
295 
// Instance(): returns TheInstance, allocating a new Coordinator on the first
// call when TheInstance is still null.
// NOTE(review): the signature line (listing line 296) is missing from this
// extract; presumably `Coordinator *Instance()` — confirm.
297 {
298  if (!TheInstance)
299  TheInstance = new Coordinator;
300  // XXX: if the Coordinator job quits, this pointer will become invalid
301  // we could make Coordinator death fatal, except during exit, but since
302  // Strands do not re-register, even process death would be pointless.
303  return TheInstance;
304 }
305 
// strands(): read-only access to strands_, the collection of currently
// registered strand coordinates.
// NOTE(review): the declarator line (listing line 307) is missing from this
// extract; presumably `Ipc::Coordinator::strands() const` — confirm.
306 const Ipc::StrandCoords&
308 {
309  return strands_;
310 }
311 
void receive(const TypedMsgHdr &message) override
Definition: Coordinator.cc:84
@ mtStrandReady
an mtFindStrand answer: the strand exists and should be usable
Definition: Messages.h:26
RequestId requestId
matches the request[or] with the response
Definition: Request.h:38
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
Definition: Coordinator.cc:204
StrandCoord strand
messageType-specific coordinates (e.g., sender)
Definition: StrandCoord.h:52
int kidId
internal Squid process number
Definition: StrandCoord.h:31
a request for a listen socket with given parameters
Definition: SharedListen.h:46
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:24
@ mtCacheMgrResponse
Definition: Messages.h:36
int requestorId
sender-provided return address
Definition: StrandSearch.h:29
int fdNote
index into fd_note() comment strings
Definition: SharedListen.h:36
@ mtStrandRegistered
acknowledges mtRegisterStrand acceptance
Definition: Messages.h:23
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
const Answer & Mine(const Answer &answer)
Definition: QuestionerId.h:56
static CacheManager * GetInstance()
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
Definition: FdNotes.cc:16
@ mtSnmpRequest
Definition: Messages.h:39
"shared listen" is when concurrent processes are listening on the same fd
Definition: SharedListen.h:28
@ mtSharedListenRequest
Definition: Messages.h:28
int rawType() const
Definition: TypedMsgHdr.h:51
void broadcastSignal(int sig) const
send sig to registered strands
Definition: Coordinator.cc:286
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition: Response.cc:43
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
Definition: comm.cc:259
void leave_suid(void)
Definition: tools.cc:560
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:30
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition: Response.cc:31
void handleSnmpResponse(const Snmp::Response &response)
Definition: Coordinator.cc:251
@ mtCacheMgrRequest
Definition: Messages.h:35
void start() override
called by AsyncStart; do not call directly
Definition: Coordinator.cc:44
void handleCacheMgrRequest(const Mgr::Request &request)
Definition: Coordinator.cc:172
void pack(MessageType, TypedMsgHdr &) const
Definition: StrandCoord.cc:54
int requestorId
kidId of the requestor; used for response destination
Definition: Request.h:37
SNMP request.
Definition: Request.h:24
@ mtSnmpResponse
Definition: Messages.h:40
StrandCoord * findStrand(int kidId)
registered strand or NULL
Definition: Coordinator.cc:49
static Coordinator * TheInstance
the only class instance in existence
Definition: Coordinator.h:77
Ip::Address local
Definition: Connection.h:149
cache manager request
Definition: Request.h:23
Waits for and receives incoming IPC messages; kids handle the messages.
Definition: Port.h:21
virtual void receive(const TypedMsgHdr &)=0
Definition: Port.cc:76
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
Definition: Coordinator.cc:225
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:189
SSL Connection
Definition: Session.h:49
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator)
@ mtFindStrand
a worker requests a strand from Coordinator
Definition: Messages.h:25
asynchronous strand search request
Definition: StrandSearch.h:21
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Coordinates shared activities of Strands (Squid processes or threads)
Definition: Coordinator.h:30
OpenListenerParams params
actual comm_open_sharedListen() parameters
Definition: SharedListen.h:56
Strand location details.
Definition: StrandCoord.h:21
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:34
ActionParams params
action name and parameters
Definition: Request.h:37
void registerStrand(const StrandCoord &)
adds or updates existing
Definition: Coordinator.cc:59
static Coordinator * Instance()
Definition: Coordinator.cc:296
an IPC message carrying StrandCoord
Definition: StrandCoord.h:38
@ mtRegisterStrand
notifies about our strand existence
Definition: Messages.h:22
void handleCacheMgrResponse(const Mgr::Response &response)
Definition: Coordinator.cc:198
void start() override=0
called by AsyncStart; do not call directly
Definition: Port.cc:32
a response to SharedListenRequest
Definition: SharedListen.h:62
Ip::Address addr
will be memset and memcopied
Definition: SharedListen.h:39
QuestionerId qid
the sender of the request
Definition: StrandSearch.h:31
size_type size() const
Definition: SquidString.h:74
const StrandCoords & strands() const
currently registered strands
Definition: Coordinator.cc:307
void enter_suid(void)
Definition: tools.cc:624
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:171
#define DBG_IMPORTANT
Definition: Stream.h:38
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:34
#define MYNAME
Definition: Stream.h:219
String actionName
action name (and credentials realm)
Definition: ActionParams.h:39
void handleRegistrationRequest(const StrandMessage &)
register,ACK
Definition: Coordinator.cc:141
int xclose(int fd)
POSIX close(2) equivalent.
Definition: unistd.h:43
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
Definition: Coordinator.cc:259
RequestId mapId
to map future response to the requestor's callback
Definition: SharedListen.h:58
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
int requestorId
kidId of the requestor
Definition: SharedListen.h:54
void handleSnmpRequest(const Snmp::Request &request)
Definition: Coordinator.cc:238
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
Definition: Coordinator.cc:152
Definition: IpcIoFile.h:23
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:35

 

Introduction

Documentation

Support

Miscellaneous