PeerPoolMgr.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "AccessLogEntry.h"
11 #include "base/AsyncCallbacks.h"
13 #include "base/RunnersRegistry.h"
14 #include "CachePeer.h"
15 #include "CachePeers.h"
16 #include "comm/Connection.h"
17 #include "comm/ConnOpener.h"
18 #include "debug/Stream.h"
19 #include "fd.h"
20 #include "FwdState.h"
21 #include "globals.h"
22 #include "HttpRequest.h"
23 #include "MasterXaction.h"
24 #include "neighbors.h"
25 #include "pconn.h"
26 #include "PeerPoolMgr.h"
27 #include "sbuf/Stream.h"
29 #include "SquidConfig.h"
30 
32 
34  peer(cbdataReference(aPeer)),
35  transportWait(),
36  encryptionWait(),
37  addrUsed(0)
38 {
39  const auto mx = MasterXaction::MakePortless<XactionInitiator::initPeerPool>();
40 
41  codeContext = new PrecomputedCodeContext("cache_peer standby pool", ToSBuf("current cache_peer standby pool: ", *peer,
42  Debug::Extra, "current master transaction: ", mx->id));
43 
44  // ErrorState, getOutgoingAddress(), and other APIs may require a request.
45  // We fake one. TODO: Optionally send this request to peers?
48 }
49 
51 {
53 }
54 
55 void
57 {
59  checkpoint("peer initialized");
60 }
61 
62 void
64 {
66 }
67 
68 bool
70 {
72 }
73 
74 bool
76 {
77  return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll();
78 }
79 
80 void
82 {
83  transportWait.finish();
84 
85  if (!validPeer()) {
86  debugs(48, 3, "peer gone");
87  if (params.conn != nullptr)
88  params.conn->close();
89  return;
90  }
91 
92  if (params.flag != Comm::OK) {
94  checkpoint("conn opening failure"); // may retry
95  return;
96  }
97 
98  Must(params.conn != nullptr);
99 
100  // Handle TLS peers.
102  // XXX: Exceptions orphan params.conn
103  const auto callback = asyncCallback(48, 4, PeerPoolMgr::handleSecuredPeer, this);
104 
105  const auto peerTimeout = peer->connectTimeout();
106  const int timeUsed = squid_curtime - params.conn->startTime();
107  // Use positive timeout when less than one second is left for conn.
108  const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
109  const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
110  encryptionWait.start(connector, callback);
111  return;
112  }
113 
114  pushNewConnection(params.conn);
115 }
116 
117 void
119 {
120  Must(validPeer());
121  Must(Comm::IsConnOpen(conn));
122  peer->standby.pool->push(conn, nullptr /* domain */);
123  // push() will trigger a checkpoint()
124 }
125 
126 void
128 {
129  encryptionWait.finish();
130 
131  if (!validPeer()) {
132  debugs(48, 3, "peer gone");
133  if (answer.conn != nullptr)
134  answer.conn->close();
135  return;
136  }
137 
138  assert(!answer.tunneled);
139  if (answer.error.get()) {
140  assert(!answer.conn);
141  // PeerConnector calls NoteOutgoingConnectionFailure() for us
142  checkpoint("conn securing failure"); // may retry
143  return;
144  }
145 
146  assert(answer.conn);
147 
148  // The socket could get closed while our callback was queued. Sync
149  // Connection. XXX: Connection::fd may already be stale/invalid here.
150  if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
151  answer.conn->noteClosure();
152  checkpoint("external connection closure"); // may retry
153  return;
154  }
155 
156  pushNewConnection(answer.conn);
157 }
158 
159 void
161 {
162  // KISS: Do nothing else when we are already doing something.
164  debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
165  return; // there will be another checkpoint when we are done opening/securing
166  }
167 
168  // Do not talk to a peer until it is ready.
169  if (!neighborUp(peer)) // provides debugging
170  return; // there will be another checkpoint when peer is up
171 
172  // Do not violate peer limits.
173  if (!peerCanOpenMore(peer)) { // provides debugging
174  peer->standby.waitingForClose = true; // may already be true
175  return; // there will be another checkpoint when a peer conn closes
176  }
177 
178  // Do not violate global restrictions.
179  if (fdUsageHigh()) {
180  debugs(48, 7, "overwhelmed");
181  peer->standby.waitingForClose = true; // may already be true
182  // There will be another checkpoint when a peer conn closes OR when
183  // a future pop() fails due to an empty pool. See PconnPool::pop().
184  return;
185  }
186 
187  peer->standby.waitingForClose = false;
188 
190  Must(peer->n_addresses); // guaranteed by neighborUp() above
191  // cycle through all available IP addresses
192  conn->remote = peer->addresses[addrUsed++ % peer->n_addresses];
193  conn->remote.port(peer->http_port);
194  conn->peerType = STANDBY_POOL; // should be reset by peerSelect()
195  conn->setPeer(peer);
198 
199  const auto ctimeout = peer->connectTimeout();
201  AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
202  const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
203  transportWait.start(cs, callback);
204 }
205 
206 void
208 {
209  debugs(48, 8, howMany);
210  peer->standby.pool->closeN(howMany);
211 }
212 
213 void
214 PeerPoolMgr::checkpoint(const char *reason)
215 {
216  if (!validPeer()) {
217  debugs(48, 3, reason << " and peer gone");
218  return; // nothing to do after our owner dies; the job will quit
219  }
220 
221  const int count = peer->standby.pool->count();
222  const int limit = peer->standby.limit;
223  debugs(48, 7, reason << " with " << count << " ? " << limit);
224 
225  if (count < limit)
227  else if (count > limit)
228  closeOldConnections(count - limit);
229 }
230 
231 void
232 PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason)
233 {
234  if (!mgr.valid()) {
235  debugs(48, 5, reason << " but no mgr");
236  return;
237  }
238 
239  CallService(mgr->codeContext, [&] {
240  CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason);
241  });
242 }
243 
246 {
247 public:
248  /* RegisteredRunner API */
249  void useConfig() override { syncConfig(); }
250  void syncConfig() override;
251 };
252 
254 
255 void
257 {
258  for (const auto &peer: CurrentCachePeers()) {
259  const auto p = peer.get();
260  // On reconfigure, Squid deletes the old config (and old peers in it),
261  // so should always be dealing with a brand new configuration.
262  assert(!p->standby.mgr);
263  assert(!p->standby.pool);
264  if (p->standby.limit) {
265  p->standby.mgr = new PeerPoolMgr(p);
266  p->standby.pool = new PconnPool(p->name, p->standby.mgr);
267  CallService(p->standby.mgr->codeContext, [&] {
268  AsyncJob::Start(p->standby.mgr.get());
269  });
270  }
271  }
272 }
273 
bool waitingForClose
a conn must close before we open a standby conn
Definition: CachePeer.h:212
void handleSecuredPeer(Security::EncryptorAnswer &answer)
Security::PeerConnector callback.
Definition: PeerPoolMgr.cc:127
Maintains an fixed-size "standby" PconnPool for a single CachePeer.
Definition: PeerPoolMgr.h:23
void closeN(int n)
closes any n connections, regardless of their destination
Definition: pconn.cc:516
Ip::Address addresses[10]
Definition: CachePeer.h:179
Cbc * get() const
a temporary valid raw Cbc pointer or NULL
Definition: CbcPointer.h:159
hier_code peerType
Definition: Connection.h:152
bool tunneled
whether we spliced the connections instead of negotiating encryption
JobWait< Security::BlindPeerConnector > encryptionWait
waits for the established transport connection to be secured/encrypted
Definition: PeerPoolMgr.h:72
AnyP::Uri url
the request URI
Definition: HttpRequest.h:115
void useConfig() override
Definition: PeerPoolMgr.cc:249
virtual void swanSong()
Definition: AsyncJob.h:61
CbcPointer< ErrorState > error
problem details (nil on success)
int fdUsageHigh(void)
Definition: fd.cc:273
PconnPool * pool
idle connection pool for this peer
Definition: CachePeer.h:209
void handleOpenedConnection(const CommConnectCbParams &params)
Comm::ConnOpener calls this when done opening a connection for us.
Definition: PeerPoolMgr.cc:81
@ STANDBY_POOL
Definition: hier_code.h:37
JobWait< Comm::ConnOpener > transportWait
waits for a transport connection to the peer to be established/opened
Definition: PeerPoolMgr.h:69
C * getRaw() const
Definition: RefCount.h:89
time_t startTime() const
Definition: Connection.h:120
int cbdataReferenceValid(const void *p)
Definition: cbdata.cc:270
bool IsConnOpen(const Comm::ConnectionPointer &conn)
Definition: Connection.cc:27
@ OK
Definition: Flag.h:16
int limit
the limit itself
Definition: CachePeer.h:211
time_t connectTimeout() const
Definition: CachePeer.cc:120
#define cbdataReference(var)
Definition: cbdata.h:348
CachePeer * peer
the owner of the pool we manage
Definition: PeerPoolMgr.h:65
@ METHOD_OPTIONS
Definition: MethodType.h:31
void getOutgoingAddress(HttpRequest *request, const Comm::ConnectionPointer &conn)
Definition: FwdState.cc:1481
unsigned int addrUsed
counter for cycling through peer addresses
Definition: PeerPoolMgr.h:74
Comm::ConnectionPointer conn
peer connection (secured on success)
bool validPeer() const
whether the peer is still out there and in a valid state we can safely use
Definition: PeerPoolMgr.cc:69
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
launches PeerPoolMgrs for peers configured with standby.limit
Definition: PeerPoolMgr.cc:245
int n_addresses
Definition: CachePeer.h:180
void count(int uses)
struct CachePeer::@32 standby
optional "cache_peer standby=limit" feature
unsigned short port() const
Definition: Address.cc:798
DefineRunnerRegistrator(PeerPoolMgrsRr)
void push(const Comm::ConnectionPointer &serverConn, const char *domain)
Definition: pconn.cc:421
void NoteOutgoingConnectionFailure(CachePeer *const peer)
Definition: CachePeer.h:246
void syncConfig() override
Definition: PeerPoolMgr.cc:256
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
Ip::Address remote
Definition: Connection.h:149
#define assert(EX)
Definition: assert.h:17
SSL Connection
Definition: Session.h:49
void start() override
called by AsyncStart; do not call directly
Definition: PeerPoolMgr.cc:56
~PeerPoolMgr() override
Definition: PeerPoolMgr.cc:50
PrecomputedCodeContextPointer codeContext
Definition: PeerPoolMgr.h:36
void swanSong() override
Definition: PeerPoolMgr.cc:63
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:82
#define cbdataReferenceDone(var)
Definition: cbdata.h:357
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
Definition: AsyncJobCalls.h:70
#define asyncCallback(dbgSection, dbgLevel, method, object)
#define CBDATA_CLASS_INIT(type)
Definition: cbdata.h:325
bool peerCanOpenMore(const CachePeer *p)
Whether we can open new connections to the peer (e.g., despite max-conn)
Definition: neighbors.cc:217
time_t squid_curtime
Definition: stub_libtime.cc:20
static std::ostream & Extra(std::ostream &)
Definition: debug.cc:1316
A PeerConnector for TLS cache_peers and origin servers. No SslBump capabilities.
CodeContext with constant details known at construction time.
#define fd_table
Definition: fde.h:189
void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback)
starts waiting for the given job to call the given callback
Definition: JobWait.h:69
time_t positiveTimeout(const time_t timeout)
Definition: neighbors.cc:1092
void openNewConnection()
starts the process of opening a new standby connection (if possible)
Definition: PeerPoolMgr.cc:160
@ PROTO_HTTP
Definition: ProtocolType.h:25
char * host
Definition: CachePeer.h:66
bool doneAll() const override
whether positive goal has been reached
Definition: PeerPoolMgr.cc:75
PeerPoolMgr(CachePeer *aPeer)
Definition: PeerPoolMgr.cc:33
void checkpoint(const char *reason)
Definition: PeerPoolMgr.cc:214
SBuf ToSBuf(Args &&... args)
slowly stream-prints all arguments into a freshly allocated SBuf
Definition: Stream.h:63
virtual void start()
called by AsyncStart; do not call directly
Definition: AsyncJob.cc:59
#define Must(condition)
Definition: TextException.h:75
int neighborUp(const CachePeer *p)
Definition: neighbors.cc:1058
int shutting_down
void setPeer(CachePeer *p)
Definition: Connection.cc:130
bool encryptTransport
whether transport encryption (TLS/SSL) is to be used on connections to the peer
Definition: PeerOptions.h:147
const CachePeers & CurrentCachePeers()
Definition: CachePeers.cc:43
bool isOpen() const
Definition: Connection.h:101
void CallService(const CodeContext::Pointer &serviceContext, Fun &&service)
Definition: CodeContext.h:151
void pushNewConnection(const Comm::ConnectionPointer &conn)
the final step in connection opening (and, optionally, securing) sequence
Definition: PeerPoolMgr.cc:118
static void Checkpoint(const Pointer &mgr, const char *reason)
Definition: PeerPoolMgr.cc:232
Security::PeerOptions secure
security settings for peer connection
Definition: CachePeer.h:219
void host(const char *src)
Definition: Uri.cc:123
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
unsigned short http_port
Definition: CachePeer.h:104
void closeOldConnections(const int howMany)
closes 'howMany' standby connections
Definition: PeerPoolMgr.cc:207
void GetMarkingsToServer(HttpRequest *request, Comm::Connection &conn)
Definition: FwdState.cc:1560
RefCount< HttpRequest > request
fake HTTP request for conn opening code
Definition: PeerPoolMgr.h:66

 

Introduction

Documentation

Support

Miscellaneous