#ifdef _WIN32
#define __predict_false(x) (x)
#else
#define __predict_false(x) __builtin_expect((x) != 0, 0)
#endif
namespace experimental
{
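/*
 * Offset encoding (inferred from the constants below, not a comment from the
 * original source): the low 32 bits hold the byte offset into the buffer,
 * bits 32-62 hold the wrap-around counter, and the top bit (WRAP_LOCK_BIT)
 * marks an offset that is still being updated and must not be trusted until
 * the bit clears (see stable_nextoff()/stable_seenoff()).
 */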
static constexpr size_t RBUF_OFF_MASK = 0x00000000ffffffffUL;
static constexpr size_t WRAP_LOCK_BIT = 0x8000000000000000UL;
static constexpr size_t RBUF_OFF_MAX = UINT64_MAX & ~WRAP_LOCK_BIT;

static constexpr size_t WRAP_COUNTER = 0x7fffffff00000000UL;

static size_t
WRAP_INCR(size_t x)
{
	return ((x + 0x100000000UL) & WRAP_COUNTER);
}
typedef uint64_t ringbuf_off_t;
struct ringbuf_worker_t {
	std::atomic<ringbuf_off_t> seen_off;
	std::atomic<int> registered;
};
struct ringbuf_t {
	size_t space;
	/* Updated by producers; WRAP_LOCK_BIT marks a wrap in progress. */
	std::atomic<ringbuf_off_t> next;
	std::atomic<ringbuf_off_t> end;
	/* Updated by the consumer. */
	std::atomic<ringbuf_off_t> written;
	size_t nworkers;
	std::unique_ptr<ringbuf_worker_t[]> workers;
	/* Set by ringbuf_consume(), cleared by ringbuf_release(). */
	bool consume_in_progress;
	/* Initialize the ring buffer with the given space and maximum
	 * number of producer workers. */
	ringbuf_t(size_t max_workers, size_t length)
	    : workers(new ringbuf_worker_t[max_workers])
	{
		if (length >= RBUF_OFF_MASK)
			throw std::out_of_range("ringbuf length too big");

		space = length;
		end = RBUF_OFF_MAX;
		nworkers = max_workers;
		consume_in_progress = false;
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
		VALGRIND_HG_DISABLE_CHECKING(&next, sizeof(next));
		VALGRIND_HG_DISABLE_CHECKING(&end, sizeof(end));
		VALGRIND_HG_DISABLE_CHECKING(&written, sizeof(written));

		for (size_t i = 0; i < max_workers; i++) {
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].seen_off,
				sizeof(workers[i].seen_off));
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].registered,
				sizeof(workers[i].registered));
		}
#endif
	}
};
/*
 * ringbuf_register: register the worker (thread/process) as a producer
 * and return a pointer to its local state.
 */
inline ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
	ringbuf_worker_t *w = &rbuf->workers[i];

	w->seen_off = RBUF_OFF_MAX;
	std::atomic_store_explicit<int>(&w->registered, true,
		std::memory_order_release);
	return w;
}
inline void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	w->registered = false;
	(void)rbuf;
}
/* stable_nextoff: capture and return a stable value of the 'next' offset. */
static inline ringbuf_off_t
stable_nextoff(ringbuf_t *rbuf)
{
	ringbuf_off_t next;
	for (pmem::detail::atomic_backoff backoff;;) {
		next = std::atomic_load_explicit<ringbuf_off_t>(
			&rbuf->next, std::memory_order_acquire);
		if (next & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	assert((next & RBUF_OFF_MASK) < rbuf->space);
	return next;
}
/* stable_seenoff: capture a stable value of the worker's 'seen' offset. */
static inline ringbuf_off_t
stable_seenoff(ringbuf_worker_t *w)
{
	ringbuf_off_t seen_off;
	for (pmem::detail::atomic_backoff backoff;;) {
		seen_off = std::atomic_load_explicit<ringbuf_off_t>(
			&w->seen_off, std::memory_order_acquire);
		if (seen_off & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	return seen_off;
}
/* ringbuf_acquire: request a space of a given length in the ring buffer;
 * returns the offset where the space is available, or -1 on failure. */
inline ptrdiff_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
{
	ringbuf_off_t seen, next, target;

	assert(len > 0 && len <= rbuf->space);
	assert(w->seen_off == RBUF_OFF_MAX);

	do {
		ringbuf_off_t written;

		/* Get the stable 'next' offset; mark the seen value unstable. */
		seen = stable_nextoff(rbuf);
		next = seen & RBUF_OFF_MASK;
		assert(next < rbuf->space);
		std::atomic_store_explicit<ringbuf_off_t>(
			&w->seen_off, next | WRAP_LOCK_BIT,
			std::memory_order_relaxed);

		/* Compute the target offset; we must not catch up with 'written'. */
		target = next + len;
		written = rbuf->written;
		if (__predict_false(next < written && target >= written)) {
			/* The producer must wait. */
			std::atomic_store_explicit<ringbuf_off_t>(
				&w->seen_off, RBUF_OFF_MAX,
				std::memory_order_release);
			return -1;
		}

		if (__predict_false(target >= rbuf->space)) {
			const bool exceed = target > rbuf->space;

			/* Wrap-around: start from the beginning, re-check the invariant. */
			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
			if ((target & RBUF_OFF_MASK) >= written) {
				std::atomic_store_explicit<ringbuf_off_t>(
					&w->seen_off, RBUF_OFF_MAX,
					std::memory_order_release);
				return -1;
			}
			/* Increment the wrap-around counter. */
			target |= WRAP_INCR(seen & WRAP_COUNTER);
		} else {
			/* Preserve the wrap-around counter. */
			target |= seen & WRAP_COUNTER;
		}
	} while (!std::atomic_compare_exchange_weak<ringbuf_off_t>(
		&rbuf->next, &seen, target));

	/* Acquired the range; clear WRAP_LOCK_BIT to mark 'seen_off' stable. */
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off,
		w->seen_off & ~WRAP_LOCK_BIT,
		std::memory_order_relaxed);

	/* On wrap-around, save the 'end' offset and release the lock. */
	if (__predict_false(target & WRAP_LOCK_BIT)) {
		/* Cannot wrap-around again until the consumer catches up. */
		assert(rbuf->written <= next);
		assert(rbuf->end == RBUF_OFF_MAX);
		rbuf->end = next;
		next = 0;

		std::atomic_store_explicit<ringbuf_off_t>(
			&rbuf->next, (target & ~WRAP_LOCK_BIT),
			std::memory_order_release);
	}
	assert((target & RBUF_OFF_MASK) <= rbuf->space);
	return (ptrdiff_t)next;
}
/* ringbuf_produce: mark the acquired range as produced and ready to consume. */
inline void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	(void)rbuf;
	assert(w->registered);
	assert(w->seen_off != RBUF_OFF_MAX);
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off, RBUF_OFF_MAX,
		std::memory_order_release);
}
/* ringbuf_consume: get a contiguous range which is ready to be consumed.
 * Nested consumes are not allowed. */
inline size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
	assert(!rbuf->consume_in_progress);

	ringbuf_off_t written = rbuf->written, next, ready;
	size_t towrite;
retry:
	/* The area between 'written' and the stable 'next' is the target. */
	next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
	if (written == next) {
		/* If producers did not advance, then nothing to do. */
		return 0;
	}

	/* Observe the 'ready' offset of each registered producer. */
	ready = RBUF_OFF_MAX;
	for (unsigned i = 0; i < rbuf->nworkers; i++) {
		ringbuf_worker_t *w = &rbuf->workers[i];
		ringbuf_off_t seen_off;

		/* Skip unregistered workers; get a stable 'seen' value. */
		if (!std::atomic_load_explicit<int>(&w->registered,
				std::memory_order_relaxed))
			continue;
		seen_off = stable_seenoff(w);

		/* Ignore the offsets after the possible wrap-around. */
		if (seen_off >= written) {
			ready = std::min<ringbuf_off_t>(seen_off, ready);
		}
		assert(ready >= written);
	}

	/* Determine whether wrap-around occurred and deduce the safe 'ready'. */
	if (next < written) {
		const ringbuf_off_t end =
			std::min<ringbuf_off_t>(rbuf->space, rbuf->end);

		/* If the consumer reached the end of the data, wrap around. */
		if (ready == RBUF_OFF_MAX && written == end) {
			/* Clear the 'end' offset if it was set. */
			if (rbuf->end != RBUF_OFF_MAX) {
				rbuf->end = RBUF_OFF_MAX;
			}
			/* Wrap-around the consumer and start from zero. */
			written = 0;
			std::atomic_store_explicit<ringbuf_off_t>(
				&rbuf->written, written,
				std::memory_order_release);
			goto retry;
		}

		/* Otherwise, consume the data at the end of the buffer first. */
		assert(ready > next);
		ready = std::min<ringbuf_off_t>(ready, end);
		assert(ready >= written);
	} else {
		/* Regular case: up to the observed 'ready' or 'next' offset. */
		ready = std::min<ringbuf_off_t>(ready, next);
	}
	towrite = ready - written;
	*offset = written;

	assert(ready >= written);
	assert(towrite <= rbuf->space);

	rbuf->consume_in_progress = true;
	return towrite;
}
/* ringbuf_release: indicate that the consumed range can now be released. */
inline void
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
{
	rbuf->consume_in_progress = false;

	const size_t nwritten = rbuf->written + nbytes;

	assert(rbuf->written <= rbuf->space);
	assert(rbuf->written <= rbuf->end);
	assert(nwritten <= rbuf->space);

	rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
}

} /* namespace experimental */
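/*
 * Usage sketch (illustration only, not part of the original header),
 * assuming a caller-managed byte buffer of the same length as the ring
 * buffer; the ring buffer itself tracks only offsets into that buffer:
 *
 *	ringbuf_t rb(1, 4096);            // 1 producer worker, 4096 bytes
 *	std::vector<char> buf(4096);      // caller-managed data area
 *	ringbuf_worker_t *w = ringbuf_register(&rb, 0);
 *
 *	// Producer: acquire space, copy the payload, mark it produced.
 *	ptrdiff_t off = ringbuf_acquire(&rb, w, 5);
 *	if (off >= 0) {
 *		memcpy(&buf[(size_t)off], "hello", 5);
 *		ringbuf_produce(&rb, w);
 *	}
 *
 *	// Consumer: fetch a contiguous ready range, use it, then release.
 *	size_t cons_off;
 *	size_t len = ringbuf_consume(&rb, &cons_off);
 *	if (len > 0) {
 *		handle_bytes(&buf[cons_off], len);  // handle_bytes() is hypothetical
 *		ringbuf_release(&rb, len);
 *	}
 */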