Re: Max simultaneous connections limit on per-destination basis

From: Radu Rendec <>
Date: Fri, 04 Nov 2005 17:22:23 +0200

Ok, you're right. I don't know exactly what the list policies are and I
didn't want to spam with large attachments or pieces of code.

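I have included the two functions I modified at the end of this
message; I hope that's OK. My hook is destdbUpdateActiveRequests(): the
first argument is the destination host and the second is the offset
added to that destination's active-request counter (+1 in
clientReadRequest() when a request is accepted, -1 in httpRequestFree()
when it is freed).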



On Thu, 2005-11-03 at 23:19 +0100, Henrik Nordstrom wrote:
> On Thu, 3 Nov 2005, Radu Rendec wrote:
> > Is there any chance I could miss a new request or the end of a request?
> Nothing obvious, but it's always easier to comment on the code than
> descriptions of the same..
> Regards
> Henrik

/*
 * httpRequestFree - release the resources attached to a clientHttpRequest.
 *
 * data: the clientHttpRequest being released, passed as void *.
 *
 * Steps visible in this listing: populate the access-log record
 * (http->al), update per-client statistics with clientdbUpdate(), apply
 * -1 to the per-destination active-request counter, unregister from the
 * current and old store entries, and unlink the request from conn->chr
 * and from the ClientActiveRequests list.
 *
 * NOTE(review): this listing was pasted into an e-mail and has evidently
 * lost lines (braces, comment delimiters and some statements), so it does
 * not compile as shown. The comments below describe only what the
 * surviving lines show.
 */
static void
httpRequestFree(void *data)
    clientHttpRequest *http = data;
    clientHttpRequest **H;
    ConnStateData *conn = http->conn;
    StoreEntry *e;
    /* Cached copy of http->request; it is NULL-checked before use below. */
    request_t *request = http->request;
    MemObject *mem = NULL;
    debug(33, 3) ("httpRequestFree: %s\n", storeUrl(http->entry));
    if (!clientCheckTransferDone(http)) {
    /* NOTE(review): the next three lines look like the body of a block
     * comment whose opening and closing delimiters were lost in the paste. */
     * DW: this seems odd here, is it really needed? It causes
     * incomplete transfers to get logged with "000" status
     * code because http->entry becomes NULL.
    if ((e = http->entry)) {
        http->entry = NULL;
        storeUnregister(http->sc, e, http);
    /* NOTE(review): the statement guarded by this "if" is missing from the paste. */
    if (http->entry && http->entry->ping_status == PING_WAITING)
    assert(http->log_type < LOG_TYPE_MAX);
    if (http->entry)
    mem = http->entry->mem_obj;
    /* The log record is only filled in when bytes were sent or a log type was set. */
    if (http->out.size || http->log_type) {
    http->al.icp.opcode = ICP_INVALID;
    http->al.url = http->log_uri;
    debug(33, 9) ("httpRequestFree: al.url='%s'\n", http->al.url);
    /* Reply status and content type come from the store entry's MemObject, when present. */
    if (mem) {
        http->al.http.code = mem->reply->sline.status;
        http->al.http.content_type = strBuf(mem->reply->content_type);
    http->al.cache.caddr = conn->log_addr;
    http->al.cache.size = http->out.size;
    http->al.cache.code = http->log_type;
    http->al.cache.msec = tvSubMsec(http->start, current_time);
    /* Request-derived log fields are only set when a request_t is attached. */
    if (request) {
        Packer p;
        MemBuf mb;
        packerToMemInit(&p, &mb);
        httpHeaderPackInto(&request->header, &p);
        http->al.http.method = request->method;
        http->al.http.version = request->http_ver;
        http->al.headers.request = xstrdup(mb.buf);
        http->al.hier = request->hier;
        /* NOTE(review): two assignments to al.cache.ident follow; the "else"
         * that presumably separated them appears to be missing from the paste. */
        if (request->user_ident[0])
        http->al.cache.ident = request->user_ident;
        http->al.cache.ident = conn->ident;
    clientdbUpdate(conn->peer.sin_addr, http->log_type, PROTO_HTTP, http->out.size);
    /*
     * Per-destination hook: add -1 to the active-request counter.
     * NOTE(review): http->request is dereferenced unconditionally here, yet
     * the same pointer (cached in "request" above) is NULL-checked elsewhere
     * in this function -- confirm it can never be NULL on this path.
     * NOTE(review): the key here is request->original_host, whereas the +1
     * in clientReadRequest() is keyed on request->host -- confirm both
     * always name the same counter.
     */
    destdbUpdateActiveRequests(http->request->original_host, -1);
    if (http->acl_checklist)
    if (request)
    checkFailureRatio(request->err_type, http->al.hier.code);
    /* Detach from the current store entry, if there is one. */
    if ((e = http->entry)) {
    http->entry = NULL;
    storeUnregister(http->sc, e, http);
    http->sc = NULL;
    /* old_entry might still be set if we didn't yet get the reply
     * code in clientHandleIMSReply() */
    if ((e = http->old_entry)) {
    http->old_entry = NULL;
    storeUnregister(http->old_sc, e, http);
    http->old_sc = NULL;
    /* Walk conn->chr to find the link that points at this request, then
     * splice the request out; the asserts require that it is on the list. */
    assert(http != http->next);
    assert(http->conn->chr != NULL);
    H = &http->conn->chr;
    while (*H) {
    if (*H == http)
    H = &(*H)->next;
    assert(*H != NULL);
    *H = http->next;
    http->next = NULL;
    /* Remove this request from the ClientActiveRequests list. */
    dlinkDelete(&http->active, &ClientActiveRequests);

/*
 * clientReadRequest - read handler for a client connection.
 *
 * fd:   descriptor to read from.
 * data: the ConnStateData of the connection, passed as void *.
 *
 * Reads whatever is available into conn->in.buf, then loops over the
 * buffered data: each request that parseHttpRequest() accepts is appended
 * to conn->chr, validated, and counted against its destination via
 * destdbUpdateActiveRequests(); a partial request causes the input buffer
 * to be grown, up to Config.maxRequestHeaderSize.
 *
 * NOTE(review): this listing was pasted into an e-mail and has evidently
 * lost lines (braces, comment delimiters, "break"/"return" statements and
 * parts of some calls); it may also stop before the end of the function.
 * The comments below describe only what the surviving lines show.
 */
static void
clientReadRequest(int fd, void *data)
    ConnStateData *conn = data;
    int parser_return_code = 0;
    int k;
    request_t *request = NULL;
    int size;
    void *p;
    method_t method;
    clientHttpRequest *http = NULL;
    clientHttpRequest **H = NULL;
    char *prefix = NULL;
    ErrorState *err = NULL;
    fde *F = &fd_table[fd];
    /* Free space in the input buffer, keeping one byte back; the buffer is
     * NUL-terminated at conn->in.offset further down. */
    int len = conn->in.size - conn->in.offset - 1;
    debug(33, 4) ("clientReadRequest: FD %d: reading request...\n", fd);
    size = read(fd, conn->in.buf + conn->in.offset, len);
    if (size > 0) {
        fd_bytes(fd, size, FD_READ);
        kb_incr(&statCounter.client_http.kbytes_in, size);
    /* NOTE(review): the next five lines look like the body of a block
     * comment whose delimiters were lost in the paste. */
     * Don't reset the timeout value here. The timeout value will be
     * set to Config.Timeout.request by httpAccept() and
     * clientWriteComplete(), and should apply to the request as a
     * whole, not individual read() calls. Plus, it breaks our
     * lame half-close detection
    /* Re-register this function as the read handler for fd. */
    commSetSelect(fd, COMM_SELECT_READ, clientReadRequest, conn, 0);
    /* read() returned 0: the peer may have closed, or half-closed, the socket. */
    if (size == 0) {
        if (conn->chr == NULL) {
            /* no current or pending requests */
        } else if (!Config.onoff.half_closed_clients) {
            /* admin doesn't want to support half-closed client sockets */
        /* It might be half-closed, we can't tell */
        debug(33, 5) ("clientReadRequest: FD %d closed?\n", fd);
        F->flags.socket_eof = 1;
        conn->defer.until = squid_curtime + 1;
        fd_note(fd, "half-closed");
    } else if (size < 0) {
        if (!ignoreErrno(errno)) {
            debug(50, 2) ("clientReadRequest: FD %d: %s\n", fd, xstrerror());
        } else if (conn->in.offset == 0) {
            debug(50, 2) ("clientReadRequest: FD %d: no data to process (%s)\n", fd, xstrerror());
        /* Continue to process previously read data */
        size = 0;
    conn->in.offset += size;
    /* Skip leading (and trailing) whitespace */
    while (conn->in.offset > 0) {
        int nrequests;
        size_t req_line_sz;
        while (conn->in.offset > 0 && xisspace(conn->in.buf[0])) {
            xmemmove(conn->in.buf, conn->in.buf + 1, conn->in.offset - 1);
        conn->in.buf[conn->in.offset] = '\0'; /* Terminate the string */
        if (conn->in.offset == 0)
        /* Limit the number of concurrent requests to 2 */
        /* The empty-bodied loop counts the requests already queued on conn->chr. */
        for (H = &conn->chr, nrequests = 0; *H; H = &(*H)->next, nrequests++);
        if (nrequests >= (Config.onoff.pipeline_prefetch ? 2 : 1)) {
            debug(33, 3) ("clientReadRequest: FD %d max concurrent requests reached\n", fd);
            debug(33, 5) ("clientReadRequest: FD %d defering new request until one is done\n", fd);
            conn->defer.until = squid_curtime + 100; /* Reset when a request is complete */
        /* Process request */
        /* NOTE(review): the argument list of this call is cut off in the paste. */
        http = parseHttpRequest(conn,
        if (!http)
        if (http) {
            /* Consume the parsed request's bytes from the input buffer. */
            assert(http->req_sz > 0);
            conn->in.offset -= http->req_sz;
            assert(conn->in.offset >= 0);
            debug(33, 5) ("conn->in.offset = %d\n", (int) conn->in.offset);
             * If we read past the end of this request, move the remaining
             * data to the beginning
            if (conn->in.offset > 0)
                xmemmove(conn->in.buf, conn->in.buf + http->req_sz, conn->in.offset);
            /* add to the client request queue */
            for (H = &conn->chr; *H; H = &(*H)->next);
            *H = http;
             * I wanted to lock 'http' here since its callback data for
             * clientLifetimeTimeout(), but there's no logical place to
             * cbdataUnlock if the timeout never happens. Maybe its safe
             * enough to assume that if the FD is open, and the timeout
             * triggers, that 'http' is valid.
            commSetTimeout(fd, Config.Timeout.lifetime, clientLifetimeTimeout, http);
            /* Parser reported an invalid request: answer with an error entry. */
            if (parser_return_code < 0) {
                debug(33, 1) ("clientReadRequest: FD %d Invalid Request\n", fd);
                err = errorCon(ERR_INVALID_REQ, HTTP_BAD_REQUEST);
                err->request_hdrs = xstrdup(conn->in.buf);
                http->entry = clientCreateStoreEntry(http, method, null_request_flags);
                errorAppendEntry(http->entry, err);
            /* Build the request_t from the URI; a NULL result means the URL is invalid. */
            if ((request = urlParse(method, http->uri)) == NULL) {
                debug(33, 5) ("Invalid URL: %s\n", http->uri);
                err = errorCon(ERR_INVALID_URL, HTTP_BAD_REQUEST);
                err->src_addr = conn->peer.sin_addr;
                err->url = xstrdup(http->uri);
                http->al.http.code = err->http_status;
                http->entry = clientCreateStoreEntry(http, method, null_request_flags);
                errorAppendEntry(http->entry, err);
            } else {
                /* compile headers */
                /* we should skip request line! */
                if (!httpRequestParseHeader(request, prefix + req_line_sz))
                    debug(33, 1) ("Failed to parse request headers: %s\n%s\n",
                        http->uri, prefix);
                /* continue anyway? */
            request->flags.accelerated = http->flags.accel;
            /* Decide whether this request targets one of the proxy's internal URLs. */
            if (!http->flags.internal) {
                if (internalCheck(strBuf(request->urlpath))) {
                    if (internalHostnameIs(request->host) &&
                        request->port == ntohs(Config.Sockaddr.http->s.sin_port)) {
                        http->flags.internal = 1;
                    } else if (internalStaticCheck(strBuf(request->urlpath))) {
                        xstrncpy(request->host, internalHostname(), SQUIDHOSTNAMELEN);
                        request->port = ntohs(Config.Sockaddr.http->s.sin_port);
                        http->flags.internal = 1;
             * cache the Content-length value in request_t.
            /* NOTE(review): the argument list of this call is cut off in the paste. */
            request->content_length = httpHeaderGetInt(&request->header,
            request->flags.internal = http->flags.internal;
            http->log_uri = xstrdup(urlCanonicalClean(request));
            request->client_addr = conn->peer.sin_addr;
            request->my_addr = conn->me.sin_addr;
            request->my_port = ntohs(conn->me.sin_port);
            request->http_ver = http->http_ver;
            /* Request fails urlCheckRequest(): answer 501 Not Implemented. */
            if (!urlCheckRequest(request)) {
                err = errorCon(ERR_UNSUP_REQ, HTTP_NOT_IMPLEMENTED);
                err->src_addr = conn->peer.sin_addr;
                err->request = requestLink(request);
                request->flags.proxy_keepalive = 0;
                http->al.http.code = err->http_status;
                http->entry = clientCreateStoreEntry(http, request->method, null_request_flags);
                errorAppendEntry(http->entry, err);
            /* clientCheckContentLength() returned 0: answer 411 Length Required. */
            if (0 == clientCheckContentLength(request)) {
                err = errorCon(ERR_INVALID_REQ, HTTP_LENGTH_REQUIRED);
                err->src_addr = conn->peer.sin_addr;
                err->request = requestLink(request);
                http->al.http.code = err->http_status;
                http->entry = clientCreateStoreEntry(http, request->method, null_request_flags);
                errorAppendEntry(http->entry, err);
            /* From here on the request_t is attached to the clientHttpRequest. */
            http->request = requestLink(request);
             * We need to set the keepalive flag before doing some
             * hacks for POST/PUT requests below. Maybe we could
             * set keepalive flag even earlier.
             * break here if the request has a content-length
             * because there is a reqeust body following and we
             * don't want to parse it as though it was new request.
            if (request->content_length >= 0) {
                /* Copy whatever part of the body is already buffered into request->body. */
                int copy_len = XMIN(conn->in.offset, request->content_length);
                if (copy_len > 0) {
                    assert(conn->in.offset >= copy_len);
                    request->body_sz = copy_len;
                    request->body = xmalloc(request->body_sz);
                    xmemcpy(request->body, conn->in.buf, request->body_sz);
                    conn->in.offset -= copy_len;
                    if (conn->in.offset)
                        xmemmove(conn->in.buf, conn->in.buf + copy_len, conn->in.offset);
                 * if we didn't get the full body now, then more will
                 * be arriving on the client socket. Lets cancel
                 * the read handler until this request gets forwarded.
                if (request->body_sz < request->content_length)
                    commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
                if (request->content_length < 0)
                    (void) 0;
                else if (clientRequestBodyTooLarge(request->content_length)) {
                    err = errorCon(ERR_TOO_BIG, HTTP_REQUEST_ENTITY_TOO_LARGE);
                    err->request = requestLink(request);
                    http->entry = clientCreateStoreEntry(http,
                        METHOD_NONE, null_request_flags);
                    errorAppendEntry(http->entry, err);
                /*
                 * Per-destination hook: add +1 to the active-request counter
                 * keyed on request->host.
                 * NOTE(review): the error branches above queue the request on
                 * conn->chr and attach an error entry; if they leave the loop
                 * before reaching this line, httpRequestFree() would still
                 * apply its -1 without a matching +1. The lost "break" lines
                 * make this impossible to confirm from the paste.
                 */
                destdbUpdateActiveRequests(http->request->host, 1);
            continue; /* while offset > 0 */
        } else if (parser_return_code == 0) {
             * Partial request received; reschedule until parseHttpRequest()
             * is happy with the input
            /* k is the free space left in the input buffer (one byte kept back). */
            k = conn->in.size - 1 - conn->in.offset;
            if (k == 0) {
                if (conn->in.offset >= Config.maxRequestHeaderSize) {
                    /* The request is too large to handle */
                    debug(33, 1) ("Request header is too large (%d bytes)\n",
                        (int) conn->in.offset);
                    /* NOTE(review): the argument list of the next debug() call is cut off in the paste. */
                    debug(33, 1) ("Config 'request_header_max_size'= %d bytes.\n",
                    err = errorCon(ERR_TOO_BIG, HTTP_REQUEST_ENTITY_TOO_LARGE);
                    http = parseHttpRequestAbort(conn, "error:request-too-large");
                    /* add to the client request queue */
                    for (H = &conn->chr; *H; H = &(*H)->next);
                    *H = http;
                    http->entry = clientCreateStoreEntry(http, METHOD_NONE, null_request_flags);
                    errorAppendEntry(http->entry, err);
                /* Grow the request memory area to accomodate for a large request */
                conn->in.size += CLIENT_REQ_BUF_SZ;
                /* First growth: replace the MEM_CLIENT_REQ_BUF buffer with an
                 * xcalloc()ed one and copy the data over; later growths use
                 * xrealloc(). */
                if (conn->in.size == 2 * CLIENT_REQ_BUF_SZ) {
                    p = conn->in.buf; /* get rid of fixed size Pooled buffer */
                    conn->in.buf = xcalloc(2, CLIENT_REQ_BUF_SZ);
                    xmemcpy(conn->in.buf, p, CLIENT_REQ_BUF_SZ);
                    memFree(p, MEM_CLIENT_REQ_BUF);
                } else
                    conn->in.buf = xrealloc(conn->in.buf, conn->in.size);
                /* XXX account conn->in.buf */
                debug(33, 3) ("Handling a large request, offset=%d inbufsize=%d\n",
                    (int) conn->in.offset, conn->in.size);
                /* Recompute the free space after growing the buffer. */
                k = conn->in.size - 1 - conn->in.offset;
