use crate::{
    collect::{self, Pin},
    scopeguard::guard,
};
use core::ptr::NonNull;
use parking_lot::{Mutex, MutexGuard};
use std::{
    alloc::{Allocator, Global, Layout, LayoutError, handle_alloc_error},
    cell::UnsafeCell,
    intrinsics::unlikely,
    iter::FromIterator,
    marker::PhantomData,
    mem,
    ops::{Deref, DerefMut},
    sync::atomic::{AtomicPtr, Ordering},
};
use std::{
    cmp,
    ptr::slice_from_raw_parts,
    sync::{Arc, atomic::AtomicUsize},
};

mod code;
mod tests;

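/// Read-only access to a [`SyncPushVec`], created by [`SyncPushVec::read`].
/// The handle borrows a [`Pin`] from the `collect` module, which defers
/// destruction of tables retired by concurrent writers.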
pub struct Read<'a, T> {
    table: &'a SyncPushVec<T>,
}

impl<T> Copy for Read<'_, T> {}
impl<T> Clone for Read<'_, T> {
    fn clone(&self) -> Self {
        Self { table: self.table }
    }
}

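/// Write access to a [`SyncPushVec`]. A `Write` is obtained from exclusive
/// access ([`SyncPushVec::write`]), by locking the vector's mutex
/// ([`SyncPushVec::lock`]), or unsafely ([`SyncPushVec::unsafe_write`]).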
pub struct Write<'a, T> {
    table: &'a SyncPushVec<T>,
}

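/// A [`Write`] handle that also holds the vector's mutex. The lock is released
/// when the handle is dropped.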
pub struct LockedWrite<'a, T> {
    table: Write<'a, T>,
    _guard: MutexGuard<'a, ()>,
}

impl<'a, T> Deref for LockedWrite<'a, T> {
    type Target = Write<'a, T>;

    #[inline]
    fn deref(&self) -> &Self::Target {
        &self.table
    }
}

impl<'a, T> DerefMut for LockedWrite<'a, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.table
    }
}

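/// A vector that can be appended to by one writer while other threads read it
/// concurrently. Writers serialize through an internal mutex (or exclusive
/// access); when a push exceeds the capacity, the elements are cloned into a
/// larger table and the old table is retired via the `collect` module, to be
/// freed once no reader can still observe it.
///
/// A minimal usage sketch (illustrative only; the import path is omitted and
/// the snippet is not compiled by rustdoc):
///
/// ```ignore
/// let mut vec = SyncPushVec::with_capacity(2);
/// vec.write().push(1);
/// vec.write().push(2);
/// assert_eq!(vec.as_mut_slice(), &[1, 2]);
/// ```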
pub struct SyncPushVec<T> {
    // Pointer to the `TableInfo` of the current table. The elements are stored
    // directly before the `TableInfo` in the same allocation.
    current: AtomicPtr<TableInfo>,

    // Synchronizes writers that go through `lock` / `lock_from_guard`.
    lock: Mutex<()>,

    // Tables retired by growth or `replace`, kept so `drop` can free them if
    // their deferred destruction has not run yet.
    old: UnsafeCell<Vec<Arc<DestroyTable<T>>>>,

    marker: PhantomData<T>,
}

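/// Header stored at the end of every table allocation, holding the number of
/// initialized elements and the table's capacity.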
struct TableInfo {
    items: AtomicUsize,
    capacity: usize,
}

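/// A pointer to a table allocation. The allocation consists of `capacity`
/// elements of `T` followed by a [`TableInfo`] header; `data` points at the
/// header, and the elements live directly before it.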
#[repr(transparent)]
struct TableRef<T> {
    data: NonNull<TableInfo>,

    marker: PhantomData<*mut T>,
}

impl<T> Copy for TableRef<T> {}
impl<T> Clone for TableRef<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self {
            data: self.data,
            marker: self.marker,
        }
    }
}

impl<T> TableRef<T> {
    /// Returns a reference to the shared static empty table, so that vectors
    /// which never receive elements do not allocate.
    #[inline]
    fn empty() -> Self {
        if cfg!(debug_assertions) {
            let real = Self::layout(0).unwrap().0;
            let dummy = Layout::new::<TableInfo>().align_to(real.align()).unwrap();
            debug_assert_eq!(real, dummy);
        }

        #[repr(C, align(64))]
        struct EmptyTable {
            info: TableInfo,
        }

        static EMPTY: EmptyTable = EmptyTable {
            info: TableInfo {
                capacity: 0,
                items: AtomicUsize::new(0),
            },
        };

        Self {
            data: unsafe {
                NonNull::new_unchecked(&EMPTY.info as *const TableInfo as *mut TableInfo)
            },
            marker: PhantomData,
        }
    }

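    /// Computes the layout of a table with `capacity` elements: an array of
    /// `T` followed by the [`TableInfo`] header. Also returns the offset of
    /// the header within the allocation.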
    #[inline]
    fn layout(capacity: usize) -> Result<(Layout, usize), LayoutError> {
        let data = Layout::new::<T>().repeat(capacity)?.0;
        let info = Layout::new::<TableInfo>();
        data.extend(info)
    }

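    /// Allocates a table with room for `capacity` elements and an item count
    /// of zero, calling `handle_alloc_error` if the allocation fails.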
    #[inline]
    fn allocate(capacity: usize) -> Self {
        let (layout, info_offset) = Self::layout(capacity).expect("capacity overflow");

        let ptr: NonNull<u8> = Global
            .allocate(layout)
            .map(|ptr| ptr.cast())
            .unwrap_or_else(|_| handle_alloc_error(layout));

        let info =
            unsafe { NonNull::new_unchecked(ptr.as_ptr().add(info_offset) as *mut TableInfo) };

        let mut result = Self {
            data: info,
            marker: PhantomData,
        };

        unsafe {
            *result.info_mut() = TableInfo {
                capacity,
                items: AtomicUsize::new(0),
            };
        }

        result
    }

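    /// Drops the initialized elements and deallocates the table. The caller
    /// must ensure the table is no longer referenced by any reader.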
    #[inline]
    unsafe fn free(self) {
        unsafe {
            // A capacity of zero means this is the static empty table, which must
            // not be deallocated. Checking the item count here instead would leak
            // tables that were allocated but never filled.
            if self.info().capacity > 0 {
                if mem::needs_drop::<T>() {
                    let items = self.info().items.load(Ordering::Relaxed);
                    for i in 0..items {
                        self.data(i).drop_in_place();
                    }
                }

                let (layout, info_offset) = Self::layout(self.info().capacity).unwrap_unchecked();

                Global.deallocate(
                    NonNull::new_unchecked((self.data.as_ptr() as *mut u8).sub(info_offset)),
                    layout,
                )
            }
        }
    }

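    /// Builds a table from `iter`, using the static empty table when
    /// `iter_size` is zero.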
    fn from_maybe_empty_iter<I: Iterator<Item = T>, const CHECK_LEN: bool>(
        iter: I,
        iter_size: usize,
        capacity: usize,
    ) -> TableRef<T> {
        if iter_size == 0 {
            TableRef::empty()
        } else {
            let capacity = cmp::max(iter_size, capacity);
            unsafe { TableRef::from_iter::<_, CHECK_LEN>(iter, capacity) }
        }
    }

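    /// Allocates a table with `new_capacity` slots and fills it from `iter`.
    /// The item count is bumped after each element is written and a scope
    /// guard frees the partially initialized table if the iterator panics.
    /// With `CHECK_LEN`, elements past `new_capacity` are ignored.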
    unsafe fn from_iter<I: Iterator<Item = T>, const CHECK_LEN: bool>(
        iter: I,
        new_capacity: usize,
    ) -> TableRef<T> {
        unsafe {
            debug_assert!(new_capacity > 0);

            let mut new_table = TableRef::<T>::allocate(new_capacity);

            let mut guard = guard(Some(new_table), |new_table| {
                new_table.map(|new_table| new_table.free());
            });

            for (index, item) in iter.enumerate() {
                debug_assert!(index < new_capacity);
                if CHECK_LEN && index >= new_capacity {
                    break;
                }

                new_table.first().add(index).write(item);

                *new_table.info_mut().items.get_mut() = index + 1;
            }

            *guard = None;

            new_table
        }
    }

    unsafe fn info(&self) -> &TableInfo {
        unsafe { self.data.as_ref() }
    }

    unsafe fn info_mut(&mut self) -> &mut TableInfo {
        unsafe { self.data.as_mut() }
    }

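    /// Returns a pointer to the first element. Elements are stored directly
    /// before the [`TableInfo`] header, so this is `capacity` slots before
    /// `data`.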
    #[inline]
    unsafe fn first(&self) -> *mut T {
        unsafe { (self.data.as_ptr() as *mut T).sub(self.info().capacity) }
    }

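    /// Returns the elements as a raw slice. The static empty table is only
    /// aligned to 64 bytes, so when there are no items and `T` needs a larger
    /// alignment, a dangling but well-aligned pointer is used instead of
    /// `first()`.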
    #[inline]
    unsafe fn slice(&self) -> *const [T] {
        unsafe {
            let items = self.info().items.load(Ordering::Acquire);
            let base = if items == 0 && mem::align_of::<T>() > 64 {
                mem::align_of::<T>() as *const T
            } else {
                self.first() as *const T
            };
            slice_from_raw_parts(base, items)
        }
    }

    #[inline]
    unsafe fn data(&self, index: usize) -> *mut T {
        unsafe {
            debug_assert!(index < self.info().items.load(Ordering::Acquire));

            self.first().add(index)
        }
    }
}

impl<T: Clone> TableRef<T> {
    /// Allocates a new table with `new_capacity` slots and clones the current
    /// elements into it.
    unsafe fn clone(&self, new_capacity: usize) -> TableRef<T> {
        unsafe {
            debug_assert!(new_capacity >= self.info().capacity);

            TableRef::from_iter::<_, false>((*self.slice()).iter().cloned(), new_capacity)
        }
    }
}

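/// A deferred request to free a retired table. The boolean behind the mutex
/// records whether the table has already been freed, so destruction can be
/// triggered both by the deferred `collect` callback and by
/// `SyncPushVec::drop` without a double free.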
struct DestroyTable<T> {
    table: TableRef<T>,
    lock: Mutex<bool>,
}

unsafe impl<T> Sync for DestroyTable<T> {}
unsafe impl<T: Send> Send for DestroyTable<T> {}

impl<T> DestroyTable<T> {
    unsafe fn run(&self) {
        unsafe {
            let mut status = self.lock.lock();
            if !*status {
                *status = true;
                self.table.free();
            }
        }
    }
}

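// Dropping the vector frees the current table and any retired tables whose
// deferred destruction has not run yet. `#[may_dangle]` asserts that `drop`
// only drops values of type `T` and does not otherwise access them.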
unsafe impl<#[may_dangle] T> Drop for SyncPushVec<T> {
    #[inline]
    fn drop(&mut self) {
        unsafe {
            self.current().free();
            for table in self.old.get_mut() {
                table.run();
            }
        }
    }
}

unsafe impl<T: Send> Send for SyncPushVec<T> {}
unsafe impl<T: Sync> Sync for SyncPushVec<T> {}

impl<T> Default for SyncPushVec<T> {
    #[inline]
    fn default() -> Self {
        Self::new()
    }
}

impl<T> SyncPushVec<T> {
    /// Creates an empty vector without allocating.
    #[inline]
    pub fn new() -> Self {
        Self::with_capacity(0)
    }

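    /// Creates an empty vector with room for at least `capacity` elements.
    /// A capacity of zero uses the shared static empty table and does not
    /// allocate.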
    #[inline]
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            current: AtomicPtr::new(
                if capacity > 0 {
                    TableRef::<T>::allocate(capacity)
                } else {
                    TableRef::empty()
                }
                .data
                .as_ptr(),
            ),
            old: UnsafeCell::new(Vec::new()),
            marker: PhantomData,
            lock: Mutex::new(()),
        }
    }

    #[inline]
    pub fn mutex(&self) -> &Mutex<()> {
        &self.lock
    }

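    /// Creates a [`Read`] handle. The `pin` parameter ties the handle to the
    /// current `collect` pin, which keeps retired tables from being freed
    /// while the handle is in use.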
    #[inline]
    pub fn read<'a>(&'a self, pin: Pin<'a>) -> Read<'a, T> {
        let _pin = pin;
        Read { table: self }
    }

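    /// Creates a [`Write`] handle without any synchronization.
    ///
    /// # Safety
    /// The caller must ensure that no other thread writes to the vector at
    /// the same time, for example by holding the mutex returned by
    /// [`Self::mutex`].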
    #[inline]
    pub unsafe fn unsafe_write(&self) -> Write<'_, T> {
        Write { table: self }
    }

    #[inline]
    pub fn write(&mut self) -> Write<'_, T> {
        Write { table: self }
    }

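    /// Acquires the vector's mutex and returns a write handle that releases
    /// the lock when dropped.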
    #[inline]
    pub fn lock(&self) -> LockedWrite<'_, T> {
        LockedWrite {
            table: Write { table: self },
            _guard: self.lock.lock(),
        }
    }

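    /// Creates a locked write handle from a guard previously acquired from
    /// [`Self::mutex`]. Panics if the guard belongs to a different mutex.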
    #[inline]
    pub fn lock_from_guard<'a>(&'a self, guard: MutexGuard<'a, ()>) -> LockedWrite<'a, T> {
        // Verify that the guard actually belongs to our mutex.
        assert_eq!(
            &self.lock as *const _,
            MutexGuard::mutex(&guard) as *const _
        );

        LockedWrite {
            table: Write { table: self },
            _guard: guard,
        }
    }

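    /// Returns a mutable slice of the elements. This takes `&mut self`, so no
    /// concurrent readers or writers can exist.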
    #[inline]
    pub fn as_mut_slice(&mut self) -> &mut [T] {
        unsafe { &mut *(self.current().slice() as *mut [T]) }
    }

    #[inline]
    fn current(&self) -> TableRef<T> {
        TableRef {
            data: unsafe { NonNull::new_unchecked(self.current.load(Ordering::Acquire)) },
            marker: PhantomData,
        }
    }
}

impl<'a, T> Read<'a, T> {
    /// Returns the capacity of the current table.
    #[inline]
    pub fn capacity(self) -> usize {
        unsafe { self.table.current().info().capacity }
    }

    /// Returns the number of elements currently in the vector.
    #[inline]
    pub fn len(self) -> usize {
        unsafe { self.table.current().info().items.load(Ordering::Acquire) }
    }

    /// Returns a slice of all elements currently in the vector.
    #[inline]
    pub fn as_slice(self) -> &'a [T] {
        let table = self.table.current();
        unsafe { &*table.slice() }
    }
}

impl<T> Write<'_, T> {
    /// Returns a read handle for the same vector.
    #[inline]
    pub fn read(&self) -> Read<'_, T> {
        Read { table: self.table }
    }
}

impl<'a, T: Send + Clone> Write<'a, T> {
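    /// Appends an element, returning a reference to the stored value and its
    /// index. If the table is full, the elements are first cloned into a
    /// larger table.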
    #[inline]
    pub fn push(&mut self, value: T) -> (&'a T, usize) {
        let mut table = self.table.current();
        unsafe {
            let items = table.info().items.load(Ordering::Relaxed);

            if unlikely(items == table.info().capacity) {
                table = self.expand_by_one();
            }

            let result = table.first().add(items);

            result.write(value);

            table.info().items.store(items + 1, Ordering::Release);

            (&*result, items)
        }
    }

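    /// Ensures there is capacity for at least `additional` more elements,
    /// growing the table if necessary.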
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        let table = self.table.current();
        unsafe {
            let required = table
                .info()
                .items
                .load(Ordering::Relaxed)
                .checked_add(additional)
                .expect("capacity overflow");

            if table.info().capacity < required {
                self.expand_by(additional);
            }
        }
    }

    #[cold]
    #[inline(never)]
    fn expand_by_one(&mut self) -> TableRef<T> {
        self.expand_by(1)
    }

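    // Minimum capacity for the first real allocation; this mirrors the growth
    // heuristic of the standard library's `RawVec` (skip tiny capacities for
    // small elements, allocate exactly one slot for very large elements).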
    const MIN_NON_ZERO_CAP: usize = if mem::size_of::<T>() == 1 {
        8
    } else if mem::size_of::<T>() <= 1024 {
        4
    } else {
        1
    };

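    /// Clones the current elements into a new table with room for at least
    /// `additional` more elements (and at least twice the old capacity), then
    /// publishes it as the current table.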
    fn expand_by(&mut self, additional: usize) -> TableRef<T> {
        let table = self.table.current();

        let items = unsafe { table.info().items.load(Ordering::Relaxed) };
        let capacity = unsafe { table.info().capacity };

        let required_cap = match items.checked_add(additional) {
            Some(required_cap) => required_cap,
            None => panic!("capacity overflow"),
        };

        // Guarantee exponential growth while still satisfying the request.
        let cap = cmp::max(capacity * 2, required_cap);
        let cap = cmp::max(Self::MIN_NON_ZERO_CAP, cap);

        let new_table = unsafe { table.clone(cap) };

        self.replace_table(new_table);

        new_table
    }
}

impl<T: Send> Write<'_, T> {
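    /// Publishes `new_table` as the current table and schedules the previous
    /// table for destruction via the `collect` module. The destroy handle is
    /// also recorded in `old` so `drop` can free it if the deferred callback
    /// never runs.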
    fn replace_table(&mut self, new_table: TableRef<T>) {
        let table = self.table.current();

        self.table
            .current
            .store(new_table.data.as_ptr(), Ordering::Release);

        let destroy = Arc::new(DestroyTable {
            table,
            lock: Mutex::new(false),
        });

        unsafe {
            (*self.table.old.get()).push(destroy.clone());

            collect::defer_unchecked(move || destroy.run());
        }
    }

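    /// Replaces the contents of the vector with the elements of `iter`,
    /// reserving space for at least `capacity` elements. The previous table
    /// is retired as in `replace_table`.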
    #[inline]
    pub fn replace<I: IntoIterator<Item = T>>(&mut self, iter: I, capacity: usize) {
        let iter = iter.into_iter();

        let table = if let Some(max) = iter.size_hint().1 {
            TableRef::from_maybe_empty_iter::<_, true>(iter, max, capacity)
        } else {
            let elements: Vec<_> = iter.collect();
            let len = elements.len();
            TableRef::from_maybe_empty_iter::<_, false>(elements.into_iter(), len, capacity)
        };

        self.replace_table(table);
    }
}

impl<T: Clone + Send> Extend<T> for Write<'_, T> {
    #[inline]
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        let iter = iter.into_iter();
        self.reserve(iter.size_hint().0);
        iter.for_each(|v| {
            self.push(v);
        });
    }

    #[inline]
    fn extend_one(&mut self, item: T) {
        self.push(item);
    }

    #[inline]
    fn extend_reserve(&mut self, additional: usize) {
        self.reserve(additional);
    }
}

impl<T: Clone + Send> FromIterator<T> for SyncPushVec<T> {
    #[inline]
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        let iter = iter.into_iter();
        let mut map = Self::with_capacity(iter.size_hint().0);
        let mut write = map.write();
        iter.for_each(|v| {
            write.push(v);
        });
        map
    }
}