//! Bounded MPMC (multi-producer, multi-consumer) channel.
//!
//! Lock-free ring buffer with atomic head/tail pointers.
//! Capacity is a const generic, consistent with the compile-time
//! bounds philosophy.
//!
//! ```ignore
//! let (tx, rx) = bounded_channel::<Transaction, 1000>();
//! tx.try_send(msg).ok();
//! let msg = rx.try_recv();
//! ```
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};
/// Error produced by a send attempt that the channel rejected for lack of space.
///
/// The rejected value travels back inside the error, so the caller keeps
/// ownership and can retry or discard it.
#[derive(Debug, PartialEq, Eq)]
pub struct Full<T>(
    /// The value that was handed back instead of being enqueued.
    pub T,
);

impl<T> Full<T> {
    /// Consume the error and return the value it carries.
    pub fn into_inner(self) -> T {
        let Self(unsent) = self;
        unsent
    }
}
/// One cell of the ring buffer: a payload plus a stamp recording whose turn it is.
///
/// For a position `pos` that maps to this slot, let `base = pos - pos % N`
/// (the first position of the lap that `pos` belongs to). Then:
///
/// * `stamp == 2 * base` — the slot is free; the producer that claims `pos`
///   may write it and then stores `2 * base + 1`.
/// * `stamp == 2 * base + 1` — the slot holds the value sent at `pos`; the
///   consumer that claims `pos` may read it and then stores `2 * (base + N)`.
///
/// All stamp arithmetic wraps. Every slot starts at stamp `0`, i.e. free for
/// the first lap, which is what lets the buffer be built in a `const fn`.
struct Slot<T> {
    stamp: AtomicUsize,
    value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Slot<T> {
    /// A free, uninitialized slot.
    // Only ever used as the operand of an array-repeat expression, where each
    // element receives its own fresh instance, so the interior mutability of
    // the constant is never shared.
    #[allow(clippy::declare_interior_mutable_const)]
    const EMPTY: Self = Self {
        stamp: AtomicUsize::new(0),
        value: UnsafeCell::new(MaybeUninit::uninit()),
    };
}

/// Internal shared state for the channel.
///
/// A ring buffer of exactly `N` slots (any `N`, indexed with `pos % N`).
/// `head` and `tail` are monotonically increasing positions that only *hand
/// out* slots; the per-slot stamp is what publishes a finished write to
/// consumers and a finished read to producers. Without it, a consumer could
/// observe an advanced `head` and read a slot its producer has not written yet.
///
/// NOTE: when `N` is a power of two the stamp scheme stays consistent even if
/// a position counter wraps around `usize`. For other `N`, the `pos % N`
/// mapping is discontinuous at the wrap; reaching it requires
/// `usize::MAX + 1` successful sends.
struct ChannelInner<T, const N: usize> {
    buffer: [Slot<T>; N],
    head: AtomicUsize, // next position to write
    tail: AtomicUsize, // next position to read
    // Both counters are set to 1 by `new` and not touched anywhere else in this file.
    _sender_count: AtomicUsize,
    _receiver_count: AtomicUsize,
}

// SAFETY: A slot's payload is only touched by the single thread that won the
// CAS on `head` (to write) or on `tail` (to read) for a position mapping to
// that slot, and only while the slot's stamp says it is that side's turn. The
// Release store / Acquire load on the stamp orders each write before the
// matching read and each read before the next write. Values of `T` cross
// threads, hence the `T: Send` bound.
unsafe impl<T: Send, const N: usize> Sync for ChannelInner<T, N> {}
unsafe impl<T: Send, const N: usize> Send for ChannelInner<T, N> {}

// Not every method is used under every feature combination.
#[allow(dead_code)]
impl<T, const N: usize> ChannelInner<T, N> {
    /// Create an empty channel state with all `N` slots free.
    const fn new() -> Self {
        Self {
            buffer: [Slot::<T>::EMPTY; N],
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            _sender_count: AtomicUsize::new(1),
            _receiver_count: AtomicUsize::new(1),
        }
    }

