| - |
use std::sync::{Arc, Condvar, Mutex};
|
| - |
|
| - |
/// A pull queue.
|
| - |
///
|
| - |
/// A producer produces an item when a consumer is ready for it, not
|
| - |
/// before. There can be any number of consumers.
|
| - |
///
|
| - |
/// This type has interior mutability. Each thread should own
|
| - |
/// their clone of this, and all clones refer to the same queue.
|
| - |
///
|
| - |
/// # Example
|
| - |
/// ```
|
| - |
/// # use std::thread::spawn;
|
| - |
/// # use radicle_ci_broker::pull_queue::PullQueue;
|
| - |
/// let mut queue = PullQueue::new();
|
| - |
/// let mut clone = queue.clone();
|
| - |
/// let producer = spawn(move || {
|
| - |
/// for i in 0..10 {
|
| - |
/// queue.push(i);
|
| - |
/// }
|
| - |
/// });
|
| - |
/// let consumer = spawn(move ||
|
| - |
/// while let Ok(Some(i)) = clone.pop() {
|
| - |
/// println!("{i}");
|
| - |
/// }
|
| - |
/// );
|
| - |
/// producer.join().unwrap();
|
| - |
/// consumer.join().unwrap();
|
| - |
/// ```
|
| - |
pub struct PullQueue<T> {
|
| - |
q: Arc<(Mutex<Unlocked<T>>, Condvar)>,
|
| - |
}
|
| - |
|
| - |
impl<T> Clone for PullQueue<T> {
|
| - |
fn clone(&self) -> Self {
|
| - |
Self { q: self.q.clone() }
|
| - |
}
|
| - |
}
|
| - |
|
| - |
impl<T: Clone> Default for PullQueue<T> {
|
| - |
fn default() -> Self {
|
| - |
Self::new()
|
| - |
}
|
| - |
}
|
| - |
|
| - |
impl<T> Drop for PullQueue<T> {
|
| - |
fn drop(&mut self) {
|
| - |
let (mutex, var) = &*self.q;
|
| - |
let mut locked = mutex
|
| - |
.lock()
|
| - |
.map_err(|_| PullQueueError::Mutex)
|
| - |
.expect("FATAL: mutex was poisoned");
|
| - |
locked.end();
|
| - |
var.notify_all();
|
| - |
}
|
| - |
}
|
| - |
|
| - |
impl<T: Clone> PullQueue<T> {
|
| - |
/// Create a new [`PullQueue`].
|
| - |
pub fn new() -> Self {
|
| - |
Self {
|
| - |
q: Arc::new((Mutex::new(Unlocked::new()), Condvar::new())),
|
| - |
}
|
| - |
}
|
| - |
|
| - |
/// Push a new item to the queue. This blocks until there is a
|
| - |
/// consumer who wants the item.
|
| - |
pub fn push(&mut self, i: T) -> Result<(), PullQueueError> {
|
| - |
let (mutex, var) = &*self.q;
|
| - |
let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
|
| - |
loop {
|
| - |
if locked.try_set(i.clone()) {
|
| - |
var.notify_all();
|
| - |
break;
|
| - |
} else {
|
| - |
// Wait for the previous item to be consumed.
|
| - |
locked = var.wait(locked).map_err(|_| PullQueueError::Mutex)?;
|
| - |
}
|
| - |
}
|
| - |
Ok(())
|
| - |
}
|
| - |
|
| - |
/// Consume an item, if one is available. This will block until an
|
| - |
/// item is available or not more items will ever be produced.
|
| - |
/// Return `None` when no more items will ever be produced.
|
| - |
pub fn pop(&mut self) -> Result<Option<T>, PullQueueError> {
|
| - |
let (mutex, var) = &*self.q;
|
| - |
let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
|
| - |
|
| - |
loop {
|
| - |
if locked.is_ended() {
|
| - |
// Queue is empty and won't get anything more. We're
|
| - |
// done.
|
| - |
var.notify_all();
|
| - |
return Ok(None);
|
| - |
} else if let Some(i) = locked.get() {
|
| - |
// There was something in the queue, return it.
|
| - |
var.notify_all();
|
| - |
return Ok(Some(i));
|
| - |
} else {
|
| - |
locked = var.wait(locked).map_err(|_| PullQueueError::Mutex)?;
|
| - |
}
|
| - |
}
|
| - |
}
|
| - |
}
|
| - |
|
| - |
struct Unlocked<T> {
|
| - |
ended: bool,
|
| - |
item: Option<T>,
|
| - |
}
|
| - |
|
| - |
impl<T> Unlocked<T> {
|
| - |
fn new() -> Self {
|
| - |
Self {
|
| - |
ended: false,
|
| - |
item: None,
|
| - |
}
|
| - |
}
|
| - |
|
| - |
fn end(&mut self) {
|
| - |
self.ended = true;
|
| - |
}
|
| - |
|
| - |
fn is_ended(&self) -> bool {
|
| - |
self.ended && self.item.is_none()
|
| - |
}
|
| - |
|
| - |
fn try_set(&mut self, item: T) -> bool {
|
| - |
if self.item.is_none() {
|
| - |
self.item = Some(item);
|
| - |
true
|
| - |
} else {
|
| - |
false
|
| - |
}
|
| - |
}
|
| - |
|
| - |
fn get(&mut self) -> Option<T> {
|
| - |
self.item.take()
|
| - |
}
|
| - |
}
|
| - |
|
| - |
/// All possible errors from [`PullQueue`].
|
| - |
#[derive(Debug, thiserror::Error)]
|
| - |
pub enum PullQueueError {
|
| - |
/// Mutex locking failed.
|
| - |
#[error("failed to lock mutex for pull queue")]
|
| - |
Mutex,
|
| - |
}
|