Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add more verbose errors and debug logging to event reading
Lars Wirzenius committed 2 years ago
commit 1a44586eac8e2d0011af775e457c09f08b0c2fbe
parent e24368765815de9a685fa01d77d84af43bd74413
1 file changed +123 -32
modified src/event.rs
@@ -58,13 +58,32 @@ impl NodeEventSource {
    /// profile.
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
        info!("subscribing to local node events");
-
        let node = radicle::Node::new(profile.socket());
-
        let events = node.subscribe(time::Duration::MAX)?;
-
        trace!("subscribed OK");
-
        Ok(Self {
-
            events,
-
            allowed: vec![],
-
        })
+
        let socket = profile.socket();
+
        info!(
+
            "profile socket exists? {:#?} {}",
+
            socket.display(),
+
            socket.exists()
+
        );
+
        if !socket.exists() {
+
            return Err(NodeEventError::NoControlSocket(socket));
+
        }
+
        debug!("about to Node::new()");
+
        let node = radicle::Node::new(socket.clone());
+
        debug!("{node:#?}");
+
        debug!("about to node.subscribe()");
+
        match node.subscribe(time::Duration::MAX) {
+
            Ok(events) => {
+
                trace!("subscribed OK");
+
                Ok(Self {
+
                    events,
+
                    allowed: vec![],
+
                })
+
            }
+
            Err(err) => {
+
                info!("failed to subscribe to node events");
+
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
+
            }
+
        }
    }

    /// Add an event filter for allowed events for this event source.
