Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
refactor(src/pull_queue.rs): drop "end" method, add Drop impl
Lars Wirzenius committed 1 year ago
commit 4577682fd689384ffb125505cea07ba8a394a960
parent 19ede6dfed7bb10d90067a558f74802fddb57d2b
2 files changed +22 -21
modified src/pull_queue.rs
@@ -18,7 +18,6 @@ use std::sync::{Arc, Condvar, Mutex};
///     for i in 0..10 {
///         queue.push(i);
///     }
-
///     queue.end();
/// });
/// let consumer = spawn(move ||
///     while let Ok(Some(i)) = clone.pop() {
@@ -38,11 +37,26 @@ impl<T> Clone for PullQueue<T> {
    }
}

+
impl<T: Clone> Default for PullQueue<T> {
+
    fn default() -> Self {
+
        Self::new()
+
    }
+
}
+

+
impl<T> Drop for PullQueue<T> {
+
    fn drop(&mut self) {
+
        let (mutex, var) = &*self.q;
+
        let mut locked = mutex
+
            .lock()
+
            .map_err(|_| PullQueueError::Mutex)
+
            .expect("FATAL: mutex was poisoned");
+
        locked.end();
+
        var.notify_all();
+
    }
+
}
+

impl<T: Clone> PullQueue<T> {
    /// Create a new [`PullQueue`].
-
    // No `Default` for this type, because we don't want to require
-
    // the type T to implement that.
-
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        Self {
            q: Arc::new((Mutex::new(Unlocked::new()), Condvar::new())),
@@ -66,17 +80,6 @@ impl<T: Clone> PullQueue<T> {
        Ok(())
    }

-
    /// As producer, tell queue there no more items can be produced.
-
    /// This will call [`Self::pop`] to return `None` when all pushed
-
    /// items have been consumed.
-
    pub fn end(&mut self) -> Result<(), PullQueueError> {
-
        let (mutex, var) = &*self.q;
-
        let mut locked = mutex.lock().map_err(|_| PullQueueError::Mutex)?;
-
        locked.end();
-
        var.notify_all();
-
        Ok(())
-
    }
-

    /// Consume an item, if one is available. This will block until an
    /// item is available or not more items will ever be produced.
    /// Return `None` when no more items will ever be produced.
modified src/queueproc.rs
@@ -84,7 +84,7 @@ impl QueueProcessorBuilder {
            prev_queue_len: Instant::now(),
            processors: descs,
            processed_rx: Some(processed_rx),
-
            picked_events,
+
            picked_events: Some(picked_events),
            current: CurrentlyPicked::default(),
        })
    }
@@ -140,7 +140,7 @@ pub struct QueueProcessor {
    events_rx: NotificationReceiver,
    queue_len_interval: Duration,
    prev_queue_len: Instant,
-
    picked_events: PullQueue<Picked>,
+
    picked_events: Option<PullQueue<Picked>>,
    processors: Vec<ProcessorDescription>,
    processed_rx: Option<Receiver<RepoId>>,
    current: CurrentlyPicked,
@@ -168,7 +168,6 @@ impl QueueProcessor {
            let result = self.process_until_shutdown();

            logger::queueproc_end(&result);
-
            self.picked_events.end().map_err(QueueError::PullQueue)?;

            // Wait for worker threads to terminate. This closes all
            // sender ends for results channel.
@@ -186,6 +185,7 @@ impl QueueProcessor {
    #[allow(clippy::unwrap_used)]
    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
        let mut done = false;
+
        let mut picked_events = self.picked_events.take().unwrap();
        while !done {
            // Process all events currently in the event queue, if a
            // filter allows it.
@@ -193,9 +193,7 @@ impl QueueProcessor {
                if let Some(qe) = self.pick_event()? {
                    let picked = Picked::new(qe.clone());
                    self.current.insert(picked.qe.event().repository());
-
                    self.picked_events
-
                        .push(picked)
-
                        .map_err(QueueError::PullQueue)?;
+
                    picked_events.push(picked).map_err(QueueError::PullQueue)?;

                    // We always drop the picked event as soon as it has
                    // been pushed to a processing thread, so that if