    /// Try to enqueue `value` without blocking.
    ///
    /// Returns `Err(Full(value))` when `N == 0` or when the slot for the next
    /// write position has not been released by its previous occupant yet.
    /// That includes the short window in which a consumer has claimed the
    /// slot but is still reading it. Spins only while racing other producers.
    fn try_send(&self, value: T) -> Result<(), Full<T>> {
        if N == 0 {
            return Err(Full(value));
        }
        let mut head = self.head.load(Ordering::Acquire);
        loop {
            let index = head % N;
            let slot = &self.buffer[index];
            // `index <= head`, so this cannot underflow.
            let writable = (head - index).wrapping_mul(2);
            let stamp = slot.stamp.load(Ordering::Acquire);
            // The wrapped difference is deliberately reinterpreted as signed:
            // negative means the slot is behind this position, positive ahead.
            let lag = stamp.wrapping_sub(writable) as isize;
            if lag == 0 {
                // Free for this lap: try to claim the position.
                match self.head.compare_exchange_weak(
                    head,
                    head.wrapping_add(1),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // SAFETY: Winning the CAS while the stamp equals
                        // `writable` gives this thread exclusive access to the
                        // slot until it publishes the new stamp below; the
                        // previous occupant (if any) was already moved out.
                        unsafe {
                            slot.value.get().write(MaybeUninit::new(value));
                        }
                        // Publish the value to consumers.
                        slot.stamp.store(writable.wrapping_add(1), Ordering::Release);
                        return Ok(());
                    }
                    // Another producer won (or the weak CAS failed spuriously).
                    Err(current) => head = current,
                }
            } else if lag < 0 {
                // The value from the previous lap has not been released: full.
                return Err(Full(value));
            } else {
                // Our view of `head` is stale; another producer already used
                // this position.
                head = self.head.load(Ordering::Acquire);
            }
            core::hint::spin_loop();
        }
    }

    /// Try to dequeue the oldest value without blocking.
    ///
    /// Returns `None` when `N == 0` or when the slot for the next read
    /// position does not hold a published value. That includes the short
    /// window in which a producer has claimed the slot but is still writing
    /// it. Spins only while racing other consumers.
    fn try_recv(&self) -> Option<T> {
        if N == 0 {
            return None;
        }
        let mut tail = self.tail.load(Ordering::Acquire);
        loop {
            let index = tail % N;
            let slot = &self.buffer[index];
            // `index <= tail`, so this cannot underflow.
            let base = tail - index;
            let readable = base.wrapping_mul(2).wrapping_add(1);
            let stamp = slot.stamp.load(Ordering::Acquire);
            // Signed reinterpretation of the wrapped difference, as in `try_send`.
            let lag = stamp.wrapping_sub(readable) as isize;
            if lag == 0 {
                // A value is published for this position: try to claim it.
                match self.tail.compare_exchange_weak(
                    tail,
                    tail.wrapping_add(1),
                    Ordering::AcqRel,
                    Ordering::Acquire,
                ) {
                    Ok(_) => {
                        // SAFETY: The stamp equalled `readable`, so a producer
                        // fully initialized this slot and published it with a
                        // Release store that the Acquire load above observed.
                        // Winning the CAS makes this thread the only reader.
                        let value = unsafe { (*slot.value.get()).assume_init_read() };
                        // Hand the slot to the producer of the next lap.
                        slot.stamp
                            .store(base.wrapping_add(N).wrapping_mul(2), Ordering::Release);
                        return Some(value);
                    }
                    // Another consumer won (or the weak CAS failed spuriously).
                    Err(current) => tail = current,
                }
            } else if lag < 0 {
                // Nothing published at this position yet: empty.
                return None;
            } else {
                // Our view of `tail` is stale; another consumer already took
                // this position.
                tail = self.tail.load(Ordering::Acquire);
            }
            core::hint::spin_loop();
        }
    }

    /// Approximate number of claimed-but-unreceived positions, at most `N`.
    ///
    /// The value is a snapshot and may be outdated by the time it is returned.
    fn len(&self) -> usize {
        // `tail` is read first: `head` never trails `tail`, so the later read
        // of `head` cannot make the subtraction underflow. `head` may have
        // advanced further in between, hence the clamp.
        let tail = self.tail.load(Ordering::Acquire);
        let head = self.head.load(Ordering::Acquire);
        head.wrapping_sub(tail).min(N)
    }

