TcpLogger.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #include "squid.h"
10 #include "comm.h"
11 #include "comm/Connection.h"
12 #include "comm/ConnOpener.h"
13 #include "comm/Loops.h"
14 #include "comm/Write.h"
15 #include "fatal.h"
16 #include "fde.h"
17 #include "globals.h" // for shutting_down
18 #include "log/CustomLog.h"
19 #include "log/File.h"
20 #include "log/TcpLogger.h"
21 #include "Parsing.h"
22 #include "sbuf/MemBlob.h"
23 #include "SquidConfig.h"
24 
25 // a single I/O buffer should be large enough to store any access.log record
26 const size_t Log::TcpLogger::IoBufSize = 2*MAX_URL;
27 
28 // We need at least two buffers because when we write the first buffer,
29 // we have to use the second buffer to accumulate new entries.
31 
32 #define MY_DEBUG_SECTION 50 /* Log file handling */
33 
35 
36 Log::TcpLogger::TcpLogger(size_t bufCap, bool dieOnErr, Ip::Address them):
37  AsyncJob("TcpLogger"),
38  dieOnError(dieOnErr),
39  bufferCapacity(bufCap),
40  bufferedSize(0),
41  flushDebt(0),
42  quitOnEmpty(false),
43  reconnectScheduled(false),
44  writeScheduled(false),
45  conn(nullptr),
46  remote(them),
47  connectFailures(0),
48  drops(0)
49 {
52  "WARNING: tcp:" << remote << " logger configured buffer " <<
53  "size " << bufferCapacity << " is smaller than the " <<
54  BufferCapacityMin << "-byte" << " minimum. " <<
55  "Using the minimum instead.");
57  }
58 }
59 
61 {
62  // make sure Comm::Write does not have our buffer pointer
63  assert(!writeScheduled);
64 }
65 
66 void
68 {
69  doConnect();
70 }
71 
72 bool
74 {
75  debugs(MY_DEBUG_SECTION, 5, "quitOnEmpty: " << quitOnEmpty <<
76  " buffered: " << bufferedSize <<
77  " conn: " << conn << ' ' << connectFailures);
78 
79  // we do not quit unless we are told that we may
80  if (!quitOnEmpty)
81  return false;
82 
83  /* We were asked to quit after we are done writing buffers. Are we done? */
84 
85  // If we have records but are failing to connect, quit. Otherwise, we may
86  // be trying to connect forever due to a [since fixed] misconfiguration!
87  const bool failingToConnect = !conn && connectFailures;
88  if (bufferedSize && !failingToConnect)
89  return false;
90 
91  return AsyncJob::doneAll();
92 }
93 
94 void
96 {
97  disconnect(); // optional: refcounting should close/delete conn eventually
99 }
100 
101 void
103 {
104  // job call protection must end our job if we are done logging current bufs
105  assert(inCall != nullptr);
106  quitOnEmpty = true;
107  flush();
108 }
109 
110 void
112 {
113  flushDebt = bufferedSize;
114  writeIfNeeded();
115 }
116 
117 void
118 Log::TcpLogger::logRecord(const char *buf, const size_t len)
119 {
120  appendRecord(buf, len);
121  writeIfNeeded();
122 }
123 
125 void
127 {
128  // write if an earlier flush command forces us to write or
129  // if we have filled at least one I/O buffer
130  if (flushDebt > 0 || buffers.size() > 1)
131  writeIfPossible();
132 }
133 
136 {
137  debugs(MY_DEBUG_SECTION, 7, "guards: " << (!writeScheduled) <<
138  (bufferedSize > 0) << (conn != nullptr) <<
139  (conn != nullptr && !fd_table[conn->fd].closing()) << " buffered: " <<
140  bufferedSize << '/' << buffers.size());
141 
142  // XXX: Squid shutdown sequence starts closing our connection before
143  // calling LogfileClose, leading to loss of log records during shutdown.
144  if (!writeScheduled && bufferedSize > 0 && conn != nullptr &&
145  !fd_table[conn->fd].closing()) {
146  debugs(MY_DEBUG_SECTION, 5, "writing first buffer");
147 
148  typedef CommCbMemFunT<TcpLogger, CommIoCbParams> WriteDialer;
150  const MemBlob::Pointer &buffer = buffers.front();
151  Comm::Write(conn, buffer->mem, buffer->size, callback, nullptr);
152  writeScheduled = true;
153  }
154 }
155 
157 bool
158 Log::TcpLogger::canFit(const size_t len) const
159 {
160  // TODO: limit reporting frequency in addition to reporting only changes
161 
162  if (bufferedSize+len <= bufferCapacity) {
163  if (drops) {
164  // We can get here if a shorter record accidentally fits after we
165  // started dropping records. When that happens, the following
166  // DBG_IMPORTANT message will mislead admin into thinking that
167  // the problem was resolved (for a brief period of time, until
168  // another record comes in and overflows the buffer). It is
169  // difficult to prevent this without also creating the opposite
170  // problem: A huge record that does not fit and is dropped blocks
171  // subsequent regular records from being buffered until we write.
172  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
173  " logger stops dropping records after " << drops << " drops" <<
174  "; current buffer use: " << (bufferedSize+len) <<
175  " out of " << bufferCapacity << " bytes");
176  }
177  return true;
178  }
179 
180  if (!drops || dieOnError) {
182  "ERROR: tcp:" << remote << " logger " << bufferCapacity << "-byte " <<
183  "buffer overflowed; cannot fit " <<
184  (bufferedSize+len-bufferCapacity) << " bytes");
185  }
186 
187  if (dieOnError)
188  fatal("tcp logger buffer overflowed");
189 
190  if (!drops) {
191  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
192  " logger starts dropping records.");
193  }
194 
195  return false;
196 }
197 
199 void
200 Log::TcpLogger::appendRecord(const char *record, const size_t len)
201 {
202  // they should not happen, but to be safe, let's protect drop start/stop
203  // monitoring algorithm from empty records (which can never be dropped)
204  if (!len)
205  return;
206 
207  if (!canFit(len)) {
208  ++drops;
209  return;
210  }
211 
212  drops = 0;
213  // append without splitting buf, unless it exceeds IoBufSize
214  for (size_t off = 0; off < len; off += IoBufSize)
215  appendChunk(record + off, min(len - off, IoBufSize));
216 }
217 
219 void
220 Log::TcpLogger::appendChunk(const char *chunk, const size_t len)
221 {
222  Must(len <= IoBufSize);
223  // add a buffer if there is not one that can accommodate len bytes
224  bool addBuffer = buffers.empty() ||
225  (buffers.back()->size+len > IoBufSize);
226  // also add a buffer if there is only one and that one is being written
227  addBuffer = addBuffer || (writeScheduled && buffers.size() == 1);
228 
229  if (addBuffer) {
230  buffers.push_back(new MemBlob(IoBufSize));
231  debugs(MY_DEBUG_SECTION, 7, "added buffer #" << buffers.size());
232  }
233 
234  Must(!buffers.empty());
235  buffers.back()->append(chunk, len);
236  bufferedSize += len;
237 }
238 
240 void
242 {
243  if (shutting_down)
244  return;
245 
246  debugs(MY_DEBUG_SECTION, 3, "connecting");
247  Must(!conn);
248 
249  Comm::ConnectionPointer futureConn = new Comm::Connection;
250  futureConn->remote = remote;
251  futureConn->local.setAnyAddr();
252  if (futureConn->remote.isIPv4())
253  futureConn->local.setIPv4();
254 
257  const auto cs = new Comm::ConnOpener(futureConn, call, 2);
258  connWait.start(cs, call);
259 }
260 
262 void
264 {
265  connWait.finish();
266 
267  if (params.flag != Comm::OK) {
268  const double delay = 0.5; // seconds
269  if (connectFailures++ % 100 == 0) {
270  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "ERROR: tcp:" << remote <<
271  " logger connection attempt #" << connectFailures <<
272  " failed. Will keep trying every " << delay << " seconds.");
273  }
274 
275  if (!reconnectScheduled) {
276  reconnectScheduled = true;
277  eventAdd("Log::TcpLogger::DelayedReconnect",
279  new Pointer(this), 0.5, 0, false);
280  }
281  } else {
282  if (connectFailures > 0) {
283  debugs(MY_DEBUG_SECTION, DBG_IMPORTANT, "tcp:" << remote <<
284  " logger connectivity restored after " <<
285  (connectFailures+1) << " attempts.");
286  connectFailures = 0;
287  }
288 
289  Must(!conn);
290  conn = params.conn;
291 
292  Must(!closer);
294  closer = JobCallback(MY_DEBUG_SECTION, 4, Closer, this, Log::TcpLogger::handleClosure);
295  comm_add_close_handler(conn->fd, closer);
296 
297  writeIfNeeded();
298  }
299 }
300 
301 // XXX: Needed until eventAdd() starts accepting Async calls directly.
303 void
305 {
306  Pointer *ptr = static_cast<Pointer*>(data);
307  assert(ptr);
308  if (TcpLogger *logger = ptr->valid()) {
309  // Get back inside AsyncJob protections by scheduling another call.
310  typedef NullaryMemFunT<TcpLogger> Dialer;
312  logger,
314  ScheduleCallHere(call);
315  }
316  delete ptr;
317 }
318 
320 void
322 {
323  Must(reconnectScheduled);
324  Must(!conn);
325  reconnectScheduled = false;
326  doConnect();
327 }
328 
330 void
332 {
333  writeScheduled = false;
334  if (io.flag == Comm::ERR_CLOSING) {
335  debugs(MY_DEBUG_SECTION, 7, "closing");
336  // do nothing here -- our comm_close_handler will be called to clean up
337  } else if (io.flag != Comm::OK) {
338  debugs(MY_DEBUG_SECTION, 2, "write failure: " << xstrerr(io.xerrno));
339  // keep the first buffer (the one we failed to write)
340  disconnect();
341  doConnect();
342  } else {
343  debugs(MY_DEBUG_SECTION, 5, "write successful");
344 
345  Must(!buffers.empty()); // we had a buffer to write
346  const MemBlob::Pointer &written = buffers.front();
347  const size_t writtenSize = static_cast<size_t>(written->size);
348  // and we wrote the whole buffer
349  Must(io.size == writtenSize);
350  Must(bufferedSize >= writtenSize);
351  bufferedSize -= writtenSize;
352 
353  buffers.pop_front();
354 
355  if (flushDebt > io.size)
356  flushDebt -= io.size;
357  else
358  flushDebt = 0; // wrote everything we owed (or more)
359 
360  writeIfNeeded();
361  }
362 }
363 
366 void
368 {
369  assert(inCall != nullptr);
370  closer = nullptr;
371  if (conn) {
372  conn->noteClosure();
373  conn = nullptr;
374  }
375  // in all current use cases, we should not try to reconnect
376  mustStop("Log::TcpLogger::handleClosure");
377 }
378 
380 void
382 {
383  if (conn != nullptr) {
384  if (closer != nullptr) {
385  comm_remove_close_handler(conn->fd, closer);
386  closer = nullptr;
387  }
388  conn->close();
389  conn = nullptr;
390  }
391 }
392 
397 {
398  if (Pointer *pptr = static_cast<Pointer*>(lf->data))
399  return pptr->get(); // may be nil
400  return nullptr;
401 }
402 
403 void
405 {
406  if (TcpLogger *logger = StillLogging(lf))
407  logger->flush();
408 }
409 
410 void
411 Log::TcpLogger::WriteLine(Logfile * lf, const char *buf, size_t len)
412 {
413  if (TcpLogger *logger = StillLogging(lf))
414  logger->logRecord(buf, len);
415 }
416 
417 void
419 {
420 }
421 
422 void
424 {
426  Flush(lf);
427 }
428 
429 void
431 {
432 }
433 
434 void
436 {
437  if (TcpLogger *logger = StillLogging(lf)) {
438  debugs(50, 3, "Closing " << logger);
439  typedef NullaryMemFunT<TcpLogger> Dialer;
440  Dialer dialer(logger, &Log::TcpLogger::endGracefully);
441  AsyncCall::Pointer call = asyncCall(50, 3, "Log::TcpLogger::endGracefully", dialer);
442  ScheduleCallHere(call);
443  }
444  delete static_cast<Pointer*>(lf->data);
445  lf->data = nullptr;
446 }
447 
448 /*
449  * This code expects the path to be //host:port
450  */
451 int
452 Log::TcpLogger::Open(Logfile * lf, const char *path, size_t bufsz, int fatalFlag)
453 {
454  assert(!StillLogging(lf));
455  debugs(5, 3, "Tcp Open called");
456 
457  Ip::Address addr;
458 
459  if (strncmp(path, "//", 2) == 0)
460  path += 2;
461  char *strAddr = xstrdup(path);
462  if (!GetHostWithPort(strAddr, &addr)) {
463  if (lf->flags.fatal) {
464  fatalf("Invalid TCP logging address '%s'\n", lf->path);
465  } else {
466  debugs(50, DBG_IMPORTANT, "ERROR: Invalid TCP logging address '" << lf->path << "'");
467  safe_free(strAddr);
468  return FALSE;
469  }
470  }
471  safe_free(strAddr);
472 
473  TcpLogger *logger = new TcpLogger(bufsz, fatalFlag, addr);
474  lf->data = new Pointer(logger);
475  lf->f_close = &Close;
476  lf->f_linewrite = &WriteLine;
477  lf->f_linestart = &StartLine;
478  lf->f_lineend = &EndLine;
479  lf->f_flush = &Flush;
480  lf->f_rotate = &Rotate;
481  AsyncJob::Start(logger);
482 
483  return 1;
484 }
485 
void fatal(const char *message)
Definition: fatal.cc:28
const char * xstrerr(int error)
Definition: xstrerror.cc:83
void logRecord(const char *buf, size_t len)
buffers record and possibly writes it to the remote logger
Definition: TcpLogger.cc:118
AsyncCall::Pointer comm_add_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:952
static TcpLogger * StillLogging(Logfile *lf)
Definition: TcpLogger.cc:396
void endGracefully()
Definition: TcpLogger.cc:102
void start() override
called by AsyncStart; do not call directly
Definition: TcpLogger.cc:67
#define DBG_CRITICAL
Definition: Stream.h:37
void handleClosure(const CommCloseCbParams &io)
Definition: TcpLogger.cc:367
void connectDone(const CommConnectCbParams &conn)
Comm::ConnOpener callback.
Definition: TcpLogger.cc:263
#define FALSE
Definition: std-includes.h:56
virtual void swanSong()
Definition: AsyncJob.h:61
#define ScheduleCallHere(call)
Definition: AsyncCall.h:166
void writeIfNeeded()
starts writing if and only if it is time to write accumulated records
Definition: TcpLogger.cc:126
size_type size
maximum allocated memory in use by callers
Definition: MemBlob.h:112
#define xstrdup
struct SquidConfig::@97 onoff
bool canFit(const size_t len) const
whether len more bytes can be buffered
Definition: TcpLogger.cc:158
static void EndLine(Logfile *lf)
Definition: TcpLogger.cc:423
#define MY_DEBUG_SECTION
Definition: TcpLogger.cc:32
@ OK
Definition: Flag.h:16
static void Flush(Logfile *lf)
Definition: TcpLogger.cc:404
bool isIPv4() const
Definition: Address.cc:178
void disconnect()
close our connection now, without flushing
Definition: TcpLogger.cc:381
LOGWRITE * f_linewrite
Definition: File.h:58
@ ERR_CLOSING
Definition: Flag.h:24
unsigned int fatal
Definition: File.h:49
int xerrno
The last errno to occur. non-zero if flag is Comm::COMM_ERROR.
Definition: CommCalls.h:83
static void StartLine(Logfile *lf)
Definition: TcpLogger.cc:418
RefCount< AsyncCallT< Dialer > > asyncCall(int aDebugSection, int aDebugLevel, const char *aName, const Dialer &aDialer)
Definition: AsyncCall.h:156
static int Open(Logfile *lf, const char *path, size_t bufSz, int fatalFlag)
Definition: TcpLogger.cc:452
void * data
Definition: File.h:55
#define MAX_URL
Definition: defines.h:76
static void Close(Logfile *lf)
Definition: TcpLogger.cc:435
virtual bool doneAll() const
whether positive goal has been reached
Definition: AsyncJob.cc:112
int buffered_logs
Definition: SquidConfig.h:284
Ip::Address local
Definition: Connection.h:146
LOGLINEEND * f_lineend
Definition: File.h:59
Comm::ConnectionPointer conn
Definition: CommCalls.h:80
#define safe_free(x)
Definition: xalloc.h:73
Ip::Address remote
Definition: Connection.h:149
#define assert(EX)
Definition: assert.h:17
SSL Connection
Definition: Session.h:49
void fatalf(const char *fmt,...)
Definition: fatal.cc:68
bool setIPv4()
Definition: Address.cc:244
static void WriteLine(Logfile *lf, const char *buf, size_t len)
Definition: TcpLogger.cc:411
CBDATA_NAMESPACED_CLASS_INIT(Log, TcpLogger)
Comm::Flag flag
comm layer result status.
Definition: CommCalls.h:82
static void DelayedReconnect(void *data)
Log::TcpLogger::delayedReconnect() wrapper.
Definition: TcpLogger.cc:304
LOGCLOSE * f_close
Definition: File.h:62
#define JobCallback(dbgSection, dbgLevel, Dialer, job, method)
Convenience macro to create a Dialer-based job callback.
Definition: AsyncJobCalls.h:70
void swanSong() override
Definition: TcpLogger.cc:95
void Write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE *free_func)
Definition: Write.cc:33
LOGROTATE * f_rotate
Definition: File.h:61
static const size_t BufferCapacityMin
minimum bufferCapacity value
Definition: TcpLogger.h:92
Definition: File.h:38
Definition: Config.h:17
Ip::Address remote
where the remote logger expects our records
Definition: TcpLogger.h:109
#define fd_table
Definition: fde.h:189
void appendChunk(const char *chunk, const size_t len)
buffer a record chunk without splitting it across buffers
Definition: TcpLogger.cc:220
bool GetHostWithPort(char *token, Ip::Address *ipa)
Definition: Parsing.cc:257
void flush()
write all currently buffered records ASAP
Definition: TcpLogger.cc:111
LOGFLUSH * f_flush
Definition: File.h:60
static const size_t IoBufSize
fixed I/O buffer size
Definition: TcpLogger.h:91
#define Must(condition)
Definition: TextException.h:75
void setAnyAddr()
NOTE: Does NOT clear the Port stored. Only the Address and Type.
Definition: Address.cc:197
void delayedReconnect()
"sleep a little before trying to connect again" event callback
Definition: TcpLogger.cc:321
#define DBG_IMPORTANT
Definition: Stream.h:38
struct Logfile::@70 flags
static void Rotate(Logfile *lf, const int16_t)
Definition: TcpLogger.cc:430
int shutting_down
void writeDone(const CommIoCbParams &io)
Comm::Write callback.
Definition: TcpLogger.cc:331
TcpLogger(size_t, bool, Ip::Address)
Definition: TcpLogger.cc:36
void appendRecord(const char *buf, size_t len)
buffer a record that might exceed IoBufSize
Definition: TcpLogger.cc:200
LOGLINESTART * f_linestart
Definition: File.h:57
void writeIfPossible()
starts writing if possible
Definition: TcpLogger.cc:135
char path[MAXPATHLEN]
Definition: File.h:46
char * mem
raw allocated memory block
Definition: MemBlob.h:110
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
const A & min(A const &lhs, A const &rhs)
bool doneAll() const override
whether positive goal has been reached
Definition: TcpLogger.cc:73
void comm_remove_close_handler(int fd, CLCB *handler, void *data)
Definition: comm.cc:981
size_t bufferCapacity
bufferedSize limit
Definition: TcpLogger.h:100
void eventAdd(const char *name, EVH *func, void *arg, double when, int weight, bool cbdata)
Definition: event.cc:107
class SquidConfig Config
Definition: SquidConfig.cc:12
~TcpLogger() override
Definition: TcpLogger.cc:60
static void Start(const Pointer &job)
Definition: AsyncJob.cc:37
void doConnect()
starts [re]connecting to the remote logger
Definition: TcpLogger.cc:241

 

Introduction

Documentation

Support

Miscellaneous