PMDK C++ bindings  1.13.0-git23.gf49772ac
This is the C++ bindings documentation for PMDK's libpmemobj.
ringbuf.hpp
/*
 * Copyright (c) 2016 Mindaugas Rasiukevicius <rmind at noxt eu>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */

// SPDX-License-Identifier: BSD-3-Clause
/* Copyright 2021, Intel Corporation */

#ifndef RINGBUF_HPP
#define RINGBUF_HPP

#include <cstddef>

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <memory>
#include <stdexcept> /* for std::out_of_range thrown by the constructor */

/* For pmem::detail::atomic_backoff, used by the busy-wait loops below. */
#include <libpmemobj++/detail/atomic_backoff.hpp>

#ifdef _WIN32
#define __predict_false(x) (x)
#else
#define __predict_false(x) __builtin_expect((x) != 0, 0)
#endif /* _WIN32 */

namespace pmem
{

namespace obj
{

namespace experimental
{

namespace ringbuf
{
static constexpr size_t RBUF_OFF_MASK = 0x00000000ffffffffUL;
static constexpr size_t WRAP_LOCK_BIT = 0x8000000000000000UL;
static constexpr size_t RBUF_OFF_MAX = UINT64_MAX & ~WRAP_LOCK_BIT;

static constexpr size_t WRAP_COUNTER = 0x7fffffff00000000UL;
static size_t
WRAP_INCR(size_t x)
{
	return ((x + 0x100000000UL) & WRAP_COUNTER);
}
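
/*
 * Layout of a 64-bit offset word (see the constants above): bit 63 is
 * WRAP_LOCK_BIT, bits 62..32 hold the wrap-around counter (WRAP_COUNTER,
 * advanced by WRAP_INCR) and bits 31..0 hold the actual byte offset into
 * the buffer (extracted with RBUF_OFF_MASK).
 */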

typedef uint64_t ringbuf_off_t;

struct ringbuf_worker_t {
	std::atomic<ringbuf_off_t> seen_off;
	std::atomic<int> registered;

	ringbuf_worker_t()
	{
		seen_off.store(0);
		registered.store(0);
	}
};

struct ringbuf_t {
	/* Ring buffer space. */
	size_t space;

	/*
	 * The NEXT hand is atomically updated by the producer.
	 * WRAP_LOCK_BIT is set in case of wrap-around; in such case,
	 * the producer can update the 'end' offset.
	 */
	std::atomic<ringbuf_off_t> next;
	std::atomic<ringbuf_off_t> end;

	/* The following are updated by the consumer. */
	std::atomic<ringbuf_off_t> written;
	unsigned nworkers;
	std::unique_ptr<ringbuf_worker_t[]> workers;

	/* Set by ringbuf_consume, reset by ringbuf_release. */
	bool consume_in_progress;
	ringbuf_t(size_t max_workers, size_t length)
	    : workers(new ringbuf_worker_t[max_workers])
	{
		if (length >= RBUF_OFF_MASK)
			throw std::out_of_range("ringbuf length too big");

		written.store(0);
		next.store(0);
		end.store(0);
		space = length;
		end = RBUF_OFF_MAX;
		nworkers = max_workers;
		consume_in_progress = false;

		/* Helgrind/Drd does not understand std::atomic */
#if LIBPMEMOBJ_CPP_VG_HELGRIND_ENABLED
		VALGRIND_HG_DISABLE_CHECKING(&next, sizeof(next));
		VALGRIND_HG_DISABLE_CHECKING(&end, sizeof(end));
		VALGRIND_HG_DISABLE_CHECKING(&written, sizeof(written));

		for (size_t i = 0; i < max_workers; i++) {
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].seen_off,
				sizeof(workers[i].seen_off));
			VALGRIND_HG_DISABLE_CHECKING(
				&workers[i].registered,
				sizeof(workers[i].registered));
		}
#endif
	}
};

/*
 * ringbuf_register: register the worker (thread/process) as a producer
 * and return a pointer to its local store.
 */
inline ringbuf_worker_t *
ringbuf_register(ringbuf_t *rbuf, unsigned i)
{
	ringbuf_worker_t *w = &rbuf->workers[i];

	w->seen_off = RBUF_OFF_MAX;
	std::atomic_store_explicit<int>(&w->registered, true,
					std::memory_order_release);
	return w;
}

inline void
ringbuf_unregister(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	w->registered = false;
	(void)rbuf;
}

/*
 * stable_nextoff: capture and return a stable value of the 'next' offset.
 */
static inline ringbuf_off_t
stable_nextoff(ringbuf_t *rbuf)
{
	ringbuf_off_t next;
	for (pmem::detail::atomic_backoff backoff;;) {
		next = std::atomic_load_explicit<ringbuf_off_t>(
			&rbuf->next, std::memory_order_acquire);
		if (next & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	assert((next & RBUF_OFF_MASK) < rbuf->space);
	return next;
}

/*
 * stable_seenoff: capture and return a stable value of the 'seen' offset.
 */
static inline ringbuf_off_t
stable_seenoff(ringbuf_worker_t *w)
{
	ringbuf_off_t seen_off;
	for (pmem::detail::atomic_backoff backoff;;) {
		seen_off = std::atomic_load_explicit<ringbuf_off_t>(
			&w->seen_off, std::memory_order_acquire);
		if (seen_off & WRAP_LOCK_BIT) {
			backoff.pause();
		} else {
			break;
		}
	}
	return seen_off;
}

/*
 * ringbuf_acquire: request a space of a given length in the ring buffer.
 *
 * => On success: returns the offset at which the space is available.
 * => On failure: returns -1.
 */
inline ptrdiff_t
ringbuf_acquire(ringbuf_t *rbuf, ringbuf_worker_t *w, size_t len)
{
	ringbuf_off_t seen, next, target;

	assert(len > 0 && len <= rbuf->space);
	assert(w->seen_off == RBUF_OFF_MAX);

	do {
		ringbuf_off_t written;

		/*
		 * Get the stable 'next' offset. Save the observed 'next'
		 * value (i.e. the 'seen' offset), but mark the value as
		 * unstable (set WRAP_LOCK_BIT).
		 *
		 * Note: CAS will issue a memory_order_release for us and
		 * thus ensures that it reaches global visibility together
		 * with new 'next'.
		 */
		seen = stable_nextoff(rbuf);
		next = seen & RBUF_OFF_MASK;
		assert(next < rbuf->space);
		std::atomic_store_explicit<ringbuf_off_t>(
			&w->seen_off, next | WRAP_LOCK_BIT,
			std::memory_order_relaxed);

		/*
		 * Compute the target offset. Key invariant: we cannot
		 * go beyond the WRITTEN offset or catch up with it.
		 */
		target = next + len;
		written = rbuf->written;
		if (__predict_false(next < written && target >= written)) {
			/* The producer must wait. */
			std::atomic_store_explicit<ringbuf_off_t>(
				&w->seen_off, RBUF_OFF_MAX,
				std::memory_order_release);
			return -1;
		}

		if (__predict_false(target >= rbuf->space)) {
			const bool exceed = target > rbuf->space;

			/*
			 * Wrap-around and start from the beginning.
			 *
			 * If we would exceed the buffer, then attempt to
			 * acquire the WRAP_LOCK_BIT and use the space in
			 * the beginning. If we used all space exactly to
			 * the end, then reset to 0.
			 *
			 * Check the invariant again.
			 */
			target = exceed ? (WRAP_LOCK_BIT | len) : 0;
			if ((target & RBUF_OFF_MASK) >= written) {
				std::atomic_store_explicit<ringbuf_off_t>(
					&w->seen_off, RBUF_OFF_MAX,
					std::memory_order_release);
				return -1;
			}
			/* Increment the wrap-around counter. */
			target |= WRAP_INCR(seen & WRAP_COUNTER);
		} else {
			/* Preserve the wrap-around counter. */
			target |= seen & WRAP_COUNTER;
		}
	} while (!std::atomic_compare_exchange_weak<ringbuf_off_t>(
		&rbuf->next, &seen, target));

	/*
	 * Acquired the range. Clear WRAP_LOCK_BIT in the 'seen' value
	 * thus indicating that it is stable now.
	 *
	 * No need for memory_order_release, since CAS issued a fence.
	 */
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off,
						  w->seen_off & ~WRAP_LOCK_BIT,
						  std::memory_order_relaxed);

	/*
	 * If we set the WRAP_LOCK_BIT in the 'next' (because we exceed
	 * the remaining space and need to wrap-around), then save the
	 * 'end' offset and release the lock.
	 */
	if (__predict_false(target & WRAP_LOCK_BIT)) {
		/* Cannot wrap-around again if consumer did not catch-up. */
		assert(rbuf->written <= next);
		assert(rbuf->end == RBUF_OFF_MAX);
		rbuf->end = next;
		next = 0;

		/*
		 * Unlock: ensure the 'end' offset reaches global
		 * visibility before the lock is released.
		 */
		std::atomic_store_explicit<ringbuf_off_t>(
			&rbuf->next, (target & ~WRAP_LOCK_BIT),
			std::memory_order_release);
	}
	assert((target & RBUF_OFF_MASK) <= rbuf->space);
	return (ptrdiff_t)next;
}

/*
 * ringbuf_produce: indicate the acquired range in the buffer is produced
 * and is ready to be consumed.
 */
inline void
ringbuf_produce(ringbuf_t *rbuf, ringbuf_worker_t *w)
{
	(void)rbuf;
	assert(w->registered);
	assert(w->seen_off != RBUF_OFF_MAX);
	std::atomic_store_explicit<ringbuf_off_t>(&w->seen_off, RBUF_OFF_MAX,
						  std::memory_order_release);
}
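
/*
 * A typical producer-side sequence looks as follows; 'buf' stands for the
 * caller-owned byte array of rbuf->space bytes that the returned offsets
 * refer to (the names are illustrative, not part of this header):
 *
 *	ptrdiff_t off = ringbuf_acquire(rbuf, w, len);
 *	if (off != -1) {
 *		memcpy(&buf[off], payload, len);
 *		ringbuf_produce(rbuf, w);
 *	}
 */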

/*
 * ringbuf_consume: get a contiguous range which is ready to be consumed.
 *
 * Nested consumes are not allowed.
 */
inline size_t
ringbuf_consume(ringbuf_t *rbuf, size_t *offset)
{
	assert(!rbuf->consume_in_progress);

	ringbuf_off_t written = rbuf->written, next, ready;
	size_t towrite;
retry:
	/*
	 * Get the stable 'next' offset. Note: stable_nextoff() issued
	 * a load memory barrier. The area between the 'written' offset
	 * and the 'next' offset will be the *preliminary* target buffer
	 * area to be consumed.
	 */
	next = stable_nextoff(rbuf) & RBUF_OFF_MASK;
	if (written == next) {
		/* If producers did not advance, then nothing to do. */
		return 0;
	}

	/*
	 * Observe the 'ready' offset of each producer.
	 *
	 * At this point, some producer might have already triggered the
	 * wrap-around and some (or all) seen 'ready' values might be in
	 * the range between 0 and 'written'. We have to skip them.
	 */
	ready = RBUF_OFF_MAX;

	for (unsigned i = 0; i < rbuf->nworkers; i++) {
		ringbuf_worker_t *w = &rbuf->workers[i];
		ringbuf_off_t seen_off;

		/*
		 * Skip if the worker has not registered.
		 *
		 * Get a stable 'seen' value. This is necessary since we
		 * want to discard the stale 'seen' values.
		 */
		if (!std::atomic_load_explicit<int>(&w->registered,
						    std::memory_order_relaxed))
			continue;
		seen_off = stable_seenoff(w);

		/*
		 * Ignore the offsets after the possible wrap-around.
		 * We are interested in the smallest seen offset that is
		 * not behind the 'written' offset.
		 */
		if (seen_off >= written) {
			ready = std::min<ringbuf_off_t>(seen_off, ready);
		}
		assert(ready >= written);
	}

	/*
	 * Finally, we need to determine whether wrap-around occurred
	 * and deduce the safe 'ready' offset.
	 */
	if (next < written) {
		const ringbuf_off_t end =
			std::min<ringbuf_off_t>(rbuf->space, rbuf->end);

		/*
		 * Wrap-around case. Check for the cut off first.
		 *
		 * Reset the 'written' offset if it reached the end of
		 * the buffer or the 'end' offset (if set by a producer).
		 * However, we must check that the producer is actually
		 * done (the observed 'ready' offsets are clear).
		 */
		if (ready == RBUF_OFF_MAX && written == end) {
			/*
			 * Clear the 'end' offset if it was set.
			 */
			if (rbuf->end != RBUF_OFF_MAX) {
				rbuf->end = RBUF_OFF_MAX;
			}

			/*
			 * Wrap-around the consumer and start from zero.
			 */
			written = 0;
			std::atomic_store_explicit<ringbuf_off_t>(
				&rbuf->written, written,
				std::memory_order_release);
			goto retry;
		}

		/*
		 * We cannot wrap-around yet; there is data to consume at
		 * the end. The ready range is the smallest of the observed
		 * 'ready' or the 'end' offset. If neither is set, then
		 * the actual end of the buffer.
		 */
		assert(ready > next);
		ready = std::min<ringbuf_off_t>(ready, end);
		assert(ready >= written);
	} else {
		/*
		 * Regular case. Up to the observed 'ready' (if set)
		 * or the 'next' offset.
		 */
		ready = std::min<ringbuf_off_t>(ready, next);
	}
	towrite = ready - written;
	*offset = written;

	assert(ready >= written);
	assert(towrite <= rbuf->space);

	if (towrite)
		rbuf->consume_in_progress = true;

	return towrite;
}

/*
 * ringbuf_release: indicate that the consumed range can now be released.
 */
inline void
ringbuf_release(ringbuf_t *rbuf, size_t nbytes)
{
	rbuf->consume_in_progress = false;

	const size_t nwritten = rbuf->written + nbytes;

	assert(rbuf->written <= rbuf->space);
	assert(rbuf->written <= rbuf->end);
	assert(nwritten <= rbuf->space);

	rbuf->written = (nwritten == rbuf->space) ? 0 : nwritten;
}
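
/*
 * A typical consumer-side sequence looks as follows (again with 'buf' as
 * the caller-owned data area; the names are illustrative only):
 *
 *	size_t off;
 *	size_t len = ringbuf_consume(rbuf, &off);
 *	if (len != 0) {
 *		// ... read 'len' bytes starting at &buf[off] ...
 *		ringbuf_release(rbuf, len);
 *	}
 */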

} /* namespace ringbuf */
} /* namespace experimental */
} /* namespace obj */
} /* namespace pmem */

#endif /* RINGBUF_HPP */
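
A minimal end-to-end usage sketch follows. It is not part of the header above; it assumes ringbuf.hpp is included and pairs the ring buffer with a caller-owned std::vector<char> as the data area, since the ring buffer itself only hands out byte offsets. All names and sizes in the sketch are illustrative.

/* ringbuf.hpp (shown above) must be included here */
#include <cstring>
#include <vector>

using namespace pmem::obj::experimental::ringbuf;

int
main()
{
	std::vector<char> buf(4096);	/* caller-owned data area */
	ringbuf_t rb(1, buf.size());	/* one producer worker */

	/* Each producer registers once and keeps its worker handle. */
	ringbuf_worker_t *w = ringbuf_register(&rb, 0);

	/* Producer: reserve space, copy the payload, then publish it. */
	const char msg[] = "hello";
	ptrdiff_t off = ringbuf_acquire(&rb, w, sizeof(msg));
	if (off != -1) {
		std::memcpy(&buf[static_cast<size_t>(off)], msg, sizeof(msg));
		ringbuf_produce(&rb, w);
	}

	/* Consumer: fetch a contiguous ready range, process it, release it. */
	size_t coff;
	size_t len = ringbuf_consume(&rb, &coff);
	if (len != 0) {
		/* ... process 'len' bytes starting at &buf[coff] ... */
		ringbuf_release(&rb, len);
	}

	ringbuf_unregister(&rb, w);
	return 0;
}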