    /// `true` when `len()` is zero.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// `true` when `len()` has reached the capacity `N` (always for `N == 0`).
    fn is_full(&self) -> bool {
        self.len() >= N
    }
}

impl<T, const N: usize> Drop for ChannelInner<T, N> {
    /// Drop every value that was sent but never received, oldest first.
    fn drop(&mut self) {
        if N == 0 {
            return;
        }
        // Start at the oldest position so values are dropped in send order.
        let start = *self.tail.get_mut() % N;
        for offset in 0..N {
            let slot = &mut self.buffer[(start + offset) % N];
            // An odd stamp marks a slot that was written and not yet read.
            if *slot.stamp.get_mut() % 2 == 1 {
                // SAFETY: The odd stamp means the slot is initialized, and
                // `&mut self` guarantees no other thread can access it.
                unsafe {
                    slot.value.get_mut().assume_init_drop();
                }
            }
        }
    }
}
// =============================================================================
// Shared ownership of the channel state
// =============================================================================
// With the `std` feature (or under test) both halves share a heap-allocated
// `ChannelInner` through `Arc`. Without it, the handles hold a raw pointer to
// caller-managed storage instead.
/// Channel handles that share one `ChannelInner` through an `Arc`.
///
/// This variant is compiled when the `std` feature is enabled or when
/// building tests; the other `shared` module covers the remaining case.
#[cfg(any(feature = "std", test))]
mod shared {
    use super::*;
    use alloc::sync::Arc;

    /// Producer handle of a bounded channel.
    pub struct Sender<T, const N: usize> {
        inner: Arc<ChannelInner<T, N>>,
    }

    /// Consumer handle of a bounded channel.
    pub struct Receiver<T, const N: usize> {
        inner: Arc<ChannelInner<T, N>>,
    }

    impl<T, const N: usize> Sender<T, N> {
        /// Attempt to enqueue `value`; a full channel hands it back as
        /// `Err(Full(value))`.
        pub fn try_send(&self, value: T) -> Result<(), Full<T>> {
            self.inner.try_send(value)
        }

        /// Number of items the channel currently reports as queued.
        pub fn len(&self) -> usize {
            self.inner.len()
        }

        /// Whether the channel currently reports no queued items.
        pub fn is_empty(&self) -> bool {
            self.inner.is_empty()
        }

        /// Whether the channel currently reports being at capacity.
        pub fn is_full(&self) -> bool {
            self.inner.is_full()
        }
    }

    impl<T, const N: usize> Clone for Sender<T, N> {
        /// Create another producer handle for the same channel.
        fn clone(&self) -> Self {
            let inner = Arc::clone(&self.inner);
            Self { inner }
        }
    }

    impl<T, const N: usize> Receiver<T, N> {
        /// Attempt to dequeue a value; `None` means nothing was available.
        pub fn try_recv(&self) -> Option<T> {
            self.inner.try_recv()
        }

        /// Number of items the channel currently reports as queued.
        pub fn len(&self) -> usize {
            self.inner.len()
        }

        /// Whether the channel currently reports no queued items.
        pub fn is_empty(&self) -> bool {
            self.inner.is_empty()
        }

        /// Whether the channel currently reports being at capacity.
        pub fn is_full(&self) -> bool {
            self.inner.is_full()
        }
    }

    impl<T, const N: usize> Clone for Receiver<T, N> {
        /// Create another consumer handle for the same channel.
        fn clone(&self) -> Self {
            let inner = Arc::clone(&self.inner);
            Self { inner }
        }
    }