@@ -75,9 +94,11 @@ impl NodeEventSource {
    fn allowed(&self, event: &BrokerEvent) -> bool {
        for filter in self.allowed.iter() {
            if !event.is_allowed(filter) {
+
                trace!("event is not allowed");
                return false;
            }
        }
+
        trace!("event is allowed");
        true
    }

@@ -85,34 +106,69 @@ impl NodeEventSource {
    /// block until there is an allowed event, or until there will be
    /// no more events from this source, or there's an error.
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, NodeEventError> {
-
        trace!("getting next event from local node");
        loop {
-
            let next = self.events.next();
-
            if next.is_none() {
+
            trace!("getting next event from local node");
+
            if let Some(event) = self.events.next() {
+
                trace!("got event from local node: {event:#?}");
+
                match event {
+
                    Ok(ref event) if Self::is_shutdown_request(event) => {
+
                        info!("got shutdown request from node control socket");
+
                        return Ok(vec![BrokerEvent::Shutdown]);
+
                    }
+
                    Ok(event) => {
+
                        trace!("got node event {:#?}", event);
+
                        let mut result = vec![];
+
                        if let Some(broker_events) = BrokerEvent::from_event(&event) {
+
                            for e in broker_events {
+
                                if self.allowed(&e) {
+
                                    debug!("allowed {:#?}", e);
+
                                    result.push(e);
+
                                }
+
                            }
+
                            return Ok(result);
+
                        }
+
                        trace!("got event, but it was not allowed by filter, go to next event");
+
                    }
+
                    Err(radicle::node::Error::Io(err))
+
                        if err.kind() == std::io::ErrorKind::ConnectionReset =>
+
                    {
+
                        trace!("connection to node control socket broke");
+
                        return Err(NodeEventError::BrokenConnection);
+
                    }
+
                    Err(err) => {
+
                        return Err(NodeEventError::Node(err));
+
                    }
+
                }
+
            } else {
+
                trace!("no more node events from control socket: iterator ended");
                return Err(NodeEventError::BrokenConnection);
            }
+
        }
+
    }

-
            let event = next.unwrap()?;
-
            trace!("got node event {:#?}", event);
-
            let mut result = vec![];
-
            if let Some(broker_events) = BrokerEvent::from_event(&event) {
-
                for e in broker_events {
-
                    if self.allowed(&e) {
-
                        debug!("allowed {:#?}", e);
-
                        result.push(e);
-
                    }
+
    fn is_shutdown_request(event: &Event) -> bool {
+
        if let Event::RefsFetched { updated, .. } = event {
+
            if let Some(RefUpdate::Skipped { name, .. }) = updated.first() {
+
                if name == &RefString::try_from("shutdown").expect("create name") {
+
                    return true;
                }
-
                return Ok(result);
            }
-

-
            trace!("got event, but it was not allowed by filter, next event");
        }
+
        false
    }
}

/// Possible errors from accessing the local Radicle node.
#[derive(Debug, thiserror::Error)]
pub enum NodeEventError {
+
    /// Node control socket does not exist.
+
    #[error("node control socket does not exist: {0}")]
+
    NoControlSocket(PathBuf),
+

+
    /// Can't subscribe to node events.
+
    #[error("failed to subscribe to node events on socket {0}")]
+
    CannotSubscribe(PathBuf, #[source] radicle::node::Error),
+

    /// Some error from getting an event from the node.
    #[error(transparent)]
    Node(#[from] radicle::node::Error),
@@ -242,6 +298,9 @@ impl TryFrom<&str> for Filters {
/// complex events to simpler ones that only affect one ref at a time.
#[derive(Debug, Clone, Serialize)]
pub enum BrokerEvent {
+
    /// Request the CI broker shuts down in an orderly fashion.
+
    Shutdown,
+

    /// A git ref in a git repository has changed to refer to a given
    /// commit. This covers both the case of a new ref, and the case
    /// of a changed ref.
@@ -282,6 +341,11 @@ impl BrokerEvent {
            let mut events = vec![];
            for up in updated {
                match up {
+
                    RefUpdate::Skipped { name, oid }
+
                        if name.as_str() == "shutdown" && oid.is_zero() =>
+
                    {
+
                        events.push(Self::Shutdown);
+
                    }
                    RefUpdate::Created { name, oid } => {
                        events.push(Self::new(rid, name, oid, None));
                    }
@@ -299,16 +363,35 @@ impl BrokerEvent {

    /// Is this broker event allowed by a filter?
    pub fn is_allowed(&self, filter: &EventFilter) -> bool {
-
        let Self::RefChanged {
-
            rid,
-
            name,
-
            oid: _,
-
            old: _,
-
        } = self;
-
        match filter {
-
            EventFilter::Repository(wanted) => rid == wanted,
+
        trace!("is_allowed: called");
+
        let (rid, name) = match self {
+
            Self::Shutdown => return true,
+
            Self::RefChanged {
+
                rid,
+
                name,
+
                oid: _,
+
                old: _,
+
            } => (rid, name),
+
        };
+
        trace!("is_allowed: rid={rid:?}");
+
        trace!("is_allowed: name={name:?}");
+
        let allowed = match filter {
+
            EventFilter::Repository(wanted) => {
+
                trace!("is_allowed: Repository: wanted={wanted:?}");
+
                let allowed = rid == wanted;
+
                allowed
+
            }
            EventFilter::RefSuffix(wanted) => name.ends_with(wanted),
-
            EventFilter::Branch(wanted) => name.ends_with(&format!("/refs/heads/{}", wanted)),
+
            EventFilter::Branch(wanted) => {
+
                trace!("is_allowed: Branch: wanted={wanted:?}");
+
                let suffix = format!("refs/heads/{}", wanted);
+
                let abs_suffix = format!("/{}", suffix);
+
                trace!("is_allowed: Branch: suffix={suffix:?}");
+
                trace!("is_allowed: Branch: abs_suffix={suffix:?}");
+
                let is_branch = name.as_str() == suffix || name.ends_with(&abs_suffix);
+
                trace!("is_allowed: Branch: is_branch={}", is_branch);
+
                is_branch
+
            }
            EventFilter::AnyPatch => is_patch_update(name).is_some(),
            EventFilter::Patch(wanted) => is_patch_update(name) == Some(wanted),
            EventFilter::AnyPatchRef => is_patch_ref(name).is_some(),
@@ -316,11 +399,14 @@ impl BrokerEvent {
            EventFilter::And(conds) => conds.iter().all(|cond| self.is_allowed(cond)),
            EventFilter::Or(conds) => conds.iter().any(|cond| self.is_allowed(cond)),
            EventFilter::Not(conds) => !conds.iter().any(|cond| self.is_allowed(cond)),
-
        }
+
        };
+
        trace!("is_allowed: allowed={allowed}");
+
        allowed
    }

    pub fn name(&self) -> Option<&RefString> {
        match self {
+
            BrokerEvent::Shutdown => None,
            BrokerEvent::RefChanged { name, .. } => Some(name),
        }
    }
@@ -328,15 +414,20 @@ impl BrokerEvent {
    /// Extract the NID from the RefString.
    /// The RefString will start with `refs/namespaces/<nid>/...`
    pub fn nid(&self) -> Option<NodeId> {
+
        debug!("BrokerEvent::nid: name={:?}", self.name());
        if let Some(name) = self.name() {
+
            debug!("BrokerEvent::nid: Some(name)={name:?}");
            let mut parts = name.split('/');
            if let Some(nid) = parts.nth(2) {
+
                debug!("BrokerEvent::nid: nid={nid:?}");
                let parsed = nid.parse();
+
                debug!("BrokerEvent::nid: parsed={parsed:?}");
                if parsed.is_ok() {
                    return parsed.ok();
                }
            }
        }
+
        debug!("BrokerEvent::nid: nope");
        None
    }