46 typedef StrandCoords::iterator SI;
47 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
48 if (iter->kidId == kidId)
59 const String oldTag = found->tag;
64 strands_.push_back(strand);
68 typedef Searchers::iterator SRI;
69 for (SRI i = searchers.begin(); i != searchers.end();) {
70 if (i->tag == strand.
tag) {
71 notifySearcher(*i, strand);
72 i = searchers.erase(i);
83 debugs(54, 6,
"Registration request");
91 handleSearchRequest(sr);
96 debugs(54, 6,
"Shared listen request");
101 debugs(54, 6,
"Cache manager request");
103 handleCacheMgrRequest(req);
108 debugs(54, 6,
"Cache manager response");
110 handleCacheMgrResponse(
Mine(resp));
116 debugs(54, 6,
"SNMP request");
118 handleSnmpRequest(req);
123 debugs(54, 6,
"SNMP response");
125 handleSnmpResponse(
Mine(resp));
138 registerStrand(msg.
strand);
150 " needs shared listen FD for " << request.
params.
addr);
151 Listeners::const_iterator i = listeners.find(request.
params);
154 i->second : openListenSocket(request, errNo);
156 debugs(54, 3,
"sending shared listen " << c <<
" for " <<
158 " mapId=" << request.
mapId);
162 response.
pack(message);
175 }
catch (
const std::exception &ex) {
179 ::close(request.
conn->
fd);
187 response.
pack(message);
203 typedef StrandCoords::const_iterator SCCI;
204 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
205 if (i->tag == request.
tag)
210 notifySearcher(request, *strand);
214 searchers.push_back(request);
216 " who " << request.
tag <<
" is");
224 request.
tag <<
" is kid" << strand.
kidId);
239 response.
pack(message);
259 debugs(54, 6,
"opening listen FD at " << p.
addr <<
" for kid" <<
271 debugs(54, 6,
"tried listening on " << newConn <<
" for kid" <<
276 listeners[request.
params] = newConn;
283 typedef StrandCoords::const_iterator SCI;
284 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
285 debugs(54, 5,
"signal " << sig <<
" to kid" << iter->kidId <<
286 ", PID=" << iter->pid);
287 kill(iter->pid, sig);
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator)
static void Start(const Pointer &job)
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
static CacheManager * GetInstance()
Coordinates shared activities of Strands (Squid processes or threads)
StrandCoord * findStrand(int kidId)
registered strand or NULL
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
void receive(const TypedMsgHdr &message) override
void handleSnmpRequest(const Snmp::Request &request)
static Coordinator * TheInstance
the only class instance in existence
const StrandCoords & strands() const
currently registered strands
void handleCacheMgrRequest(const Mgr::Request &request)
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
void handleSnmpResponse(const Snmp::Response &response)
void start() override
called by AsyncStart; do not call directly
static Coordinator * Instance()
void registerStrand(const StrandCoord &)
adds or updates existing
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
void handleCacheMgrResponse(const Mgr::Response &response)
void handleRegistrationRequest(const StrandMessage &)
register,ACK
void broadcastSignal(int sig) const
send sig to registered strands
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
"shared listen" is when concurrent processes are listening on the same fd
int fdNote
index into fd_note() comment strings
Ip::Address addr
will be memset and memcopied
Waits for and receives incoming IPC messages; kids handle the messages.
void start() override=0
called by AsyncStart; do not call directly
virtual void receive(const TypedMsgHdr &)=0
int requestorId
kidId of the requestor; used for response destination
RequestId requestId
matches the request[or] with the response
a request for a listen socket with given parameters
OpenListenerParams params
actual comm_open_sharedListen() parameters
int requestorId
kidId of the requestor
RequestId mapId
to map future response to the requestor's callback
a response to SharedListenRequest
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
String tag
optional unique well-known key (e.g., cache_dir path)
int kidId
internal Squid process number
an IPC message carrying StrandCoord
void pack(MessageType, TypedMsgHdr &) const
StrandCoord strand
messageType-specific coordinates (e.g., sender)
asynchronous strand search request
int requestorId
sender-provided return address
String tag
set when looking for a matching StrandCoord::tag
QuestionerId qid
the sender of the request
struct msghdr with a known type, fixed-size I/O and control buffers
String actionName
action name (and credentials realm)
Comm::ConnectionPointer conn
HTTP client connection descriptor.
ActionParams params
action name and parameters
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
#define debugs(SECTION, LEVEL, CONTENT)
bool IsConnOpen(const Comm::ConnectionPointer &conn)
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
const char strandAddrLabel[]
strand's listening address unique label
const Answer & Mine(const Answer &answer)
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
@ mtRegisterStrand
notifies about our strand existence
@ mtStrandReady
an mtFindStrand answer: the strand exists and should be usable
@ mtFindStrand
a worker requests a strand from Coordinator
@ mtStrandRegistered
acknowledges mtRegisterStrand acceptance
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
static bool action(int fd, size_t metasize, const char *fn, const char *url, const SquidMetaList &meta)