    impl<T, const N: usize> core::fmt::Debug for Sender<T, N> {
        /// Print the current length and the capacity; payloads are not shown.
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let queued = self.len();
            let mut out = f.debug_struct("Sender");
            out.field("len", &queued);
            out.field("capacity", &N);
            out.finish()
        }
    }

    impl<T, const N: usize> core::fmt::Debug for Receiver<T, N> {
        /// Print the current length and the capacity; payloads are not shown.
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let queued = self.len();
            let mut out = f.debug_struct("Receiver");
            out.field("len", &queued);
            out.field("capacity", &N);
            out.finish()
        }
    }

    /// Create a bounded MPMC channel with capacity `N`.
    ///
    /// Returns the `(Sender, Receiver)` pair for a fresh channel. Either half
    /// may be cloned to get additional producers or consumers on the same
    /// channel.
    pub fn bounded_channel<T, const N: usize>() -> (Sender<T, N>, Receiver<T, N>) {
        let for_sender = Arc::new(ChannelInner::new());
        let for_receiver = Arc::clone(&for_sender);
        (
            Sender { inner: for_sender },
            Receiver { inner: for_receiver },
        )
    }
}
/// Channel handles for builds without the `std` feature (and outside tests).
///
/// The handles store a raw pointer to a `ChannelInner` instead of an `Arc`.
// The module is a placeholder: nothing here constructs a handle, so most items
// are unused.
#[cfg(not(any(feature = "std", test)))]
#[allow(dead_code)]
mod shared {
    use super::*;

    /// Producer handle of a bounded channel (raw-pointer variant).
    pub struct Sender<T, const N: usize> {
        inner: *const ChannelInner<T, N>,
    }

    /// Consumer handle of a bounded channel (raw-pointer variant).
    pub struct Receiver<T, const N: usize> {
        inner: *const ChannelInner<T, N>,
    }

    // SAFETY: NOTE(review) — these rely on `ChannelInner<T, N>` being `Sync`
    // for `T: Send` and on the pointee outliving every handle; the second part
    // is not enforced anywhere in this module and must be confirmed by whoever
    // adds a constructor.
    unsafe impl<T: Send, const N: usize> Send for Sender<T, N> {}
    unsafe impl<T: Send, const N: usize> Sync for Sender<T, N> {}
    unsafe impl<T: Send, const N: usize> Send for Receiver<T, N> {}
    unsafe impl<T: Send, const N: usize> Sync for Receiver<T, N> {}

    impl<T, const N: usize> Sender<T, N> {
        /// Borrow the channel state behind the raw pointer.
        fn state(&self) -> &ChannelInner<T, N> {
            // SAFETY: Sound only if `inner` points to a live `ChannelInner`
            // for as long as this handle exists. No code in this module
            // creates a handle (`bounded_channel` panics), so a future
            // constructor must uphold this. TODO confirm when one is added.
            unsafe { &*self.inner }
        }

        /// Attempt to enqueue `value`; a full channel hands it back as
        /// `Err(Full(value))`.
        pub fn try_send(&self, value: T) -> Result<(), Full<T>> {
            self.state().try_send(value)
        }

        /// Number of items the channel currently reports as queued.
        pub fn len(&self) -> usize {
            self.state().len()
        }

        /// Whether the channel currently reports no queued items.
        pub fn is_empty(&self) -> bool {
            self.state().is_empty()
        }

        /// Whether the channel currently reports being at capacity.
        pub fn is_full(&self) -> bool {
            self.state().is_full()
        }
    }

    impl<T, const N: usize> Clone for Sender<T, N> {
        /// Copy the pointer; the new handle refers to the same channel state.
        fn clone(&self) -> Self {
            let inner = self.inner;
            Self { inner }
        }
    }

    impl<T, const N: usize> Receiver<T, N> {
        /// Borrow the channel state behind the raw pointer.
        fn state(&self) -> &ChannelInner<T, N> {
            // SAFETY: Sound only if `inner` points to a live `ChannelInner`
            // for as long as this handle exists. No code in this module
            // creates a handle (`bounded_channel` panics), so a future
            // constructor must uphold this. TODO confirm when one is added.
            unsafe { &*self.inner }
        }

        /// Attempt to dequeue a value; `None` means nothing was available.
        pub fn try_recv(&self) -> Option<T> {
            self.state().try_recv()
        }

        /// Number of items the channel currently reports as queued.
        pub fn len(&self) -> usize {
            self.state().len()
        }

        /// Whether the channel currently reports no queued items.
        pub fn is_empty(&self) -> bool {
            self.state().is_empty()
        }

