Go to the documentation of this file.
33 #include <sys/unistd.h>
51 typedef StrandCoords::iterator SI;
52 for (SI iter = strands_.begin(); iter != strands_.end(); ++iter) {
53 if (iter->kidId == kidId)
64 const String oldTag = found->tag;
69 strands_.push_back(strand);
73 typedef Searchers::iterator SRI;
74 for (SRI i = searchers.begin(); i != searchers.end();) {
75 if (i->tag == strand.
tag) {
76 notifySearcher(*i, strand);
77 i = searchers.erase(i);
88 debugs(54, 6,
"Registration request");
96 handleSearchRequest(sr);
101 debugs(54, 6,
"Shared listen request");
106 debugs(54, 6,
"Cache manager request");
108 handleCacheMgrRequest(req);
113 debugs(54, 6,
"Cache manager response");
115 handleCacheMgrResponse(
Mine(resp));
121 debugs(54, 6,
"SNMP request");
123 handleSnmpRequest(req);
128 debugs(54, 6,
"SNMP response");
130 handleSnmpResponse(
Mine(resp));
143 registerStrand(msg.
strand);
155 " needs shared listen FD for " << request.
params.
addr);
156 Listeners::const_iterator i = listeners.find(request.
params);
159 i->second : openListenSocket(request, errNo);
161 debugs(54, 3,
"sending shared listen " << c <<
" for " <<
163 " mapId=" << request.
mapId);
167 response.
pack(message);
180 }
catch (
const std::exception &ex) {
192 response.
pack(message);
208 typedef StrandCoords::const_iterator SCCI;
209 for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
210 if (i->tag == request.
tag)
215 notifySearcher(request, *strand);
219 searchers.push_back(request);
221 " who " << request.
tag <<
" is");
229 request.
tag <<
" is kid" << strand.
kidId);
244 response.
pack(message);
264 debugs(54, 6,
"opening listen FD at " << p.
addr <<
" for kid" <<
276 debugs(54, 6,
"tried listening on " << newConn <<
" for kid" <<
281 listeners[request.
params] = newConn;
288 typedef StrandCoords::const_iterator SCI;
289 for (SCI iter = strands_.begin(); iter != strands_.end(); ++iter) {
290 debugs(54, 5,
"signal " << sig <<
" to kid" << iter->kidId <<
291 ", PID=" << iter->pid);
292 kill(iter->pid, sig);
void receive(const TypedMsgHdr &message) override
@ mtStrandReady
an mtFindStrand answer: the strand exists and should be usable
RequestId requestId
matches the request[or] with the response
void handleSearchRequest(const StrandSearchRequest &request)
answers or queues the request if the answer is not yet known
StrandCoord strand
messageType-specific coordinates (e.g., sender)
int kidId
internal Squid process number
a request for a listen socket with given parameters
const char strandAddrLabel[]
strand's listening address unique label
int requestorId
sender-provided return address
int fdNote
index into fd_note() comment strings
@ mtStrandRegistered
acknowledges mtRegisterStrand acceptance
bool IsConnOpen(const Comm::ConnectionPointer &conn)
const Answer & Mine(const Answer &answer)
static CacheManager * GetInstance()
const char * FdNote(int fdNodeId)
converts FdNoteId into a string
"shared listen" is when concurrent processes are listening on the same fd
void broadcastSignal(int sig) const
sends signal sig to every registered strand
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note)
String tag
set when looking for a matching StrandCoord::tag
void pack(Ipc::TypedMsgHdr &msg) const override
prepare for sendmsg()
void handleSnmpResponse(const Snmp::Response &response)
void start() override
called by AsyncStart; do not call directly
void handleCacheMgrRequest(const Mgr::Request &request)
void pack(MessageType, TypedMsgHdr &) const
int requestorId
kidId of the requestor; used for response destination
StrandCoord * findStrand(int kidId)
the registered strand with the given kidId, or NULL if there is none
static Coordinator * TheInstance
the only class instance in existence
Waits for and receives incoming IPC messages; kids handle the messages.
virtual void receive(const TypedMsgHdr &)=0
void notifySearcher(const StrandSearchRequest &request, const StrandCoord &)
answer the waiting search request
void SendMessage(const String &toAddress, const TypedMsgHdr &message)
Mgr::Action::Pointer createRequestedAction(const Mgr::ActionParams &)
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator)
@ mtFindStrand
a worker requests a strand from Coordinator
asynchronous strand search request
void pack(TypedMsgHdr &hdrMsg) const
prepare for sendmsg()
Coordinates shared activities of Strands (Squid processes or threads)
OpenListenerParams params
actual comm_open_sharedListen() parameters
String tag
optional unique well-known key (e.g., cache_dir path)
ActionParams params
action name and parameters
void registerStrand(const StrandCoord &)
adds a new strand or updates an existing one
static Coordinator * Instance()
an IPC message carrying StrandCoord
@ mtRegisterStrand
notifies about our strand existence
void handleCacheMgrResponse(const Mgr::Response &response)
void start() override=0
called by AsyncStart; do not call directly
a response to SharedListenRequest
Ip::Address addr
will be memset and memcopied
QuestionerId qid
the sender of the request
const StrandCoords & strands() const
currently registered strands
static void HandleRemoteAck(const Response &response)
finds and calls the right Inquirer upon strand's response
std::vector< StrandCoord > StrandCoords
a collection of strand coordinates; the order, if any, is owner-dependent
struct msghdr with a known type, fixed-size I/O and control buffers
String actionName
action name (and credentials realm)
void handleRegistrationRequest(const StrandMessage &)
registers the strand, then acknowledges (ACKs) the registration
int xclose(int fd)
POSIX close(2) equivalent.
Comm::ConnectionPointer openListenSocket(const SharedListenRequest &request, int &errNo)
calls comm_open_listener()
RequestId mapId
to map future response to the requestor's callback
#define debugs(SECTION, LEVEL, CONTENT)
int requestorId
kidId of the requestor
void handleSnmpRequest(const Snmp::Request &request)
void handleSharedListenRequest(const SharedListenRequest &request)
returns cached socket or calls openListenSocket()
static void Start(const Pointer &job)
Comm::ConnectionPointer conn
HTTP client connection descriptor.