horde/sync_push_vec.rs

//! A contiguous push-only array type with lock-free reads.
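//!
//! # Example
//!
//! A minimal usage sketch (this assumes the module is exported as
//! `horde::sync_push_vec`):
//!
//! ```
//! use horde::sync_push_vec::SyncPushVec;
//!
//! let mut vec = SyncPushVec::new();
//! vec.write().push(1);
//! vec.write().push(2);
//! assert_eq!(vec.write().read().as_slice(), &[1, 2]);
//! ```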

use crate::{
    collect::{self, Pin},
    scopeguard::guard,
};
use core::ptr::NonNull;
use parking_lot::{Mutex, MutexGuard};
use std::{
    alloc::{Allocator, Global, Layout, LayoutError, handle_alloc_error},
    cell::UnsafeCell,
    cmp,
    intrinsics::unlikely,
    iter::FromIterator,
    marker::PhantomData,
    mem,
    ops::{Deref, DerefMut},
    ptr::slice_from_raw_parts,
    sync::{Arc, atomic::{AtomicPtr, AtomicUsize, Ordering}},
};

mod code;
mod tests;

/// A handle to a [SyncPushVec] with read access.
///
/// It is acquired either by a pin, or by exclusive access to the vector.
pub struct Read<'a, T> {
    table: &'a SyncPushVec<T>,
}

impl<T> Copy for Read<'_, T> {}
impl<T> Clone for Read<'_, T> {
    fn clone(&self) -> Self {
        Self { table: self.table }
    }
}

/// A handle to a [SyncPushVec] with write access.
pub struct Write<'a, T> {
    table: &'a SyncPushVec<T>,
}

/// A handle to a [SyncPushVec] with write access protected by a lock.
pub struct LockedWrite<'a, T> {
    table: Write<'a, T>,
    _guard: MutexGuard<'a, ()>,
}

impl<'a, T> Deref for LockedWrite<'a, T> {
    type Target = Write<'a, T>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.table
    }
}

impl<'a, T> DerefMut for LockedWrite<'a, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.table
    }
}

/// A contiguous push-only array type with lock-free reads.
pub struct SyncPushVec<T> {
    current: AtomicPtr<TableInfo>,

    lock: Mutex<()>,

    old: UnsafeCell<Vec<Arc<DestroyTable<T>>>>,

    // Tell dropck that we own instances of T.
    marker: PhantomData<T>,
}

struct TableInfo {
    items: AtomicUsize,
    capacity: usize,
}

#[repr(transparent)]
struct TableRef<T> {
    data: NonNull<TableInfo>,

    marker: PhantomData<*mut T>,
}

impl<T> Copy for TableRef<T> {}
impl<T> Clone for TableRef<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            data: self.data,
            marker: self.marker,
        }
    }
}

impl<T> TableRef<T> {
    #[inline]
    fn empty() -> Self {
        if cfg!(debug_assertions) {
            let real = Self::layout(0).unwrap().0;
            let dummy = Layout::new::<TableInfo>().align_to(real.align()).unwrap();
            debug_assert_eq!(real, dummy);
        }

        #[repr(C, align(64))]
        struct EmptyTable {
            info: TableInfo,
        }

        static EMPTY: EmptyTable = EmptyTable {
            info: TableInfo {
                capacity: 0,
                items: AtomicUsize::new(0),
            },
        };

        Self {
            data: unsafe {
                NonNull::new_unchecked(&EMPTY.info as *const TableInfo as *mut TableInfo)
            },
            marker: PhantomData,
        }
    }

    #[inline]
    fn layout(capacity: usize) -> Result<(Layout, usize), LayoutError> {
        let data = Layout::new::<T>().repeat(capacity)?.0;
        let info = Layout::new::<TableInfo>();
        data.extend(info)
    }

    #[inline]
    fn allocate(capacity: usize) -> Self {
        let (layout, info_offset) = Self::layout(capacity).expect("capacity overflow");

        let ptr: NonNull<u8> = Global
            .allocate(layout)
            .map(|ptr| ptr.cast())
            .unwrap_or_else(|_| handle_alloc_error(layout));

        let info =
            unsafe { NonNull::new_unchecked(ptr.as_ptr().add(info_offset) as *mut TableInfo) };

        let mut result = Self {
            data: info,
            marker: PhantomData,
        };

        unsafe {
            *result.info_mut() = TableInfo {
                capacity,
                items: AtomicUsize::new(0),
            };
        }

        result
    }

    #[inline]
    unsafe fn free(self) {
        unsafe {
            let items = self.info().items.load(Ordering::Relaxed);
            // Check the capacity, not the item count: a table allocated via
            // `with_capacity` must be deallocated even if nothing was pushed.
            // Only the static empty table (capacity 0) is skipped.
            if self.info().capacity > 0 {
                if mem::needs_drop::<T>() {
                    for i in 0..items {
                        self.data(i).drop_in_place();
                    }
                }

                let (layout, info_offset) = Self::layout(self.info().capacity).unwrap_unchecked();

                Global.deallocate(
                    NonNull::new_unchecked((self.data.as_ptr() as *mut u8).sub(info_offset)),
                    layout,
                )
            }
        }
    }

    fn from_maybe_empty_iter<I: Iterator<Item = T>, const CHECK_LEN: bool>(
        iter: I,
        iter_size: usize,
        capacity: usize,
    ) -> TableRef<T> {
        if iter_size == 0 {
            TableRef::empty()
        } else {
            let capacity = cmp::max(iter_size, capacity);
            unsafe { TableRef::from_iter::<_, CHECK_LEN>(iter, capacity) }
        }
    }

    /// Allocates a new table and fills it with the content of an iterator.
    unsafe fn from_iter<I: Iterator<Item = T>, const CHECK_LEN: bool>(
        iter: I,
        new_capacity: usize,
    ) -> TableRef<T> {
        unsafe {
            debug_assert!(new_capacity > 0);

            let mut new_table = TableRef::<T>::allocate(new_capacity);

            let mut guard = guard(Some(new_table), |new_table| {
                new_table.map(|new_table| new_table.free());
            });

            // Copy all elements to the new table.
            for (index, item) in iter.enumerate() {
                debug_assert!(index < new_capacity);
                if CHECK_LEN && index >= new_capacity {
                    break;
                }

                new_table.first().add(index).write(item);

                // Update the item count on each iteration so that, if `next`
                // panics, the guard frees exactly the elements written so far.

                *new_table.info_mut().items.get_mut() = index + 1;
            }

            *guard = None;

            new_table
        }
    }

    unsafe fn info(&self) -> &TableInfo {
        unsafe { self.data.as_ref() }
    }

    unsafe fn info_mut(&mut self) -> &mut TableInfo {
        unsafe { self.data.as_mut() }
    }

    #[inline]
    unsafe fn first(&self) -> *mut T {
        unsafe { (self.data.as_ptr() as *mut T).sub(self.info().capacity) }
    }

    /// Returns a pointer to the elements of the table as a slice.
    #[inline]
    unsafe fn slice(&self) -> *const [T] {
        unsafe {
            let items = self.info().items.load(Ordering::Acquire);
            let base = if items == 0 && mem::align_of::<T>() > 64 {
                // Need a special case here since our empty allocation isn't aligned to T.
                // It only has an alignment of 64.
                mem::align_of::<T>() as *const T
            } else {
                self.first() as *const T
            };
            slice_from_raw_parts(base, items)
        }
    }

    /// Returns a pointer to an element in the table.
    #[inline]
    unsafe fn data(&self, index: usize) -> *mut T {
        unsafe {
            debug_assert!(index < self.info().items.load(Ordering::Acquire));

            self.first().add(index)
        }
    }
}

impl<T: Clone> TableRef<T> {
    /// Allocates a new table of a different size and clones the contents of the
    /// current table into it.
    unsafe fn clone(&self, new_capacity: usize) -> TableRef<T> {
        unsafe {
            debug_assert!(new_capacity >= self.info().capacity);

            TableRef::from_iter::<_, false>((*self.slice()).iter().cloned(), new_capacity)
        }
    }
}

struct DestroyTable<T> {
    table: TableRef<T>,
    lock: Mutex<bool>,
}

unsafe impl<T> Sync for DestroyTable<T> {}
unsafe impl<T: Send> Send for DestroyTable<T> {}

impl<T> DestroyTable<T> {
    unsafe fn run(&self) {
        unsafe {
            let mut status = self.lock.lock();
            if !*status {
                *status = true;
                self.table.free();
            }
        }
    }
}

unsafe impl<#[may_dangle] T> Drop for SyncPushVec<T> {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            self.current().free();
            for table in self.old.get_mut() {
                table.run();
            }
        }
    }
}

unsafe impl<T: Send> Send for SyncPushVec<T> {}
unsafe impl<T: Sync> Sync for SyncPushVec<T> {}

impl<T> Default for SyncPushVec<T> {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl<T> SyncPushVec<T> {
    /// Constructs a new, empty vector with zero capacity.
    ///
    /// The vector will not allocate until elements are pushed onto it.
    #[inline]
    pub fn new() -> Self {
        Self::with_capacity(0)
    }

    /// Constructs a new, empty vector with the specified capacity.
    ///
    /// The vector will be able to hold exactly `capacity` elements without
    /// reallocating. If `capacity` is 0, the vector will not allocate.
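    ///
    /// # Example
    ///
    /// A sketch of the capacity guarantees (assuming the crate is available
    /// as `horde`):
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let mut vec: SyncPushVec<u32> = SyncPushVec::with_capacity(10);
    /// assert_eq!(vec.write().read().capacity(), 10);
    /// assert_eq!(vec.write().read().len(), 0);
    /// ```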
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            current: AtomicPtr::new(
                if capacity > 0 {
                    TableRef::<T>::allocate(capacity)
                } else {
                    TableRef::empty()
                }
                .data
                .as_ptr(),
            ),
            old: UnsafeCell::new(Vec::new()),
            marker: PhantomData,
            lock: Mutex::new(()),
        }
    }

    /// Gets a reference to the underlying mutex that protects writes.
    #[inline]
    pub fn mutex(&self) -> &Mutex<()> {
        &self.lock
    }

    /// Creates a [Read] handle from a pinned region.
    ///
    /// Use [crate::collect::pin] to get a `Pin` instance.
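    ///
    /// # Example
    ///
    /// A minimal sketch, assuming `collect::pin` runs a closure with a `Pin`
    /// token for the current thread:
    ///
    /// ```
    /// use horde::{collect, sync_push_vec::SyncPushVec};
    ///
    /// let vec: SyncPushVec<u32> = SyncPushVec::new();
    /// collect::pin(|pin| {
    ///     assert_eq!(vec.read(pin).len(), 0);
    /// });
    /// ```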
    #[inline]
    pub fn read<'a>(&'a self, pin: Pin<'a>) -> Read<'a, T> {
        let _pin = pin;
        Read { table: self }
    }

    /// Creates a [Write] handle without checking for exclusive access.
    ///
    /// # Safety
    /// It's up to the caller to ensure only one thread writes to the vector at a time.
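    ///
    /// # Example
    ///
    /// A sketch of single-threaded use, where exclusive write access is
    /// trivially guaranteed:
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let vec: SyncPushVec<u32> = SyncPushVec::new();
    /// // Safety: this thread is the only one writing to `vec`.
    /// unsafe { vec.unsafe_write().push(1); }
    /// ```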
    #[inline]
    pub unsafe fn unsafe_write(&self) -> Write<'_, T> {
        Write { table: self }
    }

    /// Creates a [Write] handle from a mutable reference.
    #[inline]
    pub fn write(&mut self) -> Write<'_, T> {
        Write { table: self }
    }

    /// Creates a [LockedWrite] handle by locking the underlying mutex that protects writes.
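    ///
    /// # Example
    ///
    /// A sketch showing writes serialized through the mutex (assuming the
    /// crate is available as `horde`):
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let vec = SyncPushVec::new();
    /// vec.lock().push(1);
    /// assert_eq!(vec.lock().read().len(), 1);
    /// ```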
    #[inline]
    pub fn lock(&self) -> LockedWrite<'_, T> {
        LockedWrite {
            table: Write { table: self },
            _guard: self.lock.lock(),
        }
    }

    /// Creates a [LockedWrite] handle from a guard that already holds the underlying write mutex.
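    ///
    /// # Example
    ///
    /// A sketch where the mutex is locked manually first:
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let vec: SyncPushVec<u32> = SyncPushVec::new();
    /// let guard = vec.mutex().lock();
    /// let mut write = vec.lock_from_guard(guard);
    /// write.push(3);
    /// ```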
    #[inline]
    pub fn lock_from_guard<'a>(&'a self, guard: MutexGuard<'a, ()>) -> LockedWrite<'a, T> {
        // Verify that we are the target of the guard.
        assert_eq!(
            &self.lock as *const _,
            MutexGuard::mutex(&guard) as *const _
        );

        LockedWrite {
            table: Write { table: self },
            _guard: guard,
        }
    }

    /// Extracts a mutable slice of the entire vector.
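    ///
    /// # Example
    ///
    /// A sketch (assuming the crate is available as `horde`):
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let mut vec: SyncPushVec<u32> = [1, 2, 3].into_iter().collect();
    /// vec.as_mut_slice()[0] = 7;
    /// assert_eq!(vec.write().read().as_slice(), &[7, 2, 3]);
    /// ```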
    #[inline]
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        unsafe { &mut *(self.current().slice() as *mut [T]) }
    }

    #[inline]
    fn current(&self) -> TableRef<T> {
        TableRef {
            data: unsafe { NonNull::new_unchecked(self.current.load(Ordering::Acquire)) },
            marker: PhantomData,
        }
    }
}

impl<'a, T> Read<'a, T> {
    /// Returns the number of elements the vector can hold without reallocating.
    #[inline]
    pub fn capacity(self) -> usize {
        unsafe { self.table.current().info().capacity }
    }

    /// Returns the number of elements in the vector.
    #[inline]
    pub fn len(self) -> usize {
        unsafe { self.table.current().info().items.load(Ordering::Acquire) }
    }

    /// Extracts a slice containing the entire vector.
    #[inline]
    pub fn as_slice(self) -> &'a [T] {
        let table = self.table.current();
        unsafe { &*table.slice() }
    }
}

impl<T> Write<'_, T> {
    /// Creates a [Read] handle which gives access to read operations.
    #[inline]
    pub fn read(&self) -> Read<'_, T> {
        Read { table: self.table }
    }
}

impl<'a, T: Send + Clone> Write<'a, T> {
    /// Inserts a new element at the end of the vector, and returns a reference
    /// to it along with its index.
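    ///
    /// # Example
    ///
    /// A sketch of pushing through a [Write] handle:
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let mut vec = SyncPushVec::new();
    /// let mut write = vec.write();
    /// let (item, index) = write.push("a");
    /// assert_eq!(*item, "a");
    /// assert_eq!(index, 0);
    /// ```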
    #[inline]
    pub fn push(&mut self, value: T) -> (&'a T, usize) {
        let mut table = self.table.current();
        unsafe {
            let items = table.info().items.load(Ordering::Relaxed);

            if unlikely(items == table.info().capacity) {
                table = self.expand_by_one();
            }

            let result = table.first().add(items);

            result.write(value);

            table.info().items.store(items + 1, Ordering::Release);

            (&*result, items)
        }
    }

    /// Reserves capacity for at least `additional` more elements to be inserted
    /// in the given vector. The collection may reserve more space to avoid
    /// frequent reallocations. Does nothing if the capacity is already sufficient.
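    ///
    /// # Example
    ///
    /// A sketch (assuming the crate is available as `horde`):
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let mut vec: SyncPushVec<u32> = SyncPushVec::new();
    /// let mut write = vec.write();
    /// write.reserve(10);
    /// assert!(write.read().capacity() >= 10);
    /// ```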
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        let table = self.table.current();
        unsafe {
            let required = table
                .info()
                .items
                .load(Ordering::Relaxed)
                .checked_add(additional)
                .expect("capacity overflow");

            if table.info().capacity < required {
                self.expand_by(additional);
            }
        }
    }

    #[cold]
    #[inline(never)]
    fn expand_by_one(&mut self) -> TableRef<T> {
        self.expand_by(1)
    }

    // Tiny Vecs are dumb. Skip to:
    // - 8 if the element size is 1, because any heap allocator is likely
    //   to round up a request of less than 8 bytes to at least 8 bytes.
    // - 4 if elements are moderate-sized (<= 1 KiB).
    // - 1 otherwise, to avoid wasting too much space for very short Vecs.
    const MIN_NON_ZERO_CAP: usize = if mem::size_of::<T>() == 1 {
        8
    } else if mem::size_of::<T>() <= 1024 {
        4
    } else {
        1
    };

    fn expand_by(&mut self, additional: usize) -> TableRef<T> {
        let table = self.table.current();

        let items = unsafe { table.info().items.load(Ordering::Relaxed) };
        let capacity = unsafe { table.info().capacity };

        // Avoid `Option::ok_or_else` because it bloats LLVM IR.
        let required_cap = match items.checked_add(additional) {
            Some(required_cap) => required_cap,
            None => panic!("capacity overflow"),
        };

        // This guarantees exponential growth. The doubling cannot overflow
        // because `capacity <= isize::MAX` and its type is `usize`.
        let cap = cmp::max(capacity * 2, required_cap);
        let cap = cmp::max(Self::MIN_NON_ZERO_CAP, cap);

        let new_table = unsafe { table.clone(cap) };

        self.replace_table(new_table);

        new_table
    }
}

impl<T: Send> Write<'_, T> {
    fn replace_table(&mut self, new_table: TableRef<T>) {
        let table = self.table.current();

        self.table
            .current
            .store(new_table.data.as_ptr(), Ordering::Release);

        let destroy = Arc::new(DestroyTable {
            table,
            lock: Mutex::new(false),
        });

        unsafe {
            (*self.table.old.get()).push(destroy.clone());

            collect::defer_unchecked(move || destroy.run());
        }
    }

    /// Replaces the content of the vector with the content of the iterator.
    /// `capacity` specifies the new capacity if it's greater than the length of the iterator.
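    ///
    /// # Example
    ///
    /// A sketch replacing existing contents (assuming the crate is available
    /// as `horde`):
    ///
    /// ```
    /// use horde::sync_push_vec::SyncPushVec;
    ///
    /// let mut vec: SyncPushVec<u32> = SyncPushVec::new();
    /// vec.write().push(1);
    /// vec.write().replace(vec![7, 8], 0);
    /// assert_eq!(vec.write().read().as_slice(), &[7, 8]);
    /// ```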
    #[inline]
    pub fn replace<I: IntoIterator<Item = T>>(&mut self, iter: I, capacity: usize) {
        let iter = iter.into_iter();

        let table = if let Some(max) = iter.size_hint().1 {
            TableRef::from_maybe_empty_iter::<_, true>(iter, max, capacity)
        } else {
            let elements: Vec<_> = iter.collect();
            let len = elements.len();
            TableRef::from_maybe_empty_iter::<_, false>(elements.into_iter(), len, capacity)
        };

        self.replace_table(table);
    }
}

impl<T: Clone + Send> Extend<T> for Write<'_, T> {
    #[inline]
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        let iter = iter.into_iter();
        self.reserve(iter.size_hint().0);
        iter.for_each(|v| {
            self.push(v);
        });
    }

    #[inline]
    fn extend_one(&mut self, item: T) {
        self.push(item);
    }

    #[inline]
    fn extend_reserve(&mut self, additional: usize) {
        self.reserve(additional);
    }
}

impl<T: Clone + Send> FromIterator<T> for SyncPushVec<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let mut vec = Self::with_capacity(iter.size_hint().0);
        let mut write = vec.write();
        iter.for_each(|v| {
            write.push(v);
        });
        vec
    }
}