        /// Whether the channel currently reports being at capacity.
        pub fn is_full(&self) -> bool {
            self.state().is_full()
        }
    }

    impl<T, const N: usize> Clone for Receiver<T, N> {
        /// Copy the pointer; the new handle refers to the same channel state.
        fn clone(&self) -> Self {
            let inner = self.inner;
            Self { inner }
        }
    }

    impl<T, const N: usize> core::fmt::Debug for Sender<T, N> {
        /// Print the capacity only; the pointee is not inspected.
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let mut out = f.debug_struct("Sender");
            out.field("capacity", &N);
            out.finish()
        }
    }

    impl<T, const N: usize> core::fmt::Debug for Receiver<T, N> {
        /// Print the capacity only; the pointee is not inspected.
        fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
            let mut out = f.debug_struct("Receiver");
            out.field("capacity", &N);
            out.finish()
        }
    }

    /// Placeholder constructor for builds without an allocator.
    ///
    /// # Panics
    ///
    /// Always panics: this configuration has no way to allocate the shared
    /// state, and this module offers no constructor taking caller-provided
    /// storage.
    pub fn bounded_channel<T, const N: usize>() -> (Sender<T, N>, Receiver<T, N>) {
        panic!("bounded_channel requires alloc feature or static storage in no_std mode")
    }
}
pub use shared::{bounded_channel, Receiver, Sender};
/// Single-threaded behavioural tests for the public channel API.
#[cfg(test)]
mod tests {
    use super::*;

    /// Values come out in the order they went in, then the channel is empty.
    #[test]
    fn channel_send_recv() {
        let (tx, rx) = bounded_channel::<u32, 4>();
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        assert_eq!(rx.try_recv(), Some(1));
        assert_eq!(rx.try_recv(), Some(2));
        assert_eq!(rx.try_recv(), None);
    }

    /// A full channel rejects a send and returns the value; space frees up
    /// after a receive.
    #[test]
    fn channel_full() {
        let (tx, rx) = bounded_channel::<u32, 2>();
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        assert!(tx.is_full());
        let err = tx.try_send(3).unwrap_err();
        assert_eq!(err.into_inner(), 3);
        // After receiving, can send again
        rx.try_recv().unwrap();
        tx.try_send(3).unwrap();
    }

    /// A fresh channel is empty and yields nothing.
    #[test]
    fn channel_empty() {
        let (_tx, rx) = bounded_channel::<u32, 4>();
        assert!(rx.is_empty());
        assert_eq!(rx.try_recv(), None);
    }

    /// Both halves report the same length as items are sent and received.
    #[test]
    fn channel_len() {
        let (tx, rx) = bounded_channel::<u32, 8>();
        assert_eq!(tx.len(), 0);
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        assert_eq!(tx.len(), 2);
        assert_eq!(rx.len(), 2);
        rx.try_recv().unwrap();
        assert_eq!(tx.len(), 1);
    }

    /// Items are delivered first-in, first-out.
    #[test]
    fn channel_fifo_order() {
        let (tx, rx) = bounded_channel::<u32, 8>();
        for i in 0..5 {
            tx.try_send(i).unwrap();
        }
        for i in 0..5 {
            assert_eq!(rx.try_recv(), Some(i));
        }
    }

    /// A cloned sender feeds the same channel as the original.
    #[test]
    fn channel_clone_sender() {
        let (tx, rx) = bounded_channel::<u32, 8>();
        let tx2 = tx.clone();
        tx.try_send(1).unwrap();
        tx2.try_send(2).unwrap();
        let a = rx.try_recv().unwrap();
        let b = rx.try_recv().unwrap();
        assert!((a == 1 && b == 2) || (a == 2 && b == 1));
    }

    /// A cloned receiver drains the same channel: every item is delivered
    /// exactly once across both handles.
    #[test]
    fn channel_clone_receiver() {
        let (tx, rx) = bounded_channel::<u32, 8>();
        let rx2 = rx.clone();
        tx.try_send(1).unwrap();
        tx.try_send(2).unwrap();
        // Single-threaded, so the order is deterministic: the first receive
        // takes the oldest item regardless of which handle performs it.
        assert_eq!(rx.try_recv(), Some(1));
        assert_eq!(rx2.try_recv(), Some(2));
        // Nothing was duplicated: both handles now see an empty channel.
        assert_eq!(rx.try_recv(), None);
        assert_eq!(rx2.try_recv(), None);
    }

