Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
fix: handle end of events betterer in CiEventSource
Lars Wirzenius committed 1 year ago
commit 5483acf427c9e611f7b3f31a070d4273a5c09cf8
parent 0ac3b62301d5779b6077c16c202120b7f6bdc35a
3 files changed +46 -19
modified src/ci_event_source.rs
@@ -21,11 +21,12 @@ impl CiEventSource {
        Ok(source)
    }

-
    pub fn event(&mut self) -> Result<Vec<CiEvent>, CiEventSourceError> {
+
    pub fn event(&mut self) -> Result<Option<Vec<CiEvent>>, CiEventSourceError> {
        let result = self.source.node_event();
+
        logger::debug2(format!("ci_event_source: result={result:?}"));
        match &result {
            Err(NodeEventError::BrokenConnection) => {
-
                logger::event_disconnected();
+
                logger::ci_event_source_disconnected();
                Err(CiEventSourceError::BrokenConnection(result.unwrap_err()))
            }
            Err(err) => {
@@ -33,10 +34,15 @@ impl CiEventSource {
                Err(CiEventSourceError::NodeEventError(result.unwrap_err()))
            }
            Ok(None) => {
-
                logger::event_end();
-
                Ok(vec![])
+
                logger::ci_event_source_end();
+
                Ok(None)
+
            }
+
            Ok(Some(event)) => {
+
                let ci_events =
+
                    CiEvent::from_node_event(event).map_err(CiEventSourceError::CiEvent)?;
+
                logger::ci_event_source_got_events(&ci_events);
+
                Ok(Some(ci_events))
            }
-
            Ok(Some(event)) => CiEvent::from_node_event(event).map_err(CiEventSourceError::CiEvent),
        }
    }
}
modified src/logger.rs
@@ -104,11 +104,25 @@ pub fn ci_event_source_created(source: &CiEventSource) {
    );
}

-
pub fn ci_event_source_got_event(event: &CiEvent) {
+
pub fn ci_event_source_got_events(events: &[CiEvent]) {
    info!(
        slog_scope::logger(),
-
        "CI event source received event";
-
        "ci_event" => format!("{event:#?}")
+
        "CI event source received events";
+
        "ci_events" => format!("{events:#?}")
+
    );
+
}
+

+
pub fn ci_event_source_disconnected() {
+
    info!(
+
        slog_scope::logger(),
+
        "CI event source received disconnection"
+
    );
+
}
+

+
pub fn ci_event_source_end() {
+
    info!(
+
        slog_scope::logger(),
+
        "CI event source was notified end of events"
    );
}

@@ -183,7 +197,7 @@ pub fn queueadd_push_event(e: &CiEvent) {
}

pub fn queueadd_end() {
-
    info!(slog_scope::logger(), "start thread to process events ends");
+
    info!(slog_scope::logger(), "thread to process events ends");
}

pub fn pages_directory_unset() {
modified src/queueadd.rs
@@ -64,16 +64,23 @@ impl QueueAdder {
        // This loop ends when there's an error, e.g., failure to read an
        // event from the node.
        'event_loop: loop {
-
            let events = source.event()?;
-
            if events.is_empty() {
-
                logger::queueadd_control_socket_close();
-
                break 'event_loop;
-
            } else {
-
                for e in events {
-
                    for filter in self.filters.iter() {
-
                        if filter.allows(&e) {
-
                            logger::queueadd_push_event(&e);
-
                            self.push_event(e.clone())?;
+
            let events = source.event();
+
            logger::debug2(format!("queueadd: events={events:?}"));
+
            match events {
+
                Err(e) => {
+
                    logger::queueadd_control_socket_close();
+
                    return Err(e.into());
+
                }
+
                Ok(None) => {
+
                    break 'event_loop;
+
                }
+
                Ok(Some(events)) => {
+
                    for e in events {
+
                        for filter in self.filters.iter() {
+
                            if filter.allows(&e) {
+
                                logger::queueadd_push_event(&e);
+
                                self.push_event(e.clone())?;
+
                            }
                        }
                    }
                }