Queue.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3  *
4  * Squid software is distributed under GPLv2+ license and includes
5  * contributions from numerous individuals and organizations.
6  * Please see the COPYING and CONTRIBUTORS files for details.
7  */
8 
9 #ifndef SQUID_SRC_IPC_QUEUE_H
10 #define SQUID_SRC_IPC_QUEUE_H
11 
12 #include "base/InstanceId.h"
13 #include "debug/Stream.h"
14 #include "ipc/mem/FlexibleArray.h"
15 #include "ipc/mem/Pointer.h"
16 #include "util.h"
17 
18 #include <algorithm>
19 #include <atomic>
20 
21 class String;
22 
23 namespace Ipc
24 {
25 
29 {
30 public:
31  QueueReader(); // the initial state is "blocked without a signal"
32 
34  bool blocked() const { return popBlocked.load(); }
35 
37  bool signaled() const { return popSignal.load(); }
38 
40  void block() { popBlocked.store(true); }
41 
43  void unblock() { popBlocked.store(false); }
44 
47  bool raiseSignal() { return blocked() && !popSignal.exchange(true); }
48 
50  void clearSignal() { unblock(); popSignal.store(false); }
51 
52 private:
53  std::atomic<bool> popBlocked;
54  std::atomic<bool> popSignal;
55 
56 public:
57  typedef std::atomic<int> Rate;
59 
60  // we need a signed atomic type because balance may get negative
61  typedef std::atomic<int> AtomicSignedMsec;
65 
68 };
69 
72 {
73 public:
74  QueueReaders(const int aCapacity);
75  size_t sharedMemorySize() const;
76  static size_t SharedMemorySize(const int capacity);
77 
78  const int theCapacity;
80 };
81 
93 {
94 public:
95  // pop() and push() exceptions; TODO: use TextException instead
96  class Full {};
97  class ItemTooLarge {};
98 
99  OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
100 
101  unsigned int maxItemSize() const { return theMaxItemSize; }
102  int size() const { return theSize; }
103  int capacity() const { return theCapacity; }
105 
106  bool empty() const { return !theSize; }
107  bool full() const { return theSize == theCapacity; }
108 
109  static int Bytes2Items(const unsigned int maxItemSize, int size);
110  static int Items2Bytes(const unsigned int maxItemSize, const int size);
111 
113  template<class Value> bool pop(Value &value, QueueReader *const reader = nullptr);
114 
116  template<class Value> bool push(const Value &value, QueueReader *const reader = nullptr);
117 
119  template<class Value> bool peek(Value &value) const;
120 
122  template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
124  template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
125 
126 private:
127  void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
128  void statClose(std::ostream &) const;
129  template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
130  template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
131 
132  // optimization: these non-std::atomic data members are in shared memory,
133  // but each is used only by one process (aside from obscured reporting)
134  unsigned int theIn;
135  unsigned int theOut;
136 
137  std::atomic<uint32_t> theSize;
138  const unsigned int theMaxItemSize;
139  const uint32_t theCapacity;
140 
141  char theBuffer[];
142 };
143 
146 {
147 public:
148  OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
149 
150  size_t sharedMemorySize() const;
151  static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
152 
153  const OneToOneUniQueue &operator [](const int index) const;
154  inline OneToOneUniQueue &operator [](const int index);
155 
156 private:
157  inline const OneToOneUniQueue &front() const;
158 
159 public:
160  const int theCapacity;
161 };
162 
168 {
169 public:
170  BaseMultiQueue(const int aLocalProcessId);
171  virtual ~BaseMultiQueue() {}
172 
174  void clearReaderSignal(const int remoteProcessId);
175 
177  void clearAllReaderSignals();
178 
180  template <class Value> bool pop(int &remoteProcessId, Value &value);
181 
183  template <class Value> bool push(const int remoteProcessId, const Value &value);
184 
186  template<class Value> bool peek(int &remoteProcessId, Value &value) const;
187 
189  template<class Value> void stat(std::ostream &) const;
190 
193 
195  const QueueReader::Balance &balance(const int remoteProcessId) const;
196 
199 
201  const QueueReader::Rate &rateLimit(const int remoteProcessId) const;
202 
204  int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); }
205 
207  int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); }
208 
209 protected:
211  virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0;
212  OneToOneUniQueue &inQueue(const int remoteProcessId);
213 
215  virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0;
216  OneToOneUniQueue &outQueue(const int remoteProcessId);
217 
218  virtual const QueueReader &localReader() const = 0;
220 
221  virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0;
222  QueueReader &remoteReader(const int remoteProcessId);
223 
224  virtual int remotesCount() const = 0;
225  virtual int remotesIdOffset() const = 0;
226 
227 protected:
228  const int theLocalProcessId;
229 
230 private:
232 };
233 
243 {
244 public:
247 
248 private:
250  struct Metadata {
251  Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
252  size_t sharedMemorySize() const { return sizeof(*this); }
253  static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
254 
255  const int theGroupASize;
256  const int theGroupAIdOffset;
257  const int theGroupBSize;
258  const int theGroupBIdOffset;
259  };
260 
261 public:
262  class Owner
263  {
264  public:
265  Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
266  ~Owner();
267 
268  private:
272  };
273 
274  static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
275 
276  enum Group { groupA = 0, groupB = 1 };
277  FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
278 
280  static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity);
281 
284  template<class Value> bool findOldest(const int remoteProcessId, Value &value) const;
285 
286 protected:
287  const OneToOneUniQueue &inQueue(const int remoteProcessId) const override;
288  const OneToOneUniQueue &outQueue(const int remoteProcessId) const override;
289  const QueueReader &localReader() const override;
290  const QueueReader &remoteReader(const int processId) const override;
291  int remotesCount() const override;
292  int remotesIdOffset() const override;
293 
294 private:
295  bool validProcessId(const Group group, const int processId) const;
296  int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
297  const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const;
298  int readerIndex(const Group group, const int processId) const;
299  Group localGroup() const { return theLocalGroup; }
300  Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
301 
302 private:
306 
308 };
309 
317 {
318 public:
321 
322 private:
324  struct Metadata {
325  Metadata(const int aProcessCount, const int aProcessIdOffset);
326  size_t sharedMemorySize() const { return sizeof(*this); }
327  static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); }
328 
329  const int theProcessCount;
331  };
332 
333 public:
334  class Owner
335  {
336  public:
337  Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
338  ~Owner();
339 
340  private:
344  };
345 
346  static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity);
347 
348  MultiQueue(const String &id, const int localProcessId);
349 
350 protected:
351  const OneToOneUniQueue &inQueue(const int remoteProcessId) const override;
352  const OneToOneUniQueue &outQueue(const int remoteProcessId) const override;
353  const QueueReader &localReader() const override;
354  const QueueReader &remoteReader(const int remoteProcessId) const override;
355  int remotesCount() const override;
356  int remotesIdOffset() const override;
357 
358 private:
359  bool validProcessId(const int processId) const;
360  const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const;
361  const QueueReader &reader(const int processId) const;
362 
363 private:
367 };
368 
369 // OneToOneUniQueue
370 
371 template <class Value>
372 bool
373 OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
374 {
375  if (sizeof(value) > theMaxItemSize)
376  throw ItemTooLarge();
377 
378  // A writer might push between the empty test and block() below, so we do
379  // not return false right after calling block(), but test again.
380  if (empty()) {
381  if (!reader)
382  return false;
383 
384  reader->block();
385  // A writer might push between the empty test and block() below,
386  // so we must test again as such a writer will not signal us.
387  if (empty())
388  return false;
389  }
390 
391  if (reader)
392  reader->unblock();
393 
394  const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
395  memcpy(&value, theBuffer + pos, sizeof(value));
396  --theSize;
397 
398  return true;
399 }
400 
401 template <class Value>
402 bool
403 OneToOneUniQueue::peek(Value &value) const
404 {
405  if (sizeof(value) > theMaxItemSize)
406  throw ItemTooLarge();
407 
408  if (empty())
409  return false;
410 
411  // the reader may pop() before we copy; making this method imprecise
412  const unsigned int pos = (theOut % theCapacity) * theMaxItemSize;
413  memcpy(&value, theBuffer + pos, sizeof(value));
414  return true;
415 }
416 
417 template <class Value>
418 bool
419 OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
420 {
421  if (sizeof(value) > theMaxItemSize)
422  throw ItemTooLarge();
423 
424  if (full())
425  throw Full();
426 
427  const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
428  memcpy(theBuffer + pos, &value, sizeof(value));
429  const bool wasEmpty = !theSize++;
430 
431  return wasEmpty && (!reader || reader->raiseSignal());
432 }
433 
434 template <class Value>
435 void
436 OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
437 {
438  os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
439  // Nobody can modify our theOut so, after capturing some valid theSize value
440  // in count, we can reliably report all [theOut, theOut+count) items that
441  // were queued at theSize capturing time. We will miss new items push()ed by
442  // the other side, but it is OK -- we report state at the capturing time.
443  const auto count = theSize.load();
444  statOpen(os, "other", "popIndex", count);
445  statSamples<Value>(os, theOut, count);
446  statClose(os);
447 }
448 
449 template <class Value>
450 void
451 OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
452 {
453  os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
454  // Nobody can modify our theIn so, after capturing some valid theSize value
455  // in count, we can reliably report all [theIn-count, theIn) items that were
456  // queued at theSize capturing time. We may report items already pop()ed by
457  // the other side, but that is OK because pop() does not modify items -- it
458  // only increments theOut.
459  const auto count = theSize.load();
460  statOpen(os, "pushIndex", "other", count);
461  statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
462  statClose(os);
463 }
464 
466 template <class Value>
467 void
468 OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
469 {
470  if (!count) {
471  os << " ";
472  return;
473  }
474 
475  os << ", items: [\n";
476  // report a few leading and trailing items, without repetitions
477  const auto sampleSize = std::min(3U, count); // leading (and max) sample
478  statRange<Value>(os, start, sampleSize);
479  if (sampleSize < count) { // the first sample did not show some items
480  // The `start` offset aside, the first sample reported all items
481  // below the sampleSize offset. The second sample needs to report
482  // the last sampleSize items (i.e. starting at count-sampleSize
483  // offset) except those already reported by the first sample.
484  const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
485  const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
486 
487  // but first we print a sample separator, unless there are no items
488  // between the samples or the separator hides the only unsampled item
489  const auto bothSamples = sampleSize + secondSampleSize;
490  if (bothSamples + 1U == count)
491  statRange<Value>(os, start + sampleSize, 1);
492  else if (count > bothSamples)
493  os << " # ... " << (count - bothSamples) << " items not shown ...\n";
494 
495  statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
496  }
497  os << " ]";
498 }
499 
501 template <class Value>
502 void
503 OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
504 {
505  assert(sizeof(Value) <= theMaxItemSize);
506  auto offset = start;
507  for (uint32_t i = 0; i < n; ++i) {
508  // XXX: Throughout this C++ header, these overflow wrapping tricks work
509  // only because theCapacity currently happens to be a power of 2 (e.g.,
510  // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
511  const auto pos = (offset++ % theCapacity) * theMaxItemSize;
512  Value value;
513  memcpy(&value, theBuffer + pos, sizeof(value));
514  os << " { ";
515  value.stat(os);
516  os << " },\n";
517  }
518 }
519 
520 // OneToOneUniQueues
521 
522 inline OneToOneUniQueue &
524 {
525  return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
526 }
527 
528 inline const OneToOneUniQueue &
530 {
531  const char *const queue =
532  reinterpret_cast<const char *>(this) + sizeof(*this);
533  return *reinterpret_cast<const OneToOneUniQueue *>(queue);
534 }
535 
536 // BaseMultiQueue
537 
538 template <class Value>
539 bool
540 BaseMultiQueue::pop(int &remoteProcessId, Value &value)
541 {
542  // iterate all remote processes, starting after the one we visited last
543  for (int i = 0; i < remotesCount(); ++i) {
547  if (queue.pop(value, &localReader())) {
548  remoteProcessId = theLastPopProcessId;
549  debugs(54, 7, "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
550  return true;
551  }
552  }
553  return false; // no process had anything to pop
554 }
555 
556 template <class Value>
557 bool
558 BaseMultiQueue::push(const int remoteProcessId, const Value &value)
559 {
560  OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId);
561  QueueReader &reader = remoteReader(remoteProcessId);
562  debugs(54, 7, "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
563  return remoteQueue.push(value, &reader);
564 }
565 
566 template <class Value>
567 bool
568 BaseMultiQueue::peek(int &remoteProcessId, Value &value) const
569 {
570  // mimic FewToFewBiQueue::pop() but quit just before popping
571  int popProcessId = theLastPopProcessId; // preserve for future pop()
572  for (int i = 0; i < remotesCount(); ++i) {
573  if (++popProcessId >= remotesIdOffset() + remotesCount())
574  popProcessId = remotesIdOffset();
575  const OneToOneUniQueue &queue = inQueue(popProcessId);
576  if (queue.peek(value)) {
577  remoteProcessId = popProcessId;
578  return true;
579  }
580  }
581  return false; // most likely, no process had anything to pop
582 }
583 
584 template <class Value>
585 void
586 BaseMultiQueue::stat(std::ostream &os) const
587 {
588  for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
589  const auto &queue = inQueue(processId);
590  queue.statIn<Value>(os, theLocalProcessId, processId);
591  }
592 
593  os << "\n";
594 
595  for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
596  const auto &queue = outQueue(processId);
597  queue.statOut<Value>(os, theLocalProcessId, processId);
598  }
599 
600  os << "\n";
601 
602  const auto &reader = localReader();
603  os << " kid" << theLocalProcessId << " reader flags: " <<
604  "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n";
605 }
606 
607 // FewToFewBiQueue
608 
609 template <class Value>
610 bool
611 FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const
612 {
613  // we may be called before remote process configured its queue end
614  if (!validProcessId(remoteGroup(), remoteProcessId))
615  return false;
616 
617  // we need the oldest value, so start with the incoming, them-to-us queue:
618  const OneToOneUniQueue &in = inQueue(remoteProcessId);
619  debugs(54, 2, "peeking from " << remoteProcessId << " to " <<
620  theLocalProcessId << " at " << in.size());
621  if (in.peek(value))
622  return true;
623 
624  // if the incoming queue is empty, check the outgoing, us-to-them queue:
625  const OneToOneUniQueue &out = outQueue(remoteProcessId);
626  debugs(54, 2, "peeking from " << theLocalProcessId << " to " <<
627  remoteProcessId << " at " << out.size());
628  return out.peek(value);
629 }
630 
631 } // namespace Ipc
632 
633 #endif /* SQUID_SRC_IPC_QUEUE_H */
634 
bool validProcessId(const int processId) const
Definition: Queue.cc:386
void clearAllReaderSignals()
clears all reader notifications received by the local process
Definition: Queue.cc:172
bool signaled() const
whether writer has sent and reader has not received notification
Definition: Queue.h:37
int sharedMemorySize() const
Definition: Queue.h:104
virtual int remotesIdOffset() const =0
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:67
std::atomic< uint32_t > theSize
number of items in the queue
Definition: Queue.h:137
const QueueReader & localReader() const override
Definition: Queue.cc:424
const QueueReader & localReader() const override
Definition: Queue.cc:318
Group remoteGroup() const
Definition: Queue.h:300
std::atomic< int > AtomicSignedMsec
Definition: Queue.h:61
virtual const QueueReader & localReader() const =0
Definition: Queue.cc:210
const OneToOneUniQueue & front() const
Definition: Queue.h:529
const int theProcessIdOffset
Definition: Queue.h:330
void stat(std::ostream &) const
prints current state; suitable for cache manager reports
Definition: Queue.h:586
virtual ~BaseMultiQueue()
Definition: Queue.h:171
const QueueReader & remoteReader(const int processId) const override
Definition: Queue.cc:324
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:58
std::atomic< bool > popBlocked
whether the reader is blocked on pop()
Definition: Queue.h:53
QueueReader::Balance & localBalance()
returns local reader's balance
Definition: Queue.h:192
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:343
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:233
void statSamples(std::ostream &, unsigned int start, uint32_t size) const
report a sample of [start, start + size) items
Definition: Queue.h:468
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:342
bool push(const int remoteProcessId, const Value &value)
calls OneToOneUniQueue::push() using the given process queue
Definition: Queue.h:558
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:295
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:139
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:447
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:418
bool raiseSignal()
Definition: Queue.h:47
const A & max(A const &lhs, A const &rhs)
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:64
const QueueReader & reader(const int processId) const
Definition: Queue.cc:404
int theLastPopProcessId
the ID of the last process we tried to pop() from
Definition: Queue.h:231
size_t sharedMemorySize() const
Definition: Queue.h:252
int remotesCount() const override
Definition: Queue.cc:436
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:57
QueueReader::Rate & localRateLimit()
returns local reader's rate limit
Definition: Queue.h:198
OneToOneUniQueue::Full Full
Definition: Queue.h:245
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:320
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:365
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
OneToOneUniQueue::Full Full
Definition: Queue.h:319
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
size_t sharedMemorySize() const
Definition: Queue.cc:127
shared array of QueueReaders
Definition: Queue.h:71
static size_t SharedMemorySize(const int, const int, const int, const int)
Definition: Queue.h:253
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:351
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:303
bool peek(Value &value) const
returns true iff the value was set; the value may be stale!
Definition: Queue.h:403
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:364
int capacity() const
Definition: Queue.h:103
void statOut(std::ostream &, int localProcessId, int remoteProcessId) const
prints outgoing queue state; suitable for cache manager reports
Definition: Queue.h:451
void unblock()
removes the block() effects
Definition: Queue.h:43
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:343
size_t sharedMemorySize() const
Definition: Queue.h:326
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:368
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
int inSize(const int remoteProcessId) const
number of items in incoming queue from a given remote process
Definition: Queue.h:204
int remotesCount() const override
Definition: Queue.cc:330
bool findOldest(const int remoteProcessId, Value &value) const
Definition: Queue.h:611
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:50
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:269
virtual int remotesCount() const =0
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
const int theCapacity
Definition: Queue.h:78
bool pop(int &remoteProcessId, Value &value)
picks a process and calls OneToOneUniQueue::pop() using its queue
Definition: Queue.h:540
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
const QueueReader & remoteReader(const int remoteProcessId) const override
Definition: Queue.cc:430
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
static size_t SharedMemorySize(const int, const int)
Definition: Queue.h:327
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:247
Mem::Owner< QueueReaders > *const readersOwner
Definition: Queue.h:271
Ipc::Mem::FlexibleArray< QueueReader > theReaders
number of readers
Definition: Queue.h:79
#define assert(EX)
Definition: assert.h:17
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:305
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:412
void block()
marks the reader as blocked, waiting for a notification signal
Definition: Queue.h:40
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:393
bool empty() const
Definition: Queue.h:106
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:302
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
bool peek(int &remoteProcessId, Value &value) const
peeks at the item likely to be pop()ed next
Definition: Queue.h:568
const int theCapacity
Definition: Queue.h:160
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:187
Shared metadata for MultiQueue.
Definition: Queue.h:324
const Mem::Pointer< OneToOneUniQueues > queues
unidirection one-to-one queues
Definition: Queue.h:304
void statIn(std::ostream &, int localProcessId, int remoteProcessId) const
prints incoming queue state; suitable for cache manager reports
Definition: Queue.h:436
std::atomic< bool > popSignal
whether writer has sent and reader has not received notification
Definition: Queue.h:54
const int theProcessCount
Definition: Queue.h:329
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
AtomicSignedMsec Balance
Definition: Queue.h:62
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:289
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:228
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:180
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:250
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
Mem::Owner< OneToOneUniQueues > *const queuesOwner
Definition: Queue.h:270
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
int remotesIdOffset() const override
Definition: Queue.cc:337
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:138
size_t sharedMemorySize() const
Definition: Queue.cc:62
unsigned int theIn
current push() position; reporting aside, used only in push()
Definition: Queue.h:134
OneToOneUniQueue::ItemTooLarge ItemTooLarge
Definition: Queue.h:246
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:253
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:373
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:309
int remotesIdOffset() const override
Definition: Queue.cc:442
int outSize(const int remoteProcessId) const
number of items in outgoing queue to a given remote process
Definition: Queue.h:207
bool push(const Value &value, QueueReader *const reader=nullptr)
returns true iff the caller must notify the reader of the pushed item
Definition: Queue.h:419
bool pop(Value &value, QueueReader *const reader=nullptr)
returns true iff the value was set; [un]blocks the reader as needed
Definition: Queue.h:373
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:267
const int theLocalProcessId
process ID of this queue
Definition: Queue.h:228
int size() const
Definition: Queue.h:102
bool blocked() const
whether the reader is waiting for a notification signal
Definition: Queue.h:34
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:453
const Group theLocalGroup
group of this queue
Definition: Queue.h:307
unsigned int theOut
current pop() position; reporting aside, used only in pop()/peek()
Definition: Queue.h:135
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:192
const A & min(A const &lhs, A const &rhs)
void statRange(std::ostream &, unsigned int start, uint32_t n) const
statSamples() helper that reports n items from start
Definition: Queue.h:503
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:366
shared array of OneToOneUniQueues
Definition: Queue.h:145
unsigned int maxItemSize() const
Definition: Queue.h:101
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
Mem::Owner< Metadata > *const metadataOwner
Definition: Queue.h:341
Definition: IpcIoFile.h:23
bool full() const
Definition: Queue.h:107
Group localGroup() const
Definition: Queue.h:299

 

Introduction

Documentation

Support

Miscellaneous