Coordinator.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/Subscription.h"
13 #include "base/TextException.h"
14 #include "CacheManager.h"
15 #include "comm.h"
16 #include "comm/Connection.h"
17 #include "ipc/Coordinator.h"
18 #include "ipc/SharedListen.h"
19 #include "mgr/Inquirer.h"
20 #include "mgr/Request.h"
21 #include "mgr/Response.h"
22 #include "tools.h"
23 #if SQUID_SNMP
24 #include "snmp/Inquirer.h"
25 #include "snmp/Request.h"
26 #include "snmp/Response.h"
27 #endif
28 
29 #include <cerrno>
30 
31 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
33 
35  Port(Ipc::Port::CoordinatorAddr())
36 {
37 }
38 
40 {
41  Port::start();
42 }
43 
45 {
46  typedef StrandCoords::iterator SI;
47  for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
48  if (iter->kidId == kidId)
49  return &(*iter);
50  }
51  return nullptr;
52 }
53 
55 {
56  debugs(54, 3, "registering kid" << strand.kidId <<
57  ' ' << strand.tag);
58  if (StrandCoord* found = findStrand(strand.kidId)) {
59  const String oldTag = found->tag;
60  *found = strand;
61  if (oldTag.size() && !strand.tag.size())
62  found->tag = oldTag; // keep more detailed info (XXX?)
63  } else {
64  strands_.push_back(strand);
65  }
66 
67  // notify searchers waiting for this new strand, if any
68  typedef Searchers::iterator SRI;
69  for (SRI i = searchers.begin(); i != searchers.end();) {
70  if (i->tag == strand.tag) {
71  notifySearcher(*i, strand);
72  i = searchers.erase(i);
73  } else {
74  ++i;
75  }
76  }
77 }
78 
80 {
81  switch (message.rawType()) {
82  case mtRegisterStrand:
83  debugs(54, 6, "Registration request");
84  handleRegistrationRequest(StrandMessage(message));
85  break;
86 
87  case mtFindStrand: {
88  const StrandSearchRequest sr(message);
89  debugs(54, 6, "Strand search request: " << sr.requestorId <<
90  " tag: " << sr.tag);
91  handleSearchRequest(sr);
92  break;
93  }
94 
96  debugs(54, 6, "Shared listen request");
97  handleSharedListenRequest(SharedListenRequest(message));
98  break;
99 
100  case mtCacheMgrRequest: {
101  debugs(54, 6, "Cache manager request");
102  const Mgr::Request req(message);
103  handleCacheMgrRequest(req);
104  }
105  break;
106 
107  case mtCacheMgrResponse: {
108  debugs(54, 6, "Cache manager response");
109  const Mgr::Response resp(message);
110  handleCacheMgrResponse(Mine(resp));
111  }
112  break;
113 
114 #if SQUID_SNMP
115  case mtSnmpRequest: {
116  debugs(54, 6, "SNMP request");
117  const Snmp::Request req(message);
118  handleSnmpRequest(req);
119  }
120  break;
121 
122  case mtSnmpResponse: {
123  debugs(54, 6, "SNMP response");
124  const Snmp::Response resp(message);
125  handleSnmpResponse(Mine(resp));
126  }
127  break;
128 #endif
129 
130  default:
131  Port::receive(message);
132  break;
133  }
134 }
135 
137 {
138  registerStrand(msg.strand);
139 
140  // send back an acknowledgement; TODO: remove as not needed?
141  TypedMsgHdr message;
142  msg.pack(mtStrandRegistered, message);
143  SendMessage(MakeAddr(strandAddrLabel, msg.strand.kidId), message);
144 }
145 
146 void
148 {
149  debugs(54, 4, "kid" << request.requestorId <<
150  " needs shared listen FD for " << request.params.addr);
151  Listeners::const_iterator i = listeners.find(request.params);
152  int errNo = 0;
153  const Comm::ConnectionPointer c = (i != listeners.end()) ?
154  i->second : openListenSocket(request, errNo);
155 
156  debugs(54, 3, "sending shared listen " << c << " for " <<
157  request.params.addr << " to kid" << request.requestorId <<
158  " mapId=" << request.mapId);
159 
160  SharedListenResponse response(c->fd, errNo, request.mapId);
161  TypedMsgHdr message;
162  response.pack(message);
163  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
164 }
165 
166 void
168 {
169  debugs(54, 4, MYNAME);
170 
171  try {
172  Mgr::Action::Pointer action =
174  AsyncJob::Start(new Mgr::Inquirer(action, request, strands_));
175  } catch (const std::exception &ex) {
176  debugs(54, DBG_IMPORTANT, "ERROR: Squid BUG: cannot aggregate mgr:" <<
177  request.params.actionName << ": " << ex.what());
178  // TODO: Avoid half-baked Connections or teach them how to close.
179  ::close(request.conn->fd);
180  request.conn->fd = -1;
181  return; // the worker will timeout and close
182  }
183 
184  // Let the strand know that we are now responsible for handling the request
185  Mgr::Response response(request.requestId);
186  TypedMsgHdr message;
187  response.pack(message);
188  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
189 
190 }
191 
192 void
194 {
196 }
197 
198 void
200 {
201  // do we know of a strand with the given search tag?
202  const StrandCoord *strand = nullptr;
203  typedef StrandCoords::const_iterator SCCI;
204  for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
205  if (i->tag == request.tag)
206  strand = &(*i);
207  }
208 
209  if (strand) {
210  notifySearcher(request, *strand);
211  return;
212  }
213 
214  searchers.push_back(request);
215  debugs(54, 3, "cannot yet tell kid" << request.requestorId <<
216  " who " << request.tag << " is");
217 }
218 
219 void
221  const StrandCoord& strand)
222 {
223  debugs(54, 3, "tell kid" << request.requestorId << " that " <<
224  request.tag << " is kid" << strand.kidId);
225  const StrandMessage response(strand, request.qid);
226  TypedMsgHdr message;
227  response.pack(mtStrandReady, message);
228  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
229 }
230 
231 #if SQUID_SNMP
232 void
234 {
235  debugs(54, 4, MYNAME);
236 
237  Snmp::Response response(request.requestId);
238  TypedMsgHdr message;
239  response.pack(message);
240  SendMessage(MakeAddr(strandAddrLabel, request.requestorId), message);
241 
242  AsyncJob::Start(new Snmp::Inquirer(request, strands_));
243 }
244 
245 void
247 {
248  debugs(54, 4, MYNAME);
250 }
251 #endif
252 
255  int &errNo)
256 {
257  const OpenListenerParams &p = request.params;
258 
259  debugs(54, 6, "opening listen FD at " << p.addr << " for kid" <<
260  request.requestorId);
261 
263  newConn->local = p.addr; // comm_open_listener may modify it
264  newConn->flags = p.flags;
265 
266  enter_suid();
267  comm_open_listener(p.sock_type, p.proto, newConn, FdNote(p.fdNote));
268  errNo = Comm::IsConnOpen(newConn) ? 0 : errno;
269  leave_suid();
270 
271  debugs(54, 6, "tried listening on " << newConn << " for kid" <<
272  request.requestorId);
273 
274  // cache positive results
275  if (Comm::IsConnOpen(newConn))
276  listeners[request.params] = newConn;
277 
278  return newConn;
279 }
280 
282 {
283  typedef StrandCoords::const_iterator SCI;
284  for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
285  debugs(54, 5, "signal " << sig << " to kid" << iter->kidId <<
286  ", PID=" << iter->pid);
287  kill(iter->pid, sig);
288  }
289 }
290 
292 {
293  if (!TheInstance)
294  TheInstance = new Coordinator;
295  // XXX: if the Coordinator job quits, this pointer will become invalid
296  // we could make Coordinator death fatal, except during exit, but since
297  // Strands do not re-register, even process death would be pointless.
298  return TheInstance;
299 }
300 
301 const Ipc::StrandCoords&
303 {
304  return strands_;
305 }
306 
void receive(const TypedMsgHdr &message) override
Definition: Coordinator.cc:79
@ mtStrandReady
an mtFindStrand answer: the strand exists and should be usable
Definition: Messages.h:26
RequestId requestId
matches the request[or] with the response
Definition: Request.h:38
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
Definition: Coordinator.cc:199
StrandCoord strand
messageType-specific coordinates (e.g., sender)
Definition: StrandCoord.h:52
int kidId
internal Squid process number
Definition: StrandCoord.h:31
a request for a listen socket with given parameters
Definition: SharedListen.h:46
const char strandAddrLabel[]
strand's listening address unique label
Definition: Port.cc:23
@ mtCacheMgrResponse
Definition: Messages.h:36
int requestorId
sender-provided return address
Definition: StrandSearch.h:29
int fdNote
index into fd_note() comment strings
Definition: SharedListen.h:36
@ mtStrandRegistered
acknowledges mtRegisterStrand acceptance
Definition: Messages.h:23
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
const Answer & Mine(const Answer &answer)
Definition: QuestionerId.h:56
static CacheManager * GetInstance()
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
Definition: FdNotes.cc:16
@ mtSnmpRequest
Definition: Messages.h:39
"shared listen" is when concurrent processes are listening on the same fd
Definition: SharedListen.h:28
@ mtSharedListenRequest
Definition: Messages.h:28
int rawType() const
Definition: TypedMsgHdr.h:51
void broadcastSignal(int sig) const
send sig to registered strands
Definition: Coordinator.cc:281
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition: Response.cc:43
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
Definition: comm.cc:257
void leave_suid(void)
Definition: tools.cc:559
String tag
set when looking for a matching StrandCoord::tag
Definition: StrandSearch.h:30
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
Definition: Response.cc:31
void handleSnmpResponse(const Snmp::Response &response)
Definition: Coordinator.cc:246
@ mtCacheMgrRequest
Definition: Messages.h:35
void start() override
called by AsyncStart; do not call directly
Definition: Coordinator.cc:39
void handleCacheMgrRequest(const Mgr::Request &request)
Definition: Coordinator.cc:167
void pack(MessageType, TypedMsgHdr &) const
Definition: StrandCoord.cc:54
int requestorId
kidId of the requestor; used for response destination
Definition: Request.h:37
SNMP request.
Definition: Request.h:24
@ mtSnmpResponse
Definition: Messages.h:40
StrandCoord * findStrand(int kidId)
registered strand or NULL
Definition: Coordinator.cc:44
static Coordinator * TheInstance
the only class instance in existence
Definition: Coordinator.h:77
Ip::Address local
Definition: Connection.h:146
cache manager request
Definition: Request.h:23
Waits for and receives incoming IPC messages; kids handle the messages.
Definition: Port.h:21
virtual void receive(const TypedMsgHdr &)=0
Definition: Port.cc:78
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
Definition: Coordinator.cc:220
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
SSL Connection
Definition: Session.h:49
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator)
@ mtFindStrand
a worker requests a strand from Coordinator
Definition: Messages.h:25
asynchronous strand search request
Definition: StrandSearch.h:21
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Coordinates shared activities of Strands (Squid processes or threads)
Definition: Coordinator.h:30
OpenListenerParams params
actual comm_open_sharedListen() parameters
Definition: SharedListen.h:56
Strand location details.
Definition: StrandCoord.h:21
String tag
optional unique well-known key (e.g., cache_dir path)
Definition: StrandCoord.h:34
ActionParams params
action name and parameters
Definition: Request.h:37
void registerStrand(const StrandCoord &)
adds or updates existing
Definition: Coordinator.cc:54
static Coordinator * Instance()
Definition: Coordinator.cc:291
an IPC message carrying StrandCoord
Definition: StrandCoord.h:38
@ mtRegisterStrand
notifies about our strand existence
Definition: Messages.h:22
void handleCacheMgrResponse(const Mgr::Response &response)
Definition: Coordinator.cc:193
void start() override=0
called by AsyncStart; do not call directly
Definition: Port.cc:31
a response to SharedListenRequest
Definition: SharedListen.h:62
Ip::Address addr
will be memset and memcopied
Definition: SharedListen.h:39
QuestionerId qid
the sender of the request
Definition: StrandSearch.h:31
size_type size() const
Definition: SquidString.h:73
const StrandCoords & strands() const
currently registered strands
Definition: Coordinator.cc:302
void enter_suid(void)
Definition: tools.cc:623
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
Definition: Inquirer.cc:171
#define DBG_IMPORTANT
Definition: Stream.h:38
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
Definition: StrandCoords.h:19
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:34
#define MYNAME
Definition: Stream.h:219
String actionName
action name (and credentials realm)
Definition: ActionParams.h:39
void handleRegistrationRequest(const StrandMessage &)
register,ACK
Definition: Coordinator.cc:136
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
Definition: Coordinator.cc:254
RequestId mapId
to map future response to the requestor's callback
Definition: SharedListen.h:58
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
int requestorId
kidId of the requestor
Definition: SharedListen.h:54
void handleSnmpRequest(const Snmp::Request &request)
Definition: Coordinator.cc:233
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
Definition: Coordinator.cc:147
Definition: IpcIoFile.h:23
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
Comm::ConnectionPointer conn
HTTP client connection descriptor.
Definition: Request.h:35

 

Introduction

Documentation

Support

Miscellaneous