    /// Filling and draining repeatedly exercises index wrap-around.
    #[test]
    fn channel_wrap_around() {
        let (tx, rx) = bounded_channel::<u32, 4>();
        // Fill and drain multiple times to exercise wrap-around
        for round in 0..3 {
            for i in 0..4 {
                tx.try_send(round * 4 + i).unwrap();
            }
            for i in 0..4 {
                assert_eq!(rx.try_recv(), Some(round * 4 + i));
            }
        }
    }

    /// A zero-capacity channel rejects every send and never yields a value.
    #[test]
    fn channel_zero_capacity() {
        let (tx, rx) = bounded_channel::<u32, 0>();
        assert!(tx.try_send(1).is_err());
        assert_eq!(rx.try_recv(), None);
    }

    /// Items still queued when the channel goes away are dropped exactly once.
    #[test]
    fn channel_drop_with_items() {
        use core::sync::atomic::{AtomicU32, Ordering};
        static DROP_COUNT: AtomicU32 = AtomicU32::new(0);
        #[derive(Debug)]
        struct Droppable(u32);
        impl Drop for Droppable {
            fn drop(&mut self) {
                DROP_COUNT.fetch_add(1, Ordering::Relaxed);
            }
        }
        DROP_COUNT.store(0, Ordering::Relaxed);
        {
            let (tx, _rx) = bounded_channel::<Droppable, 8>();
            tx.try_send(Droppable(1)).unwrap();
            tx.try_send(Droppable(2)).unwrap();
            tx.try_send(Droppable(3)).unwrap();
            // Drop channel with 3 items still in it
        }
        assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 3);
    }

    /// `Debug` output names the handle type.
    #[test]
    fn channel_debug() {
        let (tx, rx) = bounded_channel::<u32, 8>();
        tx.try_send(1).unwrap();
        let s = alloc::format!("{:?}", tx);
        assert!(s.contains("Sender"));
        let s = alloc::format!("{:?}", rx);
        assert!(s.contains("Receiver"));
    }

    /// `Full::into_inner` returns the wrapped value.
    #[test]
    fn full_into_inner() {
        let f = Full(42u32);
        assert_eq!(f.into_inner(), 42);
    }
}
rs/core/src/channel.rs
π 0.0%
//! Bounded MPMC (multi-producer, multi-consumer) channel.
//!
//! Lock-free ring buffer with atomic head/tail pointers.
//! Capacity is a const generic, consistent with the compile-time
//! bounds philosophy.
//!
//! ```ignore
//! let (tx, rx) = bounded_channel::<Transaction, 1000>();
//! tx.try_send(msg).ok();
//! let msg = rx.try_recv();
//! ```
use UnsafeCell;
use MaybeUninit;
use ;
/// Error returned when a channel is full.
;
/// Internal shared state for the channel.
///
/// Uses a ring buffer with power-of-two sizing for efficient modular indexing.
/// The actual usable capacity is `N` slots; the buffer is `N` entries.
///
/// Head and tail are monotonically increasing indices. Wrapping is handled
/// by masking or modular arithmetic on access.
// SAFETY: The ring buffer slots are accessed at distinct indices by
// producers (head) and consumers (tail). The atomic operations ensure
// that no two threads access the same slot simultaneously in a conflicting way.
unsafe
unsafe
// =============================================================================
// Shared ownership via manual reference counting
// =============================================================================
// We use a raw pointer to a heap-allocated ChannelInner for shared ownership.
// In no_std without alloc, we provide a stack-based alternative.
/// Wrapper around the shared channel state.
///
/// In no_std + no alloc mode, the channel must be created with a static
/// or caller-managed backing store. For simplicity and correctness, we
/// always use the inner struct directly with reference counting.
/// no_std without alloc: pointer-based channel backed by caller-managed storage.
pub use ;