Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add a "pull queue" data type
Lars Wirzenius committed 1 year ago
commit c7dbff55e5c706aa38adca054c014bb17046eb45
parent 5ce95bc360c4f20b2f21952667e7b9be818e876a
2 files changed +148 -0
modified src/lib.rs
@@ -19,6 +19,7 @@ pub mod msg;
pub mod node_event_source;
pub mod notif;
pub mod pages;
+
pub mod pull_queue;
pub mod queueadd;
pub mod queueproc;
pub mod refs;
added src/pull_queue.rs
@@ -0,0 +1,147 @@
+
use std::sync::{Arc, Condvar, Mutex};
+

+
/// A pull queue.
+
///
+
/// A producer produces an item when a consumer is ready for it, not
+
/// before. There can be any number of consumers.
+
///
+
/// This type has interior mutability. Each thread should own
+
/// their clone of this, and all clones refer to the same queue.
+
///
+
/// # Example
+
/// ```
+
/// # use std::thread::spawn;
+
/// # use radicle_ci_broker::pull_queue::PullQueue;
+
/// let mut queue = PullQueue::new();
+
/// let mut clone = queue.clone();
+
/// let producer = spawn(move || {
+
///     for i in 0..10 {
+
///         queue.push(i);
+
///     }
+
///     queue.end();
+
/// });
+
/// let consumer = spawn(move ||
+
///     while let Ok(Some(i)) = clone.pop() {
+
///         println!("{i}");
+
///     }
+
/// );
+
/// producer.join().unwrap();
+
/// consumer.join().unwrap();
+
/// ```
+
pub struct PullQueue<T> {
+
    q: Arc<(Mutex<Unlocked<T>>, Condvar)>,
+
}
+

+
impl<T> Clone for PullQueue<T> {
+
    fn clone(&self) -> Self {
+
        Self { q: self.q.clone() }
+
    }
+
}
+

+
impl<T> PullQueue<T> {
+
    /// Create a new [`PullQueue`].
+
    // No `Default` for this type, because we don't want to require
+
    // the type T to implement that.
+
    #[allow(clippy::new_without_default)]
+
    pub fn new() -> Self {
+
        Self {
+
            q: Arc::new((Mutex::new(Unlocked::new()), Condvar::new())),
+
        }
+
    }
+

+
    /// Push a new item to the queue. This blocks until there is a
+
    /// consumer who wants the item.
+
    pub fn push(&mut self, i: T) -> Result<(), PullQueueError> {
+
        let (mutex, var) = &*self.q;
+
        let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
+
        loop {
+
            if locked.is_empty() {
+
                locked.set(i);
+
                var.notify_all();
+
                break;
+
            } else {
+
                // Wait for the previous item to be consumed.
+
                locked = var.wait(locked).map_err(|_| PullQueueError::Mutex)?;
+
            }
+
        }
+
        Ok(())
+
    }
+

+
    /// As producer, tell queue there no more items can be produced.
+
    /// This will call [`Self::pop`] to return `None` when all pushed
+
    /// items have been consumed.
+
    pub fn end(&mut self) -> Result<(), PullQueueError> {
+
        let (mutex, var) = &*self.q;
+
        let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
+
        locked.end();
+
        var.notify_all();
+
        Ok(())
+
    }
+

+
    /// Consume an item, if one is available. This will block until an
+
    /// item is available or not more items will ever be produced.
+
    /// Return `None` when no more items will ever be produced.
+
    pub fn pop(&mut self) -> Result<Option<T>, PullQueueError> {
+
        let (mutex, var) = &*self.q;
+
        let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
+

+
        loop {
+
            if locked.is_ended() {
+
                // Queue is empty and won't get anything more. We're
+
                // done.
+
                var.notify_all();
+
                return Ok(None);
+
            } else if let Some(i) = locked.get() {
+
                // There was something in the queue, return it.
+
                var.notify_all();
+
                return Ok(Some(i));
+
            } else {
+
                locked = var.wait(locked).map_err(|_| PullQueueError::Mutex)?;
+
            }
+
        }
+
    }
+
}
+

+
struct Unlocked<T> {
+
    ended: bool,
+
    item: Option<T>,
+
}
+

+
impl<T> Unlocked<T> {
+
    fn new() -> Self {
+
        Self {
+
            ended: false,
+
            item: None,
+
        }
+
    }
+

+
    fn end(&mut self) {
+
        self.ended = true;
+
    }
+

+
    fn is_ended(&self) -> bool {
+
        self.ended && self.item.is_none()
+
    }
+

+
    fn is_empty(&self) -> bool {
+
        self.item.is_none()
+
    }
+

+
    fn set(&mut self, item: T) {
+
        assert!(self.item.is_none());
+
        assert!(!self.ended);
+
        self.item = Some(item);
+
    }
+

+
    fn get(&mut self) -> Option<T> {
+
        self.item.take()
+
    }
+
}
+

+
/// All possible errors from [`PullQueue`].
+
#[derive(Debug, thiserror::Error)]
+
pub enum PullQueueError {
+
    /// Mutex locking failed.
+
    #[error("failed to lock mutex for pull queue")]
+
    Mutex,
+
}