Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
Fix crash when ref change is neither patch nor branch
Merged liw opened 1 year ago
9 files changed +111 -141 0d1d3291 6e0d8853
modified src/adapter.rs
@@ -161,10 +161,6 @@ impl Adapter {
        }

        let wait_result = child.wait().expect("FIXME");
-
        logger::debug2(format!(
-
            "wait result? {wait_result:?} status.code: {:?}",
-
            wait_result.status().code()
-
        ));
        if wait_result.timed_out() {
            logger::adapter_did_not_exit_voluntarily();
            outcome.set_error(AdapterError::Failed(NOT_EXITED));
modified src/ci_event.rs
@@ -194,9 +194,11 @@ impl CiEvent {
                            if let Ok(patch_id) = patch_id(name) {
                                Self::patch_created(origin, *rid, patch_id, *oid)
                            } else if let Ok(branch) = namespaced_from_str(name) {
-
                                let branch = branch_from_namespaced(&branch)
-
                                    .map_err(|err| CiEventError::branch_name(&branch, err))?;
-
                                Self::branch_created(origin, *rid, &branch, *oid)?
+
                                if let Ok(branch) = branch_from_namespaced(&branch) {
+
                                    Self::branch_created(origin, *rid, &branch, *oid)?
+
                                } else {
+
                                    continue;
+
                                }
                            } else {
                                continue;
                            }
@@ -206,9 +208,11 @@ impl CiEvent {
                            if let Ok(patch_id) = patch_id(name) {
                                Self::patch_updated(origin, *rid, patch_id, *new)
                            } else if let Ok(branch) = namespaced_from_str(name) {
-
                                let branch = branch_from_namespaced(&branch)
-
                                    .map_err(|err| CiEventError::branch_name(&branch, err))?;
-
                                Self::branch_updated(origin, *rid, &branch, *new, *old)?
+
                                if let Ok(branch) = branch_from_namespaced(&branch) {
+
                                    Self::branch_updated(origin, *rid, &branch, *new, *old)?
+
                                } else {
+
                                    continue;
+
                                }
                            } else {
                                continue;
                            }
@@ -217,9 +221,11 @@ impl CiEvent {
                            let origin = originator(name.to_namespaced().unwrap())?;
                            let branch = namespaced_from_str(name)
                                .map_err(|err| CiEventError::branch_name(name, err))?;
-
                            let branch = branch_from_namespaced(&branch)
-
                                .map_err(|err| CiEventError::branch_name(&branch, err))?;
-
                            Self::branch_deleted(origin, *rid, &branch, *oid)?
+
                            if let Ok(branch) = branch_from_namespaced(&branch) {
+
                                Self::branch_deleted(origin, *rid, &branch, *oid)?
+
                            } else {
+
                                continue;
+
                            }
                        }
                        RefUpdate::Skipped { .. } => continue,
                    };
modified src/ci_event_source.rs
@@ -23,7 +23,6 @@ impl CiEventSource {

    pub fn event(&mut self) -> Result<Option<Vec<CiEvent>>, CiEventSourceError> {
        let result = self.source.node_event();
-
        logger::trace2(format!("ci_event_source: result={result:?}"));
        match result {
            Err(err) if matches!(err, NodeEventError::BrokenConnection) => {
                logger::ci_event_source_disconnected();
@@ -40,7 +39,9 @@ impl CiEventSource {
            Ok(Some(event)) => {
                let ci_events =
                    CiEvent::from_node_event(&event).map_err(CiEventSourceError::CiEvent)?;
-
                logger::ci_event_source_got_events(&ci_events);
+
                if !ci_events.is_empty() {
+
                    logger::ci_event_source_got_events(&ci_events);
+
                }
                Ok(Some(ci_events))
            }
        }
modified src/logger.rs
@@ -11,12 +11,14 @@ use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, Env
use crate::{
    adapter::Adapter,
    ci_event::CiEvent,
-
    ci_event_source::CiEventSource,
+
    ci_event_source::{CiEventSource, CiEventSourceError},
    config::Config,
    db::{QueueId, QueuedCiEvent},
    filter::EventFilter,
    msg::Request,
    node_event_source::NodeEventSource,
+
    pages::PageError,
+
    queueadd::AdderError,
    queueproc::QueueError,
    run::Run,
};
@@ -112,8 +114,6 @@ enum Id {
    AdapterStderrLine,
    AdapterTooManyMessages,

-
    AdHoc,
-

    BrokerDatabase,
    BrokerRunEnd,
    BrokerRunStart,
@@ -122,7 +122,6 @@ enum Id {
    CiEventSourceCreated,
    CiEventSourceDisconnected,
    CiEventSourceEnd,
-
    CiEventSourceEndOfFile,
    CiEventSourceGotEvents,
    CibConfig,
    CibEndFailure,
@@ -152,10 +151,12 @@ enum Id {
    QueueProcEnd,
    QueueProcFilterDecision,
    QueueProcPickedEvent,
+
    QueueProcProcessingEvent,
    QueueProcProcessedEvent,
    QueueProcQueueLength,
    QueueProcRemoveEvent,
    QueueProcStart,
+
    QueueProcTrigger,

    TimeoutLineReceiverCheckChild,
    TimeoutLineReceiverChildDisconnected,
@@ -381,15 +382,6 @@ pub fn ci_event_source_end() {
    );
}

-
pub fn ci_event_source_eof(source: &CiEventSource) {
-
    info!(
-
        msg_id = ?Id::CiEventSourceEndOfFile,
-
        kind = %Kind::Debug,
-
        ?source,
-
        "CI event source end of file"
-
    );
-
}
-

pub fn loaded_config(config: &Config) {
    debug!(
        msg_id = ?Id::CibConfig,
@@ -416,10 +408,11 @@ pub fn queueproc_start() {
    );
}

-
pub fn queueproc_end() {
+
pub fn queueproc_end(result: &Result<(), QueueError>) {
    debug!(
        msg_id = ?Id::QueueProcEnd,
        kind = %Kind::Debug,
+
        ?result,
        "thread to process events ends"
    );
}
@@ -444,6 +437,7 @@ pub fn queueproc_queue_length(len: usize) {
pub fn queueproc_filter_decision(event: &CiEvent, filter: &EventFilter, allowed: bool) {
    info!(
        msg_id = ?Id::QueueProcFilterDecision,
+
        kind = %Kind::FilterDecision,
        ?event,
        ?filter,
        ?allowed,
@@ -454,6 +448,7 @@ pub fn queueproc_filter_decision(event: &CiEvent, filter: &EventFilter, allowed:
pub fn queueproc_predicate_decision(event: &CiEvent, filter: &EventFilter, allowed: bool) {
    trace!(
        msg_id = ?Id::QueueProcFilterDecision,
+
        kind = %Kind::FilterDecision,
        ?event,
        ?filter,
        ?allowed,
@@ -472,6 +467,24 @@ pub fn queueproc_picked_event(id: &QueueId, event: &QueuedCiEvent, adapter: &Ada
    );
}

+
pub fn queueproc_processing_event(event: &CiEvent) {
+
    info!(
+
        msg_id = ?Id::QueueProcProcessingEvent,
+
        kind = %Kind::GotEvent,
+
        ?event,
+
        "processing event"
+
    );
+
}
+

+
pub fn queueproc_trigger(result: &Result<Request, QueueError>) {
+
    info!(
+
        msg_id = ?Id::QueueProcTrigger,
+
        kind = %Kind::GotEvent,
+
        ?result,
+
        "trigger request result"
+
    );
+
}
+

pub fn queueproc_processed_event(result: &Result<bool, QueueError>) {
    info!(
        msg_id = ?Id::QueueProcProcessedEvent,
@@ -516,10 +529,11 @@ pub fn queueadd_start() {
    );
}

-
pub fn queueadd_control_socket_close() {
+
pub fn queueadd_control_socket_close(error: &CiEventSourceError) {
    info!(
        msg_id = ?Id::QueueAddEndEvents,
        kind = %Kind::Debug,
+
        ?error,
        "no more events from node control socket"
    );
}
@@ -534,10 +548,11 @@ pub fn queueadd_push_event(event: &CiEvent, id: &QueueId) {
    );
}

-
pub fn queueadd_end() {
+
pub fn queueadd_end(result: &Result<(), AdderError>) {
    debug!(
        msg_id = ?Id::QueueAddEnd,
        kind = %Kind::Debug,
+
        ?result,
        "thread to process events ends"
    );
}
@@ -575,10 +590,11 @@ pub fn pages_start() {
    );
}

-
pub fn pages_end() {
+
pub fn pages_end(result: &Result<(), PageError>) {
    debug!(
        msg_id = ?Id::PagesEnd,
        kind = %Kind::Debug,
+
        ?result,
        "end page updater thread"
    );
}
@@ -984,38 +1000,6 @@ pub fn event_filter_decision(filter: &'static str, allowed: bool, reason: &str)
    );
}

-
pub fn debug(msg: &str) {
-
    debug!(
-
        msg_id = ?Id::AdHoc,
-
        kind = %Kind::Debug,
-
        "{msg}"
-
    );
-
}
-

-
pub fn debug2(msg: String) {
-
    debug!(
-
        msg_id = ?Id::AdHoc,
-
        kind = %Kind::Debug,
-
        "{msg}"
-
    );
-
}
-

-
pub fn trace(msg: &str) {
-
    trace!(
-
        msg_id = ?Id::AdHoc,
-
        kind = %Kind::Debug,
-
        "{msg}"
-
    );
-
}
-

-
pub fn trace2(msg: String) {
-
    trace!(
-
        msg_id = ?Id::AdHoc,
-
        kind = %Kind::Debug,
-
        "{msg}"
-
    );
-
}
-

pub fn error(msg: &str, e: &impl std::error::Error) {
    error!("{msg}: {e}");
    let mut e = e.source();
modified src/msg.rs
@@ -144,19 +144,15 @@ impl<'a> RequestBuilder<'a> {
    /// Create a [`Request::Trigger``] message from a [`crate::ci_event::Civet`].
    pub fn build_trigger_from_ci_event(self) -> Result<Request, MessageError> {
        fn repository(repo: &RepoId, profile: &Profile) -> Result<Repository, MessageError> {
-
            logger::trace2(format!("build trigger: look up repository {repo}"));
            let rad_repo = match profile.storage.repository(*repo) {
                Err(err) => {
-
                    logger::trace2(format!("build trigger: repo lookup result {err:?}"));
                    return Err(err)?;
                }
                Ok(rad_repo) => rad_repo,
            };

-
            logger::trace("build trigger: look up project");
            let project_info = match rad_repo.project() {
                Err(err) => {
-
                    logger::trace2(format!("build trigger: project lookup result {err:?}"));
                    return Err(err)?;
                }
                Ok(x) => x,
@@ -177,11 +173,8 @@ impl<'a> RequestBuilder<'a> {
            repo: &RepoId,
            profile: &Profile,
        ) -> Result<EventCommonFields, MessageError> {
-
            logger::trace2(format!("build trigger: create common fields for {repo}"));
-
            logger::trace("build trigger: look up repository");
            let repository = match repository(repo, profile) {
                Err(err) => {
-
                    logger::trace2(format!("build trigger: project lookup result {err:?}"));
                    return Err(err)?;
                }
                Ok(x) => x,
@@ -194,11 +187,8 @@ impl<'a> RequestBuilder<'a> {
        }

        fn author(node: &NodeId, profile: &Profile) -> Result<Author, MessageError> {
-
            logger::trace2(format!("build trigger: look up author {node}"));
            let did = Did::from(*node);
-
            let x = did_to_author(profile, &did);
-
            logger::trace2(format!("build trigger: author lookup result {x:?}"));
-
            x
+
            did_to_author(profile, &did)
        }

        fn commits(
@@ -206,24 +196,19 @@ impl<'a> RequestBuilder<'a> {
            tip: Oid,
            base: Oid,
        ) -> Result<Vec<Oid>, radicle_surf::Error> {
-
            logger::trace2(format!("build trigger: look commits {git_repo:?}"));
-
            let x = git_repo
+
            git_repo
                .history(tip)?
                .take_while(|c| if let Ok(c) = c { c.id != base } else { false })
                .map(|r| r.map(|c| c.id))
-
                .collect::<Result<Vec<Oid>, _>>();
-
            logger::trace2(format!("build trigger: revision lookup result {x:?}"));
-
            x
+
                .collect::<Result<Vec<Oid>, _>>()
        }

        fn patch_cob(
            rad_repo: &radicle::storage::git::Repository,
            patch_id: &PatchId,
        ) -> Result<radicle::cob::patch::Patch, MessageError> {
-
            logger::trace2(format!("build trigger: look patch cob {patch_id}"));
            let x = match patch::Patches::open(rad_repo) {
                Err(err) => {
-
                    logger::trace2(format!("patch repo open => {err:?}"));
                    return Err(err)?;
                }
                Ok(x) => x,
@@ -231,7 +216,6 @@ impl<'a> RequestBuilder<'a> {

            let x = match x.get(patch_id) {
                Err(err) => {
-
                    logger::trace2(format!("get patch => {err:?}"));
                    return Err(err)?;
                }
                Ok(x) => x,
@@ -245,7 +229,6 @@ impl<'a> RequestBuilder<'a> {
                Some(x) => x,
            };

-
            logger::trace2(format!("build trigger: patch cob lookup result {x:?}"));
            Ok(x)
        }

@@ -253,8 +236,7 @@ impl<'a> RequestBuilder<'a> {
            patch_cob: &radicle::cob::patch::Patch,
            author: &Author,
        ) -> Result<Vec<Revision>, MessageError> {
-
            logger::trace2(format!("build trigger: look patch revisions by {author:?}"));
-
            let x = patch_cob
+
            patch_cob
                .revisions()
                .map(|(rid, r)| {
                    Ok::<Revision, MessageError>(Revision {
@@ -266,9 +248,7 @@ impl<'a> RequestBuilder<'a> {
                        timestamp: r.timestamp().as_secs(),
                    })
                })
-
                .collect::<Result<Vec<Revision>, MessageError>>();
-
            logger::trace2(format!("build trigger: revision lookup result {x:?}"));
-
            x
+
                .collect::<Result<Vec<Revision>, MessageError>>()
        }

        fn patch_base(
@@ -276,20 +256,14 @@ impl<'a> RequestBuilder<'a> {
            patch_id: &PatchId,
            author: &Author,
        ) -> Result<Oid, MessageError> {
-
            logger::trace2(format!(
-
                "build trigger: look base commit for patch {patch_id}"
-
            ));
            let author_pk = radicle::crypto::PublicKey::from(author.id);
            let (_id, revision) = match patch_cob.latest_by(&author_pk) {
                None => {
-
                    logger::trace("build trigger: patch base lookup failed: nothing found");
                    return Err(MessageError::LatestPatchRevision(*patch_id));
                }
                Some(x) => x,
            };
-
            let base = *revision.base();
-
            logger::trace2(format!("patch base commit is {base}"));
-
            Ok(base)
+
            Ok(*revision.base())
        }

        let profile = self.profile.ok_or(MessageError::NoProfile)?;
@@ -302,7 +276,6 @@ impl<'a> RequestBuilder<'a> {
                branch,
                tip,
            })) => {
-
                logger::trace("build trigger: branch created");
                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
@@ -322,7 +295,6 @@ impl<'a> RequestBuilder<'a> {
                tip,
                old_tip,
            })) => {
-
                logger::trace("build trigger: branch updated");
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))?;
                let mut commits = commits(&git_repo, *tip, *old_tip)?;
@@ -348,7 +320,6 @@ impl<'a> RequestBuilder<'a> {
                branch,
                tip,
            })) => {
-
                logger::trace("build trigger: branch deleted");
                Ok(Request::Trigger {
                    common: common_fields(EventType::Push, repo, profile)?,
                    push: Some(PushEvent {
@@ -367,7 +338,6 @@ impl<'a> RequestBuilder<'a> {
                patch: patch_id,
                new_tip,
            })) => {
-
                logger::trace("build trigger: patch created");
                let rad_repo = profile.storage.repository(*repo)?;
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))?;
@@ -410,7 +380,6 @@ impl<'a> RequestBuilder<'a> {
                patch: patch_id,
                new_tip,
            })) => {
-
                logger::trace("build trigger: patch updated");
                let rad_repo = profile.storage.repository(*repo)?;
                let git_repo =
                    radicle_surf::Repository::open(paths::repository(&profile.storage, repo))?;
modified src/node_event_source.rs
@@ -46,7 +46,6 @@ impl NodeEventSource {
    /// A closed or broken connection to the node is not an error,
    /// it's treated as end of file.
    pub fn node_event(&mut self) -> Result<Option<Event>, NodeEventError> {
-
        logger::trace("node_event: try to get an event");
        if let Some(event) = self.events.next() {
            match event {
                Ok(event) => {
modified src/pages.rs
@@ -762,8 +762,6 @@ impl StatusPage {
        db: Db,
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
-
        logger::pages_start();
-

        if self.dirname.is_none() {
            logger::pages_directory_unset();
        }
@@ -772,28 +770,40 @@ impl StatusPage {
        logger::pages_interval(UPDATE_INTERVAL);

        spawn(move || {
-
            'processing_loop: loop {
-
                self.update_and_write(&profile, &db)?;
-
                if once {
-
                    return Ok(());
-
                }
+
            logger::pages_start();
+
            let result = self.update_loop(run_rx, profile, db, once);
+
            logger::pages_end(&result);
+
            result
+
        })
+
    }

-
                match run_rx.wait_for_notification() {
-
                    Ok(_) => (),
-
                    Err(RecvTimeoutError::Timeout) => (),
-
                    Err(RecvTimeoutError::Disconnected) => {
-
                        logger::pages_disconnected();
-
                        break 'processing_loop;
-
                    }
+
    fn update_loop(
+
        mut self,
+
        run_rx: NotificationReceiver,
+
        profile: Profile,
+
        db: Db,
+
        once: bool,
+
    ) -> Result<(), PageError> {
+
        'processing_loop: loop {
+
            self.update_and_write(&profile, &db)?;
+
            if once {
+
                return Ok(());
+
            }
+

+
            match run_rx.wait_for_notification() {
+
                Ok(_) => (),
+
                Err(RecvTimeoutError::Timeout) => (),
+
                Err(RecvTimeoutError::Disconnected) => {
+
                    logger::pages_disconnected();
+
                    break 'processing_loop;
                }
            }
+
        }

-
            // Make sure we update reports and status JSON at least once.
-
            self.update_and_write(&profile, &db)?;
+
        // Make sure we update reports and status JSON at least once.
+
        self.update_and_write(&profile, &db)?;

-
            logger::pages_end();
-
            Ok(())
-
        })
+
        Ok(())
    }

    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
modified src/queueadd.rs
@@ -42,12 +42,15 @@ pub struct QueueAdder {

impl QueueAdder {
    pub fn add_events_in_thread(self) -> JoinHandle<Result<(), AdderError>> {
-
        spawn(move || self.add_events())
+
        spawn(move || {
+
            logger::queueadd_start();
+
            let result = self.add_events();
+
            logger::queueadd_end(&result);
+
            result
+
        })
    }

-
    pub fn add_events(&self) -> Result<(), AdderError> {
-
        logger::queueadd_start();
-

+
    fn add_events(&self) -> Result<(), AdderError> {
        let profile = Profile::load()?;

        let mut source = CiEventSource::new(&profile)?;
@@ -56,10 +59,9 @@ impl QueueAdder {
        // event from the node.
        'event_loop: loop {
            let events = source.event();
-
            logger::trace2(format!("queueadd: events={events:?}"));
            match events {
                Err(e) => {
-
                    logger::queueadd_control_socket_close();
+
                    logger::queueadd_control_socket_close(&e);
                    return Err(e.into());
                }
                Ok(None) => {
@@ -73,8 +75,6 @@ impl QueueAdder {
                }
            }
        }
-

-
        logger::queueadd_end();
        Ok(())
    }

modified src/queueproc.rs
@@ -109,15 +109,26 @@ pub struct QueueProcessor {

impl QueueProcessor {
    pub fn process_in_thread(mut self) -> JoinHandle<Result<(), QueueError>> {
-
        spawn(move || self.process_until_shutdown())
+
        spawn(move || {
+
            logger::queueproc_start();
+
            let result = self.process_until_shutdown();
+
            logger::queueproc_end(&result);
+
            result
+
        })
    }

    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
-
        logger::queueproc_start();
        let mut done = false;
        while !done {
            let mut first_error = None;
            while let Some(qe) = self.pick_event()? {
+
                // We always drop the picked event at once so that if
+
                // anything goes wrong, we don't try to process the
+
                // same event again. It's better for the failure to be
+
                // logged and let the humans deal with whatever
+
                // complicated failure situation is happening.
+
                self.drop_event(qe.id())?;
+

                match self.pick_adapters(qe.event()) {
                    Err(err) => {
                        if first_error.is_none() {
@@ -154,7 +165,6 @@ impl QueueProcessor {
            }
        }

-
        logger::queueproc_end();
        Ok(())
    }

@@ -170,7 +180,6 @@ impl QueueProcessor {
            Err(QueueError::BuildTrigger(_, _)) => done = false,
            Err(err) => Err(err)?,
        }
-
        self.drop_event(qe.id())?;
        self.run_tx.notify()?;
        Ok(done)
    }
@@ -229,8 +238,8 @@ impl QueueProcessor {
    }

    fn process_event(&mut self, event: &CiEvent, adapter: &Adapter) -> Result<bool, QueueError> {
-
        logger::debug2(format!("queproc::process_event: called; event={event:#?}"));
-
        let x = match event {
+
        logger::queueproc_processing_event(event);
+
        match event {
            CiEvent::V1(CiEventV1::Shutdown) => {
                logger::queueproc_action_shutdown();
                Ok(true)
@@ -310,24 +319,20 @@ impl QueueProcessor {
                patch: _,
                new_tip,
            }) => {
-
                logger::debug("patch updated");
                logger::queueproc_action_run(repo, new_tip);
                let trigger = RequestBuilder::default()
                    .profile(&self.profile)
                    .ci_event(event)
                    .build_trigger_from_ci_event()
                    .map_err(|e| QueueError::build_trigger(event, e));
-
                logger::debug2(format!("got trigger {trigger:?}"));
+
                logger::queueproc_trigger(&trigger);
                let trigger = trigger?;
                self.broker
                    .execute_ci(adapter, &trigger, &self.run_tx)
                    .map_err(QueueError::execute_ci)?;
-
                logger::debug("executed ci");
                Ok(false)
            }
-
        };
-
        logger::debug("queproc::process_event: end");
-
        x
+
        }
    }

    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {