PMDK C++ bindings 1.13.0
This is the C++ bindings documentation for PMDK's libpmemobj.
ringbuf.hpp
/*
 * Copyright (c) 2016 Mindaugas Rasiukevicius <rmind at noxt eu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021, Intel Corporation */

#ifndef RINGBUF_HPP
#define RINGBUF_HPP

#include <cstddef>

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <stdexcept>

#include <libpmemobj++/detail/atomic_backoff.hpp>

#ifdef _WIN32
#define __predict_false(x) (x)
#else
#define __predict_false(x) __builtin_expect((x) != 0, 0)
#endif /* _WIN32 */

namespace pmem
{

namespace obj
{

namespace experimental
{

namespace ringbuf
{
/*
 * Offsets are kept in a single 64-bit word: the lower 32 bits hold the
 * byte offset into the buffer, bits 32..62 hold the wrap-around counter
 * and the top bit is a lock flag set while a wrap-around is in progress.
 */
static constexpr size_t RBUF_OFF_MASK = 0x00000000ffffffffUL;
static constexpr size_t WRAP_LOCK_BIT = 0x8000000000000000UL;
static constexpr size_t RBUF_OFF_MAX = UINT64_MAX & ~WRAP_LOCK_BIT;

static constexpr size_t WRAP_COUNTER = 0x7fffffff00000000UL;
static size_t
WRAP_INCR(size_t x)
{
	return ((x + 0x100000000UL) & WRAP_COUNTER);
}

typedef uint64_t ringbuf_off_t;

/* Per-producer state: the last 'seen' offset and a registration flag. */
struct ringbuf_worker_t {
	std::atomic<ringbuf_off_t> seen_off;
	std::atomic<int> registered;

	ringbuf_worker_t()
	{
		seen_off.store(0);
		registered.store(0);
	}
};

struct ringbuf_t {
	/* Ring buffer space. */
	size_t space;

	/*
	 * The NEXT hand is atomically updated by the producer.
	 * WRAP_LOCK_BIT is set in case of wrap-around; in such a case,
	 * the producer can update the 'end' offset.
	 */
	std::atomic<ringbuf_off_t> next;
	std::atomic<ringbuf_off_t> end;

	/* The following are updated by the consumer. */
	std::atomic<ringbuf_off_t> written;
	unsigned nworkers;
	std::unique_ptr<ringbuf_worker_t[]> workers;

	/* Set by ringbuf_consume, reset by ringbuf_release. */
	bool consume_in_progress;

	/*
	 * Initialize a new ring buffer of 'length' bytes for up to
	 * 'max_workers' producers.
	 */
	ringbuf_t(size_t max_workers, size_t length)
	    : workers(new ringbuf_worker_t[max_workers])
	{
		if (length >= RBUF_OFF_MASK)
			throw std::out_of_range("ringbuf length too big");

		written.store(0);
		next.store(0);
		end.store(0);
		space = length;
		end = RBUF_OFF_MAX;
		nworkers = max_workers;
		consume_in_progress = false;

		/* Helgrind/Drd does not understand std::atomic */
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
		VALGRIND_HG_DISABLE_CHECKING(&next, sizeof(next));
		VALGRIND_HG_DISABLE_CHECKING(&end, sizeof(end));
		VALGRIND_HG_DISABLE_CHECKING(&written, sizeof(written));

		for (size_t i = 0; i < max_workers; i++) {
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].seen_off,
				sizeof(workers[i].seen_off));
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].registered,
				sizeof(workers[i].registered));
		}
#endif
	}
};
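
/*
 * Illustrative sketch (not part of the original header): how a single
 * 64-bit offset word decomposes under the masks defined above; the
 * sample value is arbitrary and chosen only for the example.
 *
 *	value = 0x0000000300000010
 *	value & RBUF_OFF_MASK == 0x10          (byte offset into the buffer)
 *	value & WRAP_COUNTER  == 0x300000000   (three wrap-arounds so far)
 *	value & WRAP_LOCK_BIT == 0             (the offset is stable)
 *	WRAP_INCR(value)      == 0x400000000   (counter bumped, offset masked off)
 */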

/*
 * ringbuf_register: register the worker (thread/process) as a producer
 * and return the pointer to its local store.
 */
inline ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
	ringbuf_worker_t *w = &rbuf->workers[i];

	w->seen_off = RBUF_OFF_MAX;
	std::atomic_store_explicit<int>(&w->registered, true,
					std::memory_order_release);
	return w;
}

/*
 * ringbuf_unregister: unregister the worker.
 */
inline void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	w->registered = false;
	(void)rbuf;
}

/*
 * stable_nextoff: capture and return a stable value of the 'next' offset.
 */
static inline ringbuf_off_t
stable_nextoff(ringbuf_t *rbuf)
{
	ringbuf_off_t next;
	for (pmem::detail::atomic_backoff backoff;;) {
		next = std::atomic_load_explicit<ringbuf_off_t>(
			&rbuf->next, std::memory_order_acquire);
		if (next & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	assert((next & RBUF_OFF_MASK) < rbuf->space);
	return next;
}

/*
 * stable_seenoff: capture and return a stable value of the 'seen' offset.
 */
static inline ringbuf_off_t
stable_seenoff(ringbuf_worker_t *w)
{
	ringbuf_off_t seen_off;
	for (pmem::detail::atomic_backoff backoff;;) {
		seen_off = std::atomic_load_explicit<ringbuf_off_t>(
			&w->seen_off, std::memory_order_acquire);
		if (seen_off & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	return seen_off;
}

/*
 * ringbuf_acquire: request a space of a given length in the ring buffer.
 *
 * => On success: returns the offset at which the space is available.
 * => On failure: returns -1.
 */
inline ptrdiff_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
{
	ringbuf_off_t seen, next, target;

	assert(len > 0 && len <= rbuf->space);
	assert(w->seen_off == RBUF_OFF_MAX);

	do {
		ringbuf_off_t written;

		/*
		 * Get the stable 'next' offset. Save the observed 'next'
		 * value (i.e. the 'seen' offset), but mark the value as
		 * unstable (set WRAP_LOCK_BIT).
		 *
		 * Note: CAS will issue a memory_order_release for us and
		 * thus ensures that it reaches global visibility together
		 * with new 'next'.
		 */
		seen = stable_nextoff(rbuf);
		next = seen & RBUF_OFF_MASK;
		assert(next < rbuf->space);
		std::atomic_store_explicit<ringbuf_off_t>(
			&w->seen_off, next | WRAP_LOCK_BIT,
			std::memory_order_relaxed);

		/*
		 * Compute the target offset. Key invariant: we cannot
		 * go beyond the WRITTEN offset or catch up with it.
		 */
		target = next + len;
		written = rbuf->written;
		if (__predict_false(next < written && target >= written)) {
			/* The producer must wait. */
			std::atomic_store_explicit<ringbuf_off_t>(
				&w->seen_off, RBUF_OFF_MAX,
				std::memory_order_release);
			return -1;
		}

		if (__predict_false(target >= rbuf->space)) {
			const bool exceed = target > rbuf->space;

			/*
			 * Wrap-around and start from the beginning.
			 *
			 * If we would exceed the buffer, then attempt to
			 * acquire the WRAP_LOCK_BIT and use the space in
			 * the beginning. If we used all space exactly to
			 * the end, then reset to 0.
			 *
			 * Check the invariant again.
			 */
			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
			if ((target & RBUF_OFF_MASK) >= written) {
				std::atomic_store_explicit<ringbuf_off_t>(
					&w->seen_off, RBUF_OFF_MAX,
					std::memory_order_release);
				return -1;
			}
			/* Increment the wrap-around counter. */
			target |= WRAP_INCR(seen & WRAP_COUNTER);
		} else {
			/* Preserve the wrap-around counter. */
			target |= seen & WRAP_COUNTER;
		}
	} while (!std::atomic_compare_exchange_weak<ringbuf_off_t>(
		&rbuf->next, &seen, target));

	/*
	 * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value
	 * thus indicating that it is stable now.
	 *
	 * No need for memory_order_release, since CAS issued a fence.
	 */
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off,
						  w->seen_off & ~WRAP_LOCK_BIT,
						  std::memory_order_relaxed);

	/*
	 * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
	 * the remaining space and need to wrap-around), then save the
	 * 'end' offset and release the lock.
	 */
	if (__predict_false(target & WRAP_LOCK_BIT)) {
		/* Cannot wrap-around again if consumer did not catch-up. */
		assert(rbuf->written <= next);
		assert(rbuf->end == RBUF_OFF_MAX);
		rbuf->end = next;
		next = 0;

		/*
		 * Unlock: ensure the 'end' offset reaches global
		 * visibility before the lock is released.
		 */
		std::atomic_store_explicit<ringbuf_off_t>(
			&rbuf->next, (target & ~WRAP_LOCK_BIT),
			std::memory_order_release);
	}
	assert((target & RBUF_OFF_MASK) <= rbuf->space);
	return (ptrdiff_t)next;
}
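
/*
 * Illustrative producer-side sketch (not part of the original header).
 * 'data' is an assumed byte array of rbuf->space bytes shared with the
 * consumer, and 'payload'/'len' are an assumed message. A return value
 * of -1 means the consumer has not yet released enough space.
 *
 *	ptrdiff_t off = ringbuf_acquire(rbuf, w, len);
 *	if (off >= 0) {
 *		memcpy(&data[off], payload, len);
 *		ringbuf_produce(rbuf, w);
 *	}
 */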

/*
 * ringbuf_produce: indicate the acquired range in the buffer is produced
 * and is ready to be consumed.
 */
inline void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	(void)rbuf;
	assert(w->registered);
	assert(w->seen_off != RBUF_OFF_MAX);
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off, RBUF_OFF_MAX,
						  std::memory_order_release);
}

/*
 * ringbuf_consume: get a contiguous range which is ready to be consumed.
 *
 * Nested consumes are not allowed.
 */
inline size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
	assert(!rbuf->consume_in_progress);

	ringbuf_off_t written = rbuf->written, next, ready;
	size_t towrite;
retry:
	/*
	 * Get the stable 'next' offset. Note: stable_nextoff() issued
	 * a load memory barrier. The area between the 'written' offset
	 * and the 'next' offset will be the *preliminary* target buffer
	 * area to be consumed.
	 */
	next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
	if (written == next) {
		/* If producers did not advance, then nothing to do. */
		return 0;
	}

	/*
	 * Observe the 'ready' offset of each producer.
	 *
	 * At this point, some producer might have already triggered the
	 * wrap-around and some (or all) seen 'ready' values might be in
	 * the range between 0 and 'written'. We have to skip them.
	 */
	ready = RBUF_OFF_MAX;

	for (unsigned i = 0; i < rbuf->nworkers; i++) {
		ringbuf_worker_t *w = &rbuf->workers[i];
		ringbuf_off_t seen_off;

		/*
		 * Skip if the worker has not registered.
		 *
		 * Get a stable 'seen' value. This is necessary since we
		 * want to discard the stale 'seen' values.
		 */
		if (!std::atomic_load_explicit<int>(&w->registered,
						    std::memory_order_relaxed))
			continue;
		seen_off = stable_seenoff(w);

		/*
		 * Ignore the offsets after the possible wrap-around.
		 * We are interested in the smallest seen offset that is
		 * not behind the 'written' offset.
		 */
		if (seen_off >= written) {
			ready = std::min<ringbuf_off_t>(seen_off, ready);
		}
		assert(ready >= written);
	}

	/*
	 * Finally, we need to determine whether wrap-around occurred
	 * and deduce the safe 'ready' offset.
	 */
	if (next < written) {
		const ringbuf_off_t end =
			std::min<ringbuf_off_t>(rbuf->space, rbuf->end);

		/*
		 * Wrap-around case. Check for the cut off first.
		 *
		 * Reset the 'written' offset if it reached the end of
		 * the buffer or the 'end' offset (if set by a producer).
		 * However, we must check that the producer is actually
		 * done (the observed 'ready' offsets are clear).
		 */
		if (ready == RBUF_OFF_MAX && written == end) {
			/*
			 * Clear the 'end' offset if it was set.
			 */
			if (rbuf->end != RBUF_OFF_MAX) {
				rbuf->end = RBUF_OFF_MAX;
			}

			/*
			 * Wrap-around the consumer and start from zero.
			 */
			written = 0;
			std::atomic_store_explicit<ringbuf_off_t>(
				&rbuf->written, written,
				std::memory_order_release);
			goto retry;
		}

		/*
		 * We cannot wrap-around yet; there is data to consume at
		 * the end. The ready range is the smaller of the observed
		 * 'ready' or the 'end' offset. If neither is set, then
		 * the actual end of the buffer.
		 */
		assert(ready > next);
		ready = std::min<ringbuf_off_t>(ready, end);
		assert(ready >= written);
	} else {
		/*
		 * Regular case. Up to the observed 'ready' (if set)
		 * or the 'next' offset.
		 */
		ready = std::min<ringbuf_off_t>(ready, next);
	}
	towrite = ready - written;
	*offset = written;

	assert(ready >= written);
	assert(towrite <= rbuf->space);

	if (towrite)
		rbuf->consume_in_progress = true;

	return towrite;
}
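
/*
 * Illustrative consumer-side sketch (not part of the original header).
 * 'data' is the same assumed byte array the producers copy into, and
 * 'process' is a hypothetical handler. The consumed range must be
 * released before the next call to ringbuf_consume().
 *
 *	size_t offset;
 *	size_t nbytes = ringbuf_consume(rbuf, &offset);
 *	if (nbytes > 0) {
 *		process(&data[offset], nbytes);
 *		ringbuf_release(rbuf, nbytes);
 *	}
 */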

/*
 * ringbuf_release: indicate that the consumed range can now be released.
 */
inline void
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
{
	rbuf->consume_in_progress = false;

	const size_t nwritten = rbuf->written + nbytes;

	assert(rbuf->written <= rbuf->space);
	assert(rbuf->written <= rbuf->end);
	assert(nwritten <= rbuf->space);

	rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
}

} /* namespace ringbuf */
} /* namespace experimental */
} /* namespace obj */
} /* namespace pmem */

#endif /* RINGBUF_HPP */
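
The header exposes a single-consumer, multi-producer byte ring buffer: producers register once, then acquire space, copy their payload and mark it produced; the consumer periodically fetches the largest contiguous ready range and releases it when done. Below is a minimal, single-threaded sketch of that flow. It is not taken from the PMDK sources: the 4 KiB size, the backing data array, the message and the include path libpmemobj++/detail/ringbuf.hpp are assumptions made for illustration only.

/* Minimal usage sketch (hypothetical file: ringbuf_example.cpp). */
#include <cstring>
#include <iostream>

#include <libpmemobj++/detail/ringbuf.hpp> /* assumed install path */

int main()
{
	using namespace pmem::obj::experimental::ringbuf;

	constexpr size_t SPACE = 4096; /* assumed buffer size */
	ringbuf_t rb(1, SPACE);        /* bookkeeping for 1 producer */
	char data[SPACE];              /* the actual payload storage */

	/* Register the only producer (worker slot 0). */
	ringbuf_worker_t *w = ringbuf_register(&rb, 0);

	/* Producer side: acquire space, copy the payload, publish it. */
	const char msg[] = "hello";
	ptrdiff_t off = ringbuf_acquire(&rb, w, sizeof(msg));
	if (off >= 0) {
		std::memcpy(&data[off], msg, sizeof(msg));
		ringbuf_produce(&rb, w);
	}

	/* Consumer side: fetch the ready range, use it, release it. */
	size_t roff;
	size_t nbytes = ringbuf_consume(&rb, &roff);
	if (nbytes > 0) {
		std::cout << &data[roff] << std::endl;
		ringbuf_release(&rb, nbytes);
	}

	ringbuf_unregister(&rb, w);
	return 0;
}

Note that the ring buffer only hands out offsets; the bytes themselves live in a separate array (here 'data') that the caller provides and shares between producers and the consumer.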