SharedListen.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/AsyncCallbacks.h"
13 #include "base/TextException.h"
14 #include "comm.h"
15 #include "comm/Connection.h"
16 #include "globals.h"
17 #include "ipc/Kids.h"
18 #include "ipc/Messages.h"
19 #include "ipc/Port.h"
20 #include "ipc/SharedListen.h"
21 #include "ipc/StartListening.h"
22 #include "ipc/TypedMsgHdr.h"
23 #include "tools.h"
24 
25 #include <list>
26 #include <map>
27 
/// Holds information necessary to handle a JoinListen response.
// NOTE(review): the class-head line (listing line 29) and the member
// declarations (listing lines 32-33) are absent from this listing. Per the
// cross-reference index at the end of this page, the members are
// `Ipc::OpenListenerParams params` (the actual comm_open_sharedListen()
// parameters) and `Ipc::StartListeningCallback callback`; the class is
// presumably the PendingOpenRequest used by the typedefs that follow.
30 {
31 public:
34 };
35 
/// Maps the ID assigned at request time to the pending request (and thus to
/// the callback that will receive the response).
37 typedef std::map<Ipc::RequestId::Index, PendingOpenRequest> SharedListenRequestMap;
39 
/// Accumulates delayed requests, in FIFO order, until they are ready to be sent.
41 typedef std::list<PendingOpenRequest> DelayedSharedListenRequests;
43 
44 // TODO: Encapsulate "Pending Request Map" logic shared by all RequestId users.
/// AddToMap(por): stores a copy of the given pending request in
/// TheSharedListenRequestMap under a newly generated index.
/// \returns the index the request was stored under (never zero)
// NOTE(review): the signature (listing lines 47-48) and listing line 54 are
// absent from this listing. The cross-reference index gives the signature as
// `static Ipc::RequestId::Index AddToMap(const PendingOpenRequest &por)`.
49 {
50  static Ipc::RequestId::Index LastIndex = 0; // counter that persists across calls
51  // TODO: Switch Ipc::RequestId::Index to uint64_t and drop these 0 checks.
 // Index is an unsigned int, so the counter can wrap around to zero; skip
 // that value when it does.
52  if (++LastIndex == 0) // don't use zero value as an ID
53  ++LastIndex;
55  TheSharedListenRequestMap[LastIndex] = por;
56  return LastIndex;
57 }
58 
/// Ordering for OpenListenerParams (the index describes it as "useful for
/// map<>"): compares sock_type first, then proto, and finally falls back to
/// addr.compareWhole(). The flags and fdNote members do not participate.
// NOTE(review): the signature line (listing line 60) is absent from this
// listing; the cross-reference index gives it as
// `bool operator<(const OpenListenerParams &p) const`.
59 bool
61 {
62  if (sock_type != p.sock_type)
63  return sock_type < p.sock_type;
64 
65  if (proto != p.proto)
66  return proto < p.proto;
67 
68  // ignore flags and fdNote differences because they do not affect binding
69 
 // socket type and protocol are equal here; a negative compareWhole() result
 // means this object sorts before p
70  return addr.compareWhole(p.addr) < 0;
71 }
72 
/// Sender's constructor: initializes requestorId from the global
/// KidIdentifier, and copies the given listener parameters and request map ID.
// NOTE(review): the signature line (listing line 73) is absent from this
// listing; the cross-reference index gives it as
// `SharedListenRequest(const OpenListenerParams &, RequestId aMapId)`.
74  requestorId(KidIdentifier),
75  params(aParams),
76  mapId(aMapId)
77 {
78  // caller will then set public data members
79 }
80 
// NOTE(review): presumably the receiver-side SharedListenRequest constructor
// taking a TypedMsgHdr named hdrMsg -- its signature (listing line 81) and
// listing line 83 are absent from this listing; confirm against the real file.
/// Loads this whole object from the POD stored in hdrMsg.
82 {
84  hdrMsg.getPod(*this);
85 }
86 
/// Prepares hdrMsg for sendmsg() by storing this object in it as a POD.
// NOTE(review): the signature (listing lines 86-87) and listing line 89 are
// absent from this listing. The cross-reference index gives the signature as
// `void pack(TypedMsgHdr &hdrMsg) const`, defined at SharedListen.cc:87.
88 {
90  hdrMsg.putPod(*this);
91 }
92 
/// Sender's constructor.
/// \param aFd opened listening socket or -1
/// \param anErrNo errno value from the comm_open_sharedListen() call
/// \param aMapId ID that maps this response to the requestor's callback
93 Ipc::SharedListenResponse::SharedListenResponse(const int aFd, const int anErrNo, const RequestId aMapId):
94  fd(aFd), errNo(anErrNo), mapId(aMapId)
95 {
96 }
97 
// NOTE(review): presumably the receiver-side SharedListenResponse constructor
// taking a TypedMsgHdr named hdrMsg -- its signature (listing line 98) and
// listing line 102 are absent from this listing; confirm against the real file.
/// Starts from "no descriptor, no error" defaults, then loads this object from
/// the POD stored in hdrMsg and takes the descriptor from hdrMsg.getFd().
99  fd(-1),
100  errNo(0)
101 {
103  hdrMsg.getPod(*this);
 // getPod() above filled every member, including fd; replace that fd value
 // with the descriptor stored in the message
104  fd = hdrMsg.getFd();
105  // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
106 }
107 
// NOTE(review): presumably `void SharedListenResponse::pack(TypedMsgHdr
// &hdrMsg) const` ("prepare for sendmsg()" in the cross-reference index) --
// its signature (listing lines 107-108) and listing line 110 are absent from
// this listing; confirm against the real file.
/// Stores this object in hdrMsg as a POD and then stores the descriptor.
109 {
111  hdrMsg.putPod(*this);
112  // XXX: When we respond with an error, putFd() throws due to the negative fd
113  hdrMsg.putFd(fd);
114 }
115 
/// SendSharedListenRequest(por): registers por via AddToMap(), builds a
/// SharedListenRequest carrying por.params and the resulting map ID, logs it,
/// and packs it into a TypedMsgHdr.
// NOTE(review): the signature line (listing line 117) and listing line 126
// are absent from this listing. The cross-reference index gives the signature
// as `static void SendSharedListenRequest(const PendingOpenRequest &por)`.
// The statement that actually sends `message` is not visible here; the index
// lists SendMessage() and Port::CoordinatorAddr(), which presumably form the
// missing line -- confirm against the real file.
116 static void
118 {
 // AddToMap() stores por and yields the ID a future response will carry
119  const Ipc::SharedListenRequest request(por.params, Ipc::RequestId(AddToMap(por)));
120 
121  debugs(54, 3, "getting listening FD for " << request.params.addr <<
122  " mapId=" << request.mapId);
123 
124  Ipc::TypedMsgHdr message;
125  request.pack(message);
127 }
128 
/// kickDelayedRequest(): does nothing when TheDelayedRequests is empty;
/// otherwise logs the current queue sizes and removes the oldest (front)
/// delayed request from the queue.
// NOTE(review): the signature line (listing line 130) and listing line 138
// are absent from this listing. The cross-reference index gives the signature
// as `static void kickDelayedRequest()`. The missing line presumably sends
// the front request before it is popped -- confirm against the real file.
129 static void
131 {
132  if (TheDelayedRequests.empty())
133  return; // no pending requests to resume
134 
135  debugs(54, 3, "resuming with " << TheSharedListenRequestMap.size() <<
136  " active + " << TheDelayedRequests.size() << " delayed requests");
137 
139  TheDelayedRequests.pop_front();
140 }
141 
/// JoinSharedListen(): prepares a SharedListenRequest for the Coordinator.
/// Bundles `params` and `cb` into a PendingOpenRequest. If the number of
/// requests in TheSharedListenRequestMap has reached the concurrency limit (1),
/// the new request is appended to TheDelayedRequests instead.
// NOTE(review): the signature line (listing line 143) and listing line 155
// (the whole `else` branch body) are absent from this listing. The
// cross-reference index gives the signature as
// `void JoinSharedListen(const OpenListenerParams &, StartListeningCallback &)`
// and describes the function as "prepare and send SharedListenRequest to
// Coordinator", so the missing line presumably sends `por` right away --
// confirm against the real file.
142 void
144 {
145  PendingOpenRequest por;
146  por.params = params;
147  por.callback = cb;
148 
 // at most this many requests may be in TheSharedListenRequestMap at once
149  const DelayedSharedListenRequests::size_type concurrencyLimit = 1;
150  if (TheSharedListenRequestMap.size() >= concurrencyLimit) {
151  debugs(54, 3, "waiting for " << TheSharedListenRequestMap.size() <<
152  " active + " << TheDelayedRequests.size() << " delayed requests");
153  TheDelayedRequests.push_back(por);
154  } else {
156  }
157 }
158 
/// SharedListenJoined(response): processes the Coordinator response to a
/// SharedListenRequest. Finds the pending request by response.mapId, removes
/// it from TheSharedListenRequestMap, stores the received FD and errNo in the
/// callback answer, imports the FD into Comm when the connection is open, and
/// schedules the callback.
// NOTE(review): the signature (listing lines 158-159) and listing lines 189
// and 195 are absent from this listing. The cross-reference index gives the
// signature as `void SharedListenJoined(const SharedListenResponse &response)`.
// The index also lists Ip::Address::FreeAddr() and kickDelayedRequest(), which
// presumably occupy the two missing lines (releasing AI and resuming a delayed
// request, respectively) -- confirm against the real file.
160 {
161  // Don't log the connection fully via debugs(): only its FD is filled in right now.
162  debugs(54, 3, "got listening FD " << response.fd << " errNo=" <<
163  response.errNo << " mapId=" << response.mapId << " with " <<
164  TheSharedListenRequestMap.size() << " active + " <<
165  TheDelayedRequests.size() << " delayed requests");
166 
 // locate the pending request this response belongs to
167  Must(response.mapId);
168  const auto pori = TheSharedListenRequestMap.find(response.mapId.index());
169  Must(pori != TheSharedListenRequestMap.end());
 // copy the entry before erasing it from the map below
170  auto por = pori->second;
171  Must(por.callback);
172  TheSharedListenRequestMap.erase(pori);
173 
174  auto &answer = por.callback.answer();
175  Assure(answer.conn);
176  auto &conn = answer.conn;
177  conn->fd = response.fd;
178 
 // fill in the remaining connection details only when the connection is open
179  if (Comm::IsConnOpen(conn)) {
180  OpenListenerParams &p = por.params;
181  conn->local = p.addr;
182  conn->flags = p.flags;
183  // XXX: leave the comm AI stuff to comm_import_opened()?
184  struct addrinfo *AI = nullptr;
185  p.addr.getAddrInfo(AI);
186  AI->ai_socktype = p.sock_type;
187  AI->ai_protocol = p.proto;
188  comm_import_opened(conn, FdNote(p.fdNote), AI);
190  }
191 
 // the error code is passed on whether or not the connection is open
192  answer.errNo = response.errNo;
193  ScheduleCallHere(por.callback.release());
194 
196 }
197 
int fd
opened listening socket or -1
Definition: SharedListen.h:73
SharedListenRequest(const OpenListenerParams &, RequestId aMapId)
sender's constructor
Definition: SharedListen.cc:73
void setType(int aType)
sets message type; use MessageType enum
Definition: TypedMsgHdr.cc:100
void checkType(int aType) const
Definition: TypedMsgHdr.cc:94
int KidIdentifier
@ mtSharedListenResponse
Definition: Messages.h:29
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
a request for a listen socket with given parameters
Definition: SharedListen.h:46
int fdNote
index into fd_note() comment strings
Definition: SharedListen.h:36
static void FreeAddr(struct addrinfo *&ai)
Definition: Address.cc:706
void comm_import_opened(const Comm::ConnectionPointer &conn, const char *note, struct addrinfo *AI)
update Comm state after getting a comm_open() FD from another process
Definition: comm.cc:549
int compareWhole(const Ip::Address &rhs) const
Definition: Address.cc:744
bool operator<(const OpenListenerParams &p) const
useful for map<>
Definition: SharedListen.cc:60
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
Definition: FdNotes.cc:16
"shared listen" is when concurrent processes are listening on the same fd
Definition: SharedListen.h:28
@ mtSharedListenRequest
Definition: Messages.h:28
int errNo
errno value from comm_open_sharedListen() call
Definition: SharedListen.h:74
void putPod(const Pod &pod)
store POD
Definition: TypedMsgHdr.h:126
static String CoordinatorAddr()
get the IPC message address for coordinator process
Definition: Port.cc:65
Ipc::OpenListenerParams params
actual comm_open_sharedListen() parameters
Definition: SharedListen.cc:32
void JoinSharedListen(const OpenListenerParams &, StartListeningCallback &)
prepare and send SharedListenRequest to Coordinator
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Definition: UdsOp.cc:188
#define assert(EX)
Definition: assert.h:17
int getFd() const
returns stored descriptor
Definition: TypedMsgHdr.cc:217
SharedListenResponse(int fd, int errNo, RequestId aMapId)
sender's constructor
Definition: SharedListen.cc:93
#define Assure(condition)
Definition: Assure.h:35
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
OpenListenerParams params
actual comm_open_sharedListen() parameters
Definition: SharedListen.h:56
static SharedListenRequestMap TheSharedListenRequestMap
Definition: SharedListen.cc:38
holds information necessary to handle JoinListen response
Definition: SharedListen.cc:29
RequestId mapId
to map future response to the requestor's callback
Definition: SharedListen.h:75
static Ipc::RequestId::Index AddToMap(const PendingOpenRequest &por)
Definition: SharedListen.cc:48
static void kickDelayedRequest()
void getAddrInfo(struct addrinfo *&ai, int force=AF_UNSPEC) const
Definition: Address.cc:619
a response to SharedListenRequest
Definition: SharedListen.h:62
Index index() const
Definition: RequestId.h:45
Ip::Address addr
will be memset and memcopied
Definition: SharedListen.h:39
#define Must(condition)
Definition: TextException.h:75
static DelayedSharedListenRequests TheDelayedRequests
Definition: SharedListen.cc:42
unsigned int Index
Definition: RequestId.h:27
struct msghdr with a known type, fixed-size I/O and control buffers
Definition: TypedMsgHdr.h:34
void SharedListenJoined(const SharedListenResponse &response)
process Coordinator response to SharedListenRequest
std::map< Ipc::RequestId::Index, PendingOpenRequest > SharedListenRequestMap
maps ID assigned at request time to the response callback
Definition: SharedListen.cc:37
std::list< PendingOpenRequest > DelayedSharedListenRequests
accumulates delayed requests until they are ready to be sent, in FIFO order
Definition: SharedListen.cc:41
Ipc::StartListeningCallback callback
Definition: SharedListen.cc:33
void putFd(int aFd)
stores descriptor
Definition: TypedMsgHdr.cc:196
void getPod(Pod &pod) const
load POD
Definition: TypedMsgHdr.h:118
RequestId mapId
to map future response to the requestor's callback
Definition: SharedListen.h:58
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
static void SendSharedListenRequest(const PendingOpenRequest &por)
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Definition: SharedListen.cc:87

 

Introduction

Documentation

Support

Miscellaneous