Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
refactor: use ergo module to load profile
Merged liw opened 2 months ago

refactor: CiEventSource gets a reference to a profile instead of owning it

refactor: make Worker::NAME be a static string

refactor: drop Worker::name, now unnecessary

refactor: QueueAdder holds an ergo::Radicle

This still loads the profile, because Worker needs a ’static lifetime and this is the easiest way to achieve that.

refactor: add ergo::Oid export

Use this consistently to reduce confusion between Oid types from Radicle.

17 files changed +63 -87 82ed0bc0 02e259f0
modified src/adapter.rs
@@ -496,13 +496,13 @@ mod test {

    use tempfile::{NamedTempFile, TempDir, tempdir};

-
    use radicle::git::Oid;
    use radicle::prelude::RepoId;

    use super::{Adapter, Db, Run};
    use crate::{
        adapter::AdapterError,
        cob::KnownJobCobs,
+
        ergo::Oid,
        msg::{MessageError, Response, RunId, RunResult},
        notif::NotificationChannel,
        run::{RunBuilder, Whence},
modified src/bin/cib.rs
@@ -9,12 +9,12 @@ use std::{
};

use clap::Parser;
-
use radicle::Profile;
use radicle_ci_broker::{
    adapter::AdapterError,
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
+
    ergo,
    logger::{self, LogLevel},
    notif::{NotificationChannel, NotificationError},
    pages::{PageError, StatusPage},
@@ -193,8 +193,6 @@ struct QueuedCmd {}

impl QueuedCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
-
        let profile = Profile::load().map_err(CibError::profile)?;
-

        let adapters = config.to_adapters().map_err(CibError::Adapters)?;
        logger::adapter_config(config);

@@ -232,7 +230,7 @@ impl QueuedCmd {
        let db = args.open_db(config)?;
        let mut page = StatusPage::new(
            run_notifications.rx().map_err(CibError::notification)?,
-
            profile,
+
            ergo::Radicle::new().map_err(CibError::ergo)?,
            db,
            true,
        );
@@ -274,13 +272,11 @@ impl ProcessEventsCmd {
            .map_err(CibError::QueueAdder)?;
        let adder = start_thread(adder);

-
        let profile = Profile::load().map_err(CibError::profile)?;
-

        let db = args.open_db(config)?;

        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibError::notification)?,
-
            profile,
+
            ergo::Radicle::new().map_err(CibError::ergo)?,
            db,
            false,
        );
@@ -334,6 +330,9 @@ impl ProcessEventsCmd {

#[derive(Debug, thiserror::Error)]
enum CibError {
+
    #[error("failed to load ergonomic Radicle wrapper")]
+
    Ergo(#[source] ergo::ErgoError),
+

    #[error("failed to read configuration file {0}")]
    ReadConfig(PathBuf, #[source] ConfigError),

@@ -346,9 +345,6 @@ enum CibError {
    #[error("failed to convert adapters as JSON")]
    AdaptersToJson(PathBuf, #[source] AdapterError),

-
    #[error("failed to look up node profile")]
-
    Profile(#[source] radicle::profile::Error),
-

    #[error("failed to use SQLite database")]
    Db(#[source] DbError),

@@ -394,8 +390,8 @@ impl CibError {
        Self::AdaptersToJson(filename.into(), e)
    }

-
    fn profile(e: radicle::profile::Error) -> Self {
-
        Self::Profile(e)
+
    fn ergo(e: ergo::ErgoError) -> Self {
+
        Self::Ergo(e)
    }

    fn db(e: DbError) -> Self {
modified src/bin/cibtool.rs
@@ -18,12 +18,13 @@ use std::{

use clap::Parser;

-
use radicle::{Profile, git::Oid, prelude::NodeId};
+
use radicle::prelude::NodeId;

use radicle_ci_broker::{
    broker::BrokerError,
    ci_event::{CiEvent, CiEventError, CiEventV1},
    db::{Db, DbError, QueueId, QueuedCiEvent},
+
    ergo::{self, Oid},
    logger,
    msg::{RunId, RunResult},
    notif::{NotificationChannel, NotificationError},
@@ -250,12 +251,12 @@ enum RunSubCmd {

#[derive(Debug, thiserror::Error)]
enum CibToolError {
+
    #[error("failed to load ergonomic Radicle wrapper")]
+
    Ergo(#[source] ergo::ErgoError),
+

    #[error("failed to create cache of job COBs")]
    KnownJobCobs(#[source] radicle_ci_broker::cob::JobError),

-
    #[error("failed to look up node profile")]
-
    Profile(#[source] radicle::profile::Error),
-

    #[error("cannot find CI run with id {0}")]
    RunNotFound(RunId),

modified src/bin/cibtoolcmd/cob.rs
@@ -2,9 +2,9 @@ use std::sync::{Arc, Mutex};

use url::Url;

-
use radicle::{git::Oid, identity::RepoId};
+
use radicle::identity::RepoId;

-
use radicle_ci_broker::cob::*;
+
use radicle_ci_broker::{cob::*, ergo::Oid};

use super::*;

modified src/bin/cibtoolcmd/event.rs
@@ -130,8 +130,8 @@ impl Leaf for AddEvent {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
        let r = ergo::Radicle::new().map_err(EventError::Ergonomic)?;

-
        let profile = r.profile();
-
        let nid = *profile.id();
+
        let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
+
        let nid = *radicle.profile().id();

        let repo = r
            .repository_by_name(&self.repo)
@@ -365,8 +365,9 @@ pub struct RecordEvents {

impl Leaf for RecordEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
-
        let profile = util::load_profile()?;
-
        let mut source = NodeEventSource::new(&profile).map_err(CibToolError::EventSubscribe)?;
+
        let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;
+
        let mut source =
+
            NodeEventSource::new(radicle.profile()).map_err(CibToolError::EventSubscribe)?;
        let mut file = if let Some(filename) = &self.output {
            Some(
                std::fs::File::create(filename)
@@ -419,7 +420,7 @@ pub struct CiEvents {

impl Leaf for CiEvents {
    fn run(&self, _args: &Args) -> Result<(), CibToolError> {
-
        let profile = Profile::load().map_err(CibToolError::Profile)?;
+
        let radicle = ergo::Radicle::new().map_err(CibToolError::Ergo)?;

        let bytes = std::fs::read(&self.input)
            .map_err(|e| CibToolError::ReadEvents(self.input.clone(), e))?;
@@ -435,7 +436,7 @@ impl Leaf for CiEvents {

        let mut ci_events: Vec<CiEvent> = vec![];
        for node_event in node_events.iter() {
-
            if let Ok(mut cevs) = CiEvent::from_node_event(node_event, &profile) {
+
            if let Ok(mut cevs) = CiEvent::from_node_event(node_event, radicle.profile()) {
                ci_events.append(&mut cevs);
            }
        }
modified src/bin/cibtoolcmd/report.rs
@@ -1,4 +1,4 @@
-
use radicle_ci_broker::{pages::StatusPage, worker::start_thread};
+
use radicle_ci_broker::{ergo, pages::StatusPage, worker::start_thread};

use super::*;

@@ -18,14 +18,12 @@ pub struct ReportCmd {

impl Leaf for ReportCmd {
    fn run(&self, args: &Args) -> Result<(), CibToolError> {
-
        let profile = Profile::load().map_err(CibToolError::Profile)?;
-

        let db = args.open_db()?;

        let mut run_notification = NotificationChannel::new_run();
        let mut page = StatusPage::new(
            run_notification.rx().map_err(CibToolError::Notification)?,
-
            profile,
+
            ergo::Radicle::new().map_err(CibToolError::Ergo)?,
            db,
            true,
        );
modified src/ci_event_source.rs
@@ -8,15 +8,15 @@ use crate::{
    node_event_source::{NodeEventError, NodeEventSource},
};

-
pub struct CiEventSource {
+
pub struct CiEventSource<'a> {
    source: NodeEventSource,
-
    profile: Profile,
+
    profile: &'a Profile,
}

-
impl CiEventSource {
-
    pub fn new(profile: Profile) -> Result<Self, CiEventSourceError> {
+
impl<'a> CiEventSource<'a> {
+
    pub fn new(profile: &'a Profile) -> Result<Self, CiEventSourceError> {
        let source = Self {
-
            source: NodeEventSource::new(&profile).map_err(CiEventSourceError::Subscribe)?,
+
            source: NodeEventSource::new(profile).map_err(CiEventSourceError::Subscribe)?,
            profile,
        };
        logger::ci_event_source_created(&source);
@@ -43,7 +43,7 @@ impl CiEventSource {
                Ok(None)
            }
            Ok(Some(event)) => {
-
                let ci_events = CiEvent::from_node_event(&event, &self.profile)
+
                let ci_events = CiEvent::from_node_event(&event, self.profile)
                    .map_err(CiEventSourceError::CiEvent)?;
                if !ci_events.is_empty() {
                    logger::ci_event_source_got_events(&ci_events);
@@ -54,7 +54,7 @@ impl CiEventSource {
    }
}

-
impl fmt::Debug for CiEventSource {
+
impl<'a> fmt::Debug for CiEventSource<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CiEventSource<path={:?}", self.source)
    }
modified src/cob.rs
@@ -15,7 +15,6 @@ use url::Url;
use uuid::Uuid;

use radicle::{
-
    git::Oid,
    node::{
        Handle, Node,
        sync::{Announcer, AnnouncerConfig, ReplicationFactor},
@@ -25,7 +24,7 @@ use radicle::{
};
use radicle_job::*;

-
use crate::{logger, msg::RunId};
+
use crate::{ergo::Oid, logger, msg::RunId};

/// Lookup cache for job COBs.
///
modified src/ergo.rs
@@ -9,12 +9,13 @@ use std::str::FromStr;

use radicle::{
    cob::patch::{Patch, PatchId, cache::Patches},
-
    git::Oid,
    identity::{Project, RepoId},
    profile::Profile,
    storage::{ReadStorage, RepositoryInfo, git::Repository},
};

+
pub use radicle::git::Oid;
+

/// A Radicle node.
///
/// This type represents a Radicle node, and exists to cache some
modified src/logger.rs
@@ -8,7 +8,7 @@ use tracing::{Level, debug, error, info, trace, warn};
use tracing_subscriber::{EnvFilter, fmt, layer::SubscriberExt, util::SubscriberInitExt};
use uuid::Uuid;

-
use radicle::{git::Oid, identity::RepoId, node::Event, patch::PatchId};
+
use radicle::{identity::RepoId, node::Event, patch::PatchId};
use radicle_job::{JobId, Reason};

use crate::{
@@ -17,6 +17,7 @@ use crate::{
    ci_event_source::{CiEventSource, CiEventSourceError},
    config::Config,
    db::{QueueId, QueuedCiEvent},
+
    ergo::Oid,
    filter::{Decision, EventFilter},
    msg::{Request, RunId},
    node_event_source::NodeEventSource,
modified src/msg.rs
@@ -29,13 +29,13 @@ use radicle::{
};
pub use radicle::{
    cob::patch::PatchId,
-
    git::Oid,
    prelude::{NodeId, RepoId},
};
pub use radicle_surf::Commit;

use crate::{
    ci_event::{CiEvent, CiEventV1},
+
    ergo::Oid,
    logger,
};

modified src/pages.rs
@@ -20,7 +20,6 @@ use time::{OffsetDateTime, macros::format_description};

use radicle::{
    Profile,
-
    git::Oid,
    prelude::RepoId,
    storage::{ReadRepository, ReadStorage},
};
@@ -28,6 +27,7 @@ use radicle::{
use crate::{
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueuedCiEvent},
+
    ergo::{self, Oid},
    logger,
    msg::{RunId, RunResult},
    notif::NotificationReceiver,
@@ -863,7 +863,7 @@ pub struct StatusPage {

struct PageArgs {
    run_rx: NotificationReceiver,
-
    profile: Profile,
+
    radicle: ergo::Radicle,
    db: Db,
    once: bool,
    desc_snippet: Option<String>,
@@ -874,13 +874,13 @@ impl StatusPage {
        self.dirname = Some(dirname.into());
    }

-
    pub fn new(run_rx: NotificationReceiver, profile: Profile, db: Db, once: bool) -> Self {
+
    pub fn new(run_rx: NotificationReceiver, radicle: ergo::Radicle, db: Db, once: bool) -> Self {
        Self {
            node_alias: "".into(),
            dirname: None,
            args: PageArgs {
                run_rx,
-
                profile,
+
                radicle,
                db,
                once,
                desc_snippet: None,
@@ -935,7 +935,7 @@ impl StatusPage {
                        | CiEvent::V1(CiEventV1::BranchUpdated { repo, .. })
                        | CiEvent::V1(CiEventV1::PatchCreated { repo, .. })
                        | CiEvent::V1(CiEventV1::PatchUpdated { repo, .. }) => {
-
                            if Self::is_public_repo(&self.args.profile, repo) {
+
                            if Self::is_public_repo(self.args.radicle.profile(), repo) {
                                Some(Ok(event))
                            } else {
                                None
modified src/queueadd.rs
@@ -1,10 +1,8 @@
-
use radicle::Profile;
-

use crate::{
    ci_event::CiEvent,
    ci_event_source::{CiEventSource, CiEventSourceError},
    db::{Db, DbError, QueueId},
-
    logger,
+
    ergo, logger,
    notif::NotificationSender,
    worker::Worker,
};
@@ -20,6 +18,7 @@ impl QueueAdderBuilder {
        Ok(QueueAdder {
            db: self.db.ok_or(AdderError::Missing("db"))?,
            events_tx: self.events_tx.ok_or(AdderError::Missing("events_tx"))?,
+
            radicle: ergo::Radicle::new().map_err(AdderError::Ergo)?,
        })
    }

@@ -37,13 +36,12 @@ impl QueueAdderBuilder {
pub struct QueueAdder {
    db: Db,
    events_tx: NotificationSender,
+
    radicle: ergo::Radicle,
}

impl QueueAdder {
    fn add_events(&self) -> Result<(), AdderError> {
-
        let profile = Profile::load().map_err(AdderError::profile)?;
-

-
        let mut source = CiEventSource::new(profile)?;
+
        let mut source = CiEventSource::new(self.radicle.profile())?;

        // This loop ends when there's an error, e.g., failure to read an
        // event from the node.
@@ -76,7 +74,7 @@ impl QueueAdder {
}

impl Worker for QueueAdder {
-
    const NAME: &str = "queue-adder";
+
    const NAME: &'static str = "queue-adder";
    type Error = AdderError;

    fn work(&mut self) -> Result<(), Self::Error> {
@@ -89,8 +87,8 @@ pub enum AdderError {
    #[error("programming error: QueueAdderBuilder field {0} was not set")]
    Missing(&'static str),

-
    #[error(transparent)]
-
    Profile(#[from] Box<radicle::profile::Error>),
+
    #[error("failed to load ergonomic Radicle wrapper")]
+
    Ergo(#[source] ergo::ErgoError),

    #[error(transparent)]
    CiEvent(#[from] CiEventSourceError),
@@ -101,9 +99,3 @@ pub enum AdderError {
    #[error("failed to notify other thread about database change")]
    Send,
}
-

-
impl AdderError {
-
    fn profile(err: radicle::profile::Error) -> Self {
-
        Self::Profile(Box::new(err))
-
    }
-
}
modified src/run.rs
@@ -2,11 +2,13 @@ use std::fmt;

use serde::{Deserialize, Serialize};

-
use radicle::git::Oid;
use radicle::prelude::RepoId;
use radicle_job::JobId;

-
use crate::msg::{Revision, RunId, RunResult};
+
use crate::{
+
    ergo::Oid,
+
    msg::{Revision, RunId, RunResult},
+
};

#[derive(Debug, Default)]
pub struct RunBuilder {
modified src/subplot.rs
@@ -10,7 +10,6 @@ use std::{
};

use radicle::{
-
    git::Oid,
    git::RefString,
    node::{Event, NodeId},
    prelude::RepoId,
@@ -20,6 +19,8 @@ use radicle::{
use subplotlib::steplibrary::datadir::Datadir;
use subplotlib::steplibrary::runcmd::Runcmd;

+
use radicle_ci_broker::ergo::Oid;
+

#[derive(Debug, Default)]
struct SubplotContext {}

modified src/util.rs
@@ -17,11 +17,12 @@ use time::{
use radicle::{
    Profile, Storage,
    cob::ObjectId,
-
    git::Oid,
    prelude::{NodeId, RepoId},
    storage::ReadStorage,
};

+
use crate::ergo::Oid;
+

pub fn lookup_repo(profile: &Profile, wanted: &str) -> Result<(RepoId, String), UtilError> {
    let storage = Storage::open(profile.storage(), profile.info()).map_err(UtilError::storage)?;

@@ -73,10 +74,6 @@ pub fn oid_from_cli_arg(profile: &Profile, rid: RepoId, commit: &str) -> Result<
    }
}

-
pub fn load_profile() -> Result<Profile, UtilError> {
-
    Profile::load().map_err(UtilError::profile)
-
}
-

pub fn lookup_nid(profile: &Profile) -> Result<NodeId, UtilError> {
    Ok(*profile.id())
}
@@ -160,9 +157,6 @@ pub fn safely_overwrite<P: AsRef<Path>>(filename: P, data: &[u8]) -> Result<(),

#[derive(Debug, thiserror::Error)]
pub enum UtilError {
-
    #[error("failed to look up node profile")]
-
    Profile(#[source] Box<radicle::profile::Error>),
-

    #[error("failed to look up open node storage")]
    Storage(#[source] Box<radicle::storage::Error>),

@@ -216,10 +210,6 @@ pub enum UtilError {
}

impl UtilError {
-
    fn profile(err: radicle::profile::Error) -> Self {
-
        Self::Profile(Box::new(err))
-
    }
-

    fn storage(err: radicle::storage::Error) -> Self {
        Self::Storage(Box::new(err))
    }
modified src/worker.rs
@@ -10,29 +10,23 @@ use crate::logger;

/// Start a new thread. Caller must catch the thread handle and
/// join it to wait for thread to end.
-
pub fn start_thread<W: Worker>(mut o: W) -> JoinHandle<Result<(), W::Error>> {
-
    let name = o.name();
+
pub fn start_thread<W: Worker + 'static>(mut o: W) -> JoinHandle<Result<(), W::Error>> {
    spawn(move || {
-
        logger::worker_start(&name);
+
        logger::worker_start(W::NAME);
        let result = o.work();
-
        logger::worker_end(&name, &result);
+
        logger::worker_end(W::NAME, &result);
        result
    })
}

/// A worker thread.
-
pub trait Worker: Send + 'static {
+
pub trait Worker: Send {
    /// Name of thread, or kind of thread. Used for logging only.
-
    const NAME: &str;
+
    const NAME: &'static str;

    /// Type of error from this worker.
    type Error: std::error::Error + Send;

    /// Do the work the thread is supposed to do.
    fn work(&mut self) -> Result<(), Self::Error>;
-

-
    /// Return name of thread as an owned string.
-
    fn name(&self) -> String {
-
        Self::NAME.to_string()
-
    }
}