RunQueue.h
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_

namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread
// (owner), operations on the back of the queue can be done by multiple
// threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: the owner and one
// remote thread (Size aside). The algorithm ensures that the occupied region
// of the underlying array is logically contiguous (it can wrap around, but
// there are no stray occupied elements). The owner operates on one end of
// this region, the remote thread operates on the other end. Synchronization
// between these threads (potential consumption of the last element and
// claiming of the last empty element) happens by means of a state variable in
// each element. The states are: empty, busy (insertion or removal in
// progress) and ready. Threads claim elements (empty->busy and ready->busy
// transitions) by means of a CAS operation. The finishing transitions
// (busy->empty and busy->ready) are done with a plain store, as the element
// is exclusively owned by the current thread.
//
// Note: we could permit only pointers as elements, then we would not need a
// separate state variable, as a null/non-null pointer value would serve as
// the state, but that would require a malloc/free per operation for large,
// complex values (and this is designed to store std::function<void()>).
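//
// Usage sketch (illustrative addition, not part of the original header): the
// owning worker thread uses the lock-free front of its queue, while other
// threads steal from the mutex-protected back. The element type and capacity
// below are assumptions chosen for the example.
//
//   Eigen::RunQueue<std::function<void()>, 1024> q;
//
//   // Owner thread only:
//   std::function<void()> overflow = q.PushFront([] { /* do work */ });
//   if (overflow) { /* queue was full; run or re-queue the task */ }
//   std::function<void()> task = q.PopFront();
//   if (task) task();
//
//   // Any other thread (work stealing):
//   std::function<void()> stolen = q.PopBack();
//   if (stolen) stolen();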
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_plain_assert((kSize & (kSize - 1)) == 0);
    eigen_plain_assert(kSize > 2);            // why would you do this?
    eigen_plain_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_plain_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed
  // Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue is empty, returns a default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed
  // Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // If the queue is empty, returns a default-constructed Work.
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns up to half of the last elements in the
  // queue. Returns the number of elements removed.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady || !e->state.compare_exchange_strong(
                               s, kBusy, std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store the temporary kBusy state; we exclusively
        // own these elements.
        eigen_plain_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }

  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const { return SizeOrNotEmpty<true>(); }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return SizeOrNotEmpty<false>() == 0; }

  // Flush deletes all elements from the queue.
  void Flush() {
    while (!Empty()) {
      PopFront();
    }
  }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // The low log(kSize) + 1 bits in front_ and back_ contain the rolling index
  // of the front/back, respectively. The remaining bits contain modification
  // counters that are incremented on Push operations. This allows us to
  // (1) distinguish between empty and full conditions (if we used log(kSize)
  // bits for the position, these conditions would be indistinguishable); and
  // (2) obtain a consistent snapshot of front_/back_ for the Size operation
  // using the modification counters.
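  // Worked example (added note, assuming kSize == 4): kMask == 3 and
  // kMask2 == 7, so the low 3 bits of front_/back_ hold a rolling index in
  // [0, 2*kSize) and bits 3 and up hold the modification counter. Storing
  // front + 1 + (kSize << 1), i.e. front + 9, advances the rolling index by
  // one and bumps the counter by one, while
  // ((front - 1) & kMask2) | (front & ~kMask2) steps the rolling index back
  // modulo 2*kSize and leaves the counter bits untouched.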
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  // SizeOrNotEmpty returns the current queue size; if NeedSizeEstimate is
  // false, only whether the size is 0 is guaranteed to be correct.
  // Can be called by any thread at any time.
  template <bool NeedSizeEstimate>
  unsigned SizeOrNotEmpty() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to not produce false positives (claiming a non-empty queue
    // as empty).
    unsigned front = front_.load(std::memory_order_acquire);
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) {
        front = front1;
        std::atomic_thread_fence(std::memory_order_acquire);
        continue;
      }
      if (NeedSizeEstimate) {
        return CalculateSize(front, back);
      } else {
        // This value will be 0 if the queue is empty, and undefined otherwise.
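        // (Added note: the low kMask2 bits of front_ and back_ are rolling
        // indices in [0, 2*kSize); they coincide exactly when the queue is
        // empty, so XOR-ing them and masking yields zero only in that case.)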
        unsigned maybe_zero = ((front ^ back) & kMask2);
        // The queue size estimate must agree with the maybe_zero check on the
        // queue empty/non-empty state.
        eigen_assert((CalculateSize(front, back) == 0) == (maybe_zero == 0));
        return maybe_zero;
      }
    }
  }

  EIGEN_ALWAYS_INLINE
  unsigned CalculateSize(unsigned front, unsigned back) const {
    int size = (front & kMask2) - (back & kMask2);
    // Fix overflow.
    if (size < 0) size += 2 * kSize;
    // The order of modification in push/pop is crafted to make the queue look
    // larger than it is during concurrent modifications. E.g. a push can
    // increment the size before the corresponding pop has decremented it.
    // So the computed size can be up to kSize + 1; clamp it.
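    // (Added illustration: with the queue full, a PopFront may have marked
    // its element kEmpty but not yet stored the decremented front_; if a
    // PushBack claims that freed slot and stores its decremented back_ first,
    // the front_/back_ distance momentarily covers kSize + 1 slots.)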
    if (size > static_cast<int>(kSize)) size = kSize;
    return static_cast<unsigned>(size);
  }

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_