Queue.cc
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 /* DEBUG: section 54 Interprocess Communication */
10 
11 #include "squid.h"
12 #include "base/TextException.h"
13 #include "debug/Stream.h"
14 #include "globals.h"
15 #include "ipc/Queue.h"
16 
17 #include <limits>
18 
20 static String
22 {
23  id.append("__metadata");
24  return id;
25 }
26 
28 static String
30 {
31  id.append("__queues");
32  return id;
33 }
34 
36 static String
38 {
39  id.append("__readers");
40  return id;
41 }
42 
43 /* QueueReader */
44 
46 
47 Ipc::QueueReader::QueueReader(): popBlocked(false), popSignal(false),
48  rateLimit(0), balance(0)
49 {
50  debugs(54, 7, "constructed " << id);
51 }
52 
53 /* QueueReaders */
54 
55 Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
56  theReaders(theCapacity)
57 {
58  Must(theCapacity > 0);
59 }
60 
61 size_t
63 {
64  return SharedMemorySize(theCapacity);
65 }
66 
67 size_t
69 {
70  return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71 }
72 
73 // OneToOneUniQueue
74 
75 Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
76  theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77  theCapacity(aCapacity)
78 {
79  Must(theMaxItemSize > 0);
80  Must(theCapacity > 0);
81 }
82 
83 int
84 Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
85 {
86  assert(maxItemSize > 0);
87  size -= sizeof(OneToOneUniQueue);
88  return size >= 0 ? size / maxItemSize : 0;
89 }
90 
91 int
92 Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
93 {
94  assert(size >= 0);
95  return sizeof(OneToOneUniQueue) + maxItemSize * size;
96 }
97 
101 void
102 Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
103 {
104  os << "{ size: " << count <<
105  ", capacity: " << theCapacity <<
106  ", " << inLabel << ": " << theIn <<
107  ", " << outLabel << ": " << theOut;
108 }
109 
111 void
112 Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
113 {
114  os << "}\n";
115 }
116 
117 /* OneToOneUniQueues */
118 
119 Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
120 {
121  Must(theCapacity > 0);
122  for (int i = 0; i < theCapacity; ++i)
123  new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
124 }
125 
126 size_t
128 {
129  return sizeof(*this) + theCapacity * front().sharedMemorySize();
130 }
131 
132 size_t
133 Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
134 {
135  const int queueSize =
136  OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
137  return sizeof(OneToOneUniQueues) + queueSize * capacity;
138 }
139 
140 const Ipc::OneToOneUniQueue &
142 {
143  Must(0 <= index && index < theCapacity);
144  const size_t queueSize = index ? front().sharedMemorySize() : 0;
145  const char *const queue =
146  reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
147  return *reinterpret_cast<const OneToOneUniQueue *>(queue);
148 }
149 
150 // BaseMultiQueue
151 
152 Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
153  theLocalProcessId(aLocalProcessId),
154  theLastPopProcessId(std::numeric_limits<int>::max() - 1)
155 {
156 }
157 
158 void
159 Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
160 {
161  // Unused remoteProcessId may be useful for at least two optimizations:
162  // * TODO: After QueueReader::popSignal is moved to each OneToOneUniQueue,
163  // we could clear just the remoteProcessId popSignal, further reducing the
164  // number of UDS notifications writers have to send.
165  // * We could adjust theLastPopProcessId to try popping from the
166  // remoteProcessId queue first. That does not seem to help much and might
167  // introduce some bias, so we do not do that for now.
168  clearAllReaderSignals();
169 }
170 
171 void
173 {
174  QueueReader &reader = localReader();
175  debugs(54, 7, "reader: " << reader.id);
176  reader.clearSignal();
177 }
178 
180 Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
181 {
182  const QueueReader &r = remoteReader(remoteProcessId);
183  return r.balance;
184 }
185 
187 Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
188 {
189  const QueueReader &r = remoteReader(remoteProcessId);
190  return r.rateLimit;
191 }
192 
194 Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
195 {
196  const OneToOneUniQueue &queue =
197  const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
198  return const_cast<OneToOneUniQueue &>(queue);
199 }
200 
202 Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
203 {
204  const OneToOneUniQueue &queue =
205  const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
206  return const_cast<OneToOneUniQueue &>(queue);
207 }
208 
211 {
212  const QueueReader &reader =
213  const_cast<const BaseMultiQueue *>(this)->localReader();
214  return const_cast<QueueReader &>(reader);
215 }
216 
218 Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
219 {
220  const QueueReader &reader =
221  const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
222  return const_cast<QueueReader &>(reader);
223 }
224 
225 // FewToFewBiQueue
226 
228 Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
229 {
230  return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
231 }
232 
233 Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
234  BaseMultiQueue(aLocalProcessId),
235  metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
236  queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
237  readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
238  theLocalGroup(aLocalGroup)
239 {
240  Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
241  Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
242 
243  debugs(54, 7, "queue " << id << " reader: " << localReader().id);
244 }
245 
246 int
247 Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
248 {
249  return capacity * groupASize * groupBSize * 2;
250 }
251 
252 bool
253 Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
254 {
255  switch (group) {
256  case groupA:
257  return metadata->theGroupAIdOffset <= processId &&
258  processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
259  case groupB:
260  return metadata->theGroupBIdOffset <= processId &&
261  processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
262  }
263  return false;
264 }
265 
266 int
267 Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
268 {
269  Must(fromGroup != toGroup);
270  assert(validProcessId(fromGroup, fromProcessId));
271  assert(validProcessId(toGroup, toProcessId));
272  int index1;
273  int index2;
274  int offset;
275  if (fromGroup == groupA) {
276  index1 = fromProcessId - metadata->theGroupAIdOffset;
277  index2 = toProcessId - metadata->theGroupBIdOffset;
278  offset = 0;
279  } else {
280  index1 = toProcessId - metadata->theGroupAIdOffset;
281  index2 = fromProcessId - metadata->theGroupBIdOffset;
282  offset = metadata->theGroupASize * metadata->theGroupBSize;
283  }
284  const int index = offset + index1 * metadata->theGroupBSize + index2;
285  return index;
286 }
287 
288 const Ipc::OneToOneUniQueue &
289 Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
290 {
291  return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
292 }
293 
294 const Ipc::OneToOneUniQueue &
295 Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
296 {
297  return oneToOneQueue(remoteGroup(), remoteProcessId,
298  theLocalGroup, theLocalProcessId);
299 }
300 
301 const Ipc::OneToOneUniQueue &
302 Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
303 {
304  return oneToOneQueue(theLocalGroup, theLocalProcessId,
305  remoteGroup(), remoteProcessId);
306 }
307 
308 int
309 Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
310 {
311  Must(validProcessId(group, processId));
312  return group == groupA ?
313  processId - metadata->theGroupAIdOffset :
314  metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
315 }
316 
317 const Ipc::QueueReader &
319 {
320  return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
321 }
322 
323 const Ipc::QueueReader &
324 Ipc::FewToFewBiQueue::remoteReader(const int processId) const
325 {
326  return readers->theReaders[readerIndex(remoteGroup(), processId)];
327 }
328 
329 int
331 {
332  return theLocalGroup == groupA ? metadata->theGroupBSize :
333  metadata->theGroupASize;
334 }
335 
336 int
338 {
339  return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
340  metadata->theGroupAIdOffset;
341 }
342 
343 Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
344  theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
345  theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
346 {
347  Must(theGroupASize > 0);
348  Must(theGroupBSize > 0);
349 }
350 
351 Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
352  metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
353  queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
354  readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
355 {
356 }
357 
359 {
360  delete metadataOwner;
361  delete queuesOwner;
362  delete readersOwner;
363 }
364 
365 // MultiQueue
366 
368 Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
369 {
370  return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
371 }
372 
373 Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
374  BaseMultiQueue(localProcessId),
375  metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
376  queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
377  readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
378 {
379  Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
380  Must(readers->theCapacity == metadata->theProcessCount);
381 
382  debugs(54, 7, "queue " << id << " reader: " << localReader().id);
383 }
384 
385 bool
386 Ipc::MultiQueue::validProcessId(const int processId) const
387 {
388  return metadata->theProcessIdOffset <= processId &&
389  processId < metadata->theProcessIdOffset + metadata->theProcessCount;
390 }
391 
392 const Ipc::OneToOneUniQueue &
393 Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
394 {
395  assert(validProcessId(fromProcessId));
396  assert(validProcessId(toProcessId));
397  const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
398  const int toIndex = toProcessId - metadata->theProcessIdOffset;
399  const int index = fromIndex * metadata->theProcessCount + toIndex;
400  return (*queues)[index];
401 }
402 
403 const Ipc::QueueReader &
404 Ipc::MultiQueue::reader(const int processId) const
405 {
406  assert(validProcessId(processId));
407  const int index = processId - metadata->theProcessIdOffset;
408  return readers->theReaders[index];
409 }
410 
411 const Ipc::OneToOneUniQueue &
412 Ipc::MultiQueue::inQueue(const int remoteProcessId) const
413 {
414  return oneToOneQueue(remoteProcessId, theLocalProcessId);
415 }
416 
417 const Ipc::OneToOneUniQueue &
418 Ipc::MultiQueue::outQueue(const int remoteProcessId) const
419 {
420  return oneToOneQueue(theLocalProcessId, remoteProcessId);
421 }
422 
423 const Ipc::QueueReader &
425 {
426  return reader(theLocalProcessId);
427 }
428 
429 const Ipc::QueueReader &
430 Ipc::MultiQueue::remoteReader(const int processId) const
431 {
432  return reader(processId);
433 }
434 
435 int
437 {
438  return metadata->theProcessCount;
439 }
440 
441 int
443 {
444  return metadata->theProcessIdOffset;
445 }
446 
447 Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
448  theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
449 {
450  Must(theProcessCount > 0);
451 }
452 
453 Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
454  metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
455  queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
456  readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
457 {
458 }
459 
461 {
462  delete metadataOwner;
463  delete queuesOwner;
464  delete readersOwner;
465 }
466 
bool validProcessId(const int processId) const
Definition: Queue.cc:386
void clearAllReaderSignals()
clears all reader notifications received by the local process
Definition: Queue.cc:172
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:67
const QueueReader & localReader() const override
Definition: Queue.cc:424
const QueueReader & localReader() const override
Definition: Queue.cc:318
virtual const QueueReader & localReader() const =0
Definition: Queue.cc:210
static String ReadersId(String id)
constructs QueueReaders ID from parent queue ID
Definition: Queue.cc:37
const QueueReader & remoteReader(const int processId) const override
Definition: Queue.cc:324
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:58
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR")
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:233
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:295
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:139
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:447
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:418
const A & max(A const &lhs, A const &rhs)
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:64
const QueueReader & reader(const int processId) const
Definition: Queue.cc:404
int remotesCount() const override
Definition: Queue.cc:436
#define shm_new(Class)
Definition: Pointer.h:200
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:57
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:365
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
size_t sharedMemorySize() const
Definition: Queue.cc:127
shared array of QueueReaders
Definition: Queue.h:71
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:351
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:303
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:364
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:343
int size
Definition: ModDevPoll.cc:69
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:368
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
int remotesCount() const override
Definition: Queue.cc:330
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
void append(char const *buf, int len)
Definition: String.cc:130
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:50
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
const int theCapacity
Definition: Queue.h:78
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
const QueueReader & remoteReader(const int remoteProcessId) const override
Definition: Queue.cc:430
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
#define shm_old(Class)
Definition: Pointer.h:201
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:247
#define assert(EX)
Definition: assert.h:17
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:305
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:412
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:393
static String MetadataId(String id)
constructs Metadata ID from parent queue ID
Definition: Queue.cc:21
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:302
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
static String QueuesId(String id)
constructs one-to-one queues ID from parent queue ID
Definition: Queue.cc:29
const int theCapacity
Definition: Queue.h:160
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:187
Shared metadata for MultiQueue.
Definition: Queue.h:324
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:304
const int theProcessCount
Definition: Queue.h:329
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
AtomicSignedMsec Balance
Definition: Queue.h:62
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:289
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:228
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:180
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:250
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
#define Must(condition)
Definition: TextException.h:75
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
int remotesIdOffset() const override
Definition: Queue.cc:337
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:138
size_t sharedMemorySize() const
Definition: Queue.cc:62
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:253
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:373
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:309
int remotesIdOffset() const override
Definition: Queue.cc:442
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:267
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:453
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:366
shared array of OneToOneUniQueues
Definition: Queue.h:145
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
int unsigned int
Definition: stub_fd.cc:19

 

Introduction

Documentation

Support

Miscellaneous