Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
Simplify use of adapters, given there is only the default
Merged liw opened 1 year ago

This makes it simpler to implement the per-change adapter later by clearing away some existing mess.

3 files changed +51 -126 c89b471f 2e37eb04
modified src/bin/cib.rs
@@ -13,7 +13,6 @@ use clap::Parser;
use radicle::Profile;

use radicle_ci_broker::{
-
    adapter::Adapter,
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
@@ -173,18 +172,11 @@ impl QueuedCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let profile = Profile::load().map_err(CibError::profile)?;

-
        let mut broker =
-
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
-
        let spec = config
-
            .default_adapter()
-
            .ok_or(CibError::UnknownDefaultAdapter(
-
                config.default_adapter_name().into(),
-
            ))?;
-
        let adapter = Adapter::new(&spec.command)
-
            .with_environment(spec.envs())
-
            .with_sensitive_environment(spec.sensitive_envs());
+
        let adapter = config.default_adapter()?;
        logger::adapter_config(config);
-
        broker.set_default_adapter(&adapter);
+

+
        let broker = Broker::new(config.db(), config.max_run_time(), &adapter)
+
            .map_err(CibError::new_broker)?;

        let mut event_notifications = NotificationChannel::new_event();
        event_notifications
@@ -268,18 +260,10 @@ impl ProcessEventsCmd {
            false,
        );

-
        let mut broker =
-
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
-
        let spec = config
-
            .default_adapter()
-
            .ok_or(CibError::UnknownDefaultAdapter(
-
                config.default_adapter_name().into(),
-
            ))?;
-
        let adapter = Adapter::new(&spec.command)
-
            .with_environment(spec.envs())
-
            .with_sensitive_environment(spec.sensitive_envs());
-
        logger::adapter_config(config);
-
        broker.set_default_adapter(&adapter);
+
        let adapter = config.default_adapter()?;
+

+
        let broker = Broker::new(config.db(), config.max_run_time(), &adapter)
+
            .map_err(CibError::new_broker)?;

        let processor = QueueProcessorBuilder::default()
            .events_rx(events_notification.rx().map_err(CibError::notification)?)
@@ -338,8 +322,8 @@ enum CibError {
    #[error("failed to add events to queue")]
    AddEvents(#[source] AdderError),

-
    #[error("default adapter is not in list of adapters")]
-
    UnknownDefaultAdapter(String),
+
    #[error(transparent)]
+
    DefaultAdapter(#[from] ConfigError),

    #[error("programming error: failed to set up inter-thread notification channel")]
    Notification(#[source] NotificationError),
modified src/broker.rs
@@ -4,7 +4,6 @@
//! testing.

use std::{
-
    collections::HashMap,
    path::{Path, PathBuf},
    time::Duration,
};
@@ -29,19 +28,21 @@ use crate::{
/// node, and executes the appropriate adapter to run CI on the
/// repository.
pub struct Broker {
-
    default_adapter: Option<Adapter>,
-
    adapters: HashMap<RepoId, Adapter>,
+
    default_adapter: Adapter,
    max_run_time: Duration,
    db: Db,
}

impl Broker {
    #[allow(clippy::result_large_err)]
-
    pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
+
    pub fn new(
+
        db_filename: &Path,
+
        max_run_time: Duration,
+
        default_adapter: &Adapter,
+
    ) -> Result<Self, BrokerError> {
        logger::broker_db(db_filename);
        Ok(Self {
-
            default_adapter: None,
-
            adapters: HashMap::new(),
+
            default_adapter: default_adapter.clone(),
            max_run_time,
            db: Db::new(db_filename)?,
        })
@@ -52,16 +53,8 @@ impl Broker {
        Ok(self.db.get_all_runs()?)
    }

-
    pub fn set_default_adapter(&mut self, adapter: &Adapter) {
-
        self.default_adapter = Some(adapter.clone());
-
    }
-

-
    pub fn default_adapter(&self) -> Option<&Adapter> {
-
        self.default_adapter.as_ref()
-
    }
-

-
    fn adapter(&self, rid: &RepoId) -> Option<&Adapter> {
-
        self.adapters.get(rid).or(self.default_adapter.as_ref())
+
    fn default_adapter(&self) -> &Adapter {
+
        &self.default_adapter
    }

    #[allow(clippy::result_large_err)]
@@ -72,8 +65,7 @@ impl Broker {
    ) -> Result<Run, BrokerError> {
        let broker_run_id = RunId::default();
        let span = span!(Level::TRACE, "execute_ci_run", %broker_run_id,).entered();
-
        let rid = trigger.repo();
-
        let adapter = self.adapter(&rid).ok_or(BrokerError::NoAdapter(rid))?;
+
        let adapter = self.default_adapter();
        let run = span
            .in_scope(|| self.execute_helper(adapter, broker_run_id, trigger, run_notification))?;
        Ok(run)
@@ -201,7 +193,7 @@ mod test {
    use std::{path::Path, time::Duration};
    use tempfile::tempdir;

-
    use super::{Adapter, Broker, RepoId};
+
    use super::{Adapter, Broker};
    use crate::{
        msg::{RunId, RunResult},
        notif::NotificationChannel,
@@ -209,58 +201,17 @@ mod test {
        test::{mock_adapter, trigger_request, TestResult},
    };

-
    fn broker(filename: &Path) -> anyhow::Result<Broker> {
-
        Ok(Broker::new(filename, Duration::from_secs(1))?)
-
    }
-

-
    fn rid() -> anyhow::Result<RepoId> {
-
        const RID: &str = "rad:zwTxygwuz5LDGBq255RA2CbNGrz8";
-
        Ok(RepoId::from_urn(RID)?)
+
    fn broker(filename: &Path, adapter: &Adapter) -> anyhow::Result<Broker> {
+
        Ok(Broker::new(filename, Duration::from_secs(1), adapter)?)
    }

    #[test]
-
    fn has_no_adapters_initially() -> TestResult<()> {
+
    fn has_adapter() -> TestResult<()> {
        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let broker = broker(&db)?;
-
        let rid = rid()?;
-
        assert_eq!(broker.adapter(&rid), None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn does_not_have_a_default_adapter_initially() -> TestResult<()> {
-
        let tmp = tempdir()?;
-
        let db = tmp.path().join("db.db");
-
        let broker = broker(&db)?;
-

-
        assert_eq!(broker.default_adapter(), None);
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn sets_a_default_adapter_initially() -> TestResult<()> {
-
        let tmp = tempdir()?;
-
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db)?;
-

-
        let adapter = Adapter::default();
-
        broker.set_default_adapter(&adapter);
-
        assert_eq!(broker.default_adapter(), Some(&adapter));
-
        Ok(())
-
    }
-

-
    #[test]
-
    fn finds_default_adapter_for_unknown_repo() -> TestResult<()> {
-
        let tmp = tempdir()?;
-
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db)?;
-

        let adapter = Adapter::default();
-
        broker.set_default_adapter(&adapter);
-

-
        let rid = rid()?;
-
        assert_eq!(broker.adapter(&rid), Some(&adapter));
+
        let broker = broker(&db, &adapter)?;
+
        assert_eq!(broker.default_adapter(), &adapter);
        Ok(())
    }

@@ -278,8 +229,7 @@ echo '{"response":"finished","result":"success"}'

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db)?;
-
        broker.set_default_adapter(&adapter);
+
        let mut broker = broker(&db, &adapter)?;

        let trigger = trigger_request()?;

@@ -311,8 +261,7 @@ exit 1

        let tmp = tempdir()?;
        let db = tmp.path().join("db.db");
-
        let mut broker = broker(&db)?;
-
        broker.set_default_adapter(&adapter);
+
        let mut broker = broker(&db, &adapter)?;

        let trigger = trigger_request()?;

modified src/config.rs
@@ -2,7 +2,6 @@

use std::{
    collections::HashMap,
-
    fmt,
    path::{Path, PathBuf},
    time::Duration,
};
@@ -10,7 +9,7 @@ use std::{
use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};

-
use crate::{filter::EventFilter, sensitive::Sensitive};
+
use crate::{adapter::Adapter, filter::EventFilter, logger, sensitive::Sensitive};

const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;
@@ -18,7 +17,7 @@ const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    default_adapter: String,
-
    adapters: HashMap<String, Adapter>,
+
    adapters: HashMap<String, AdapterConfig>,
    filters: Vec<EventFilter>,
    report_dir: Option<PathBuf>,
    status_update_interval_seconds: Option<u64>,
@@ -56,16 +55,12 @@ impl Config {
        &self.filters
    }

-
    pub fn default_adapter_name(&self) -> &str {
-
        &self.default_adapter
-
    }
-

-
    pub fn default_adapter(&self) -> Option<&Adapter> {
-
        self.adapter(&self.default_adapter)
-
    }
-

-
    pub fn adapter(&self, name: &str) -> Option<&Adapter> {
-
        self.adapters.get(name)
+
    pub fn default_adapter(&self) -> Result<Adapter, ConfigError> {
+
        logger::adapter_config(self);
+
        self.adapters
+
            .get(&self.default_adapter)
+
            .map(Adapter::from)
+
            .ok_or(ConfigError::NoDefaultAdapter)
    }

    pub fn status_page_update_interval(&self) -> u64 {
@@ -90,30 +85,15 @@ impl Config {
    }
}

-
impl fmt::Debug for Adapter {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        write!(
-
            f,
-
            "Adapter {{ \n command: {:#?}, \n env: {:#?}, \n sensitive_env: {:#?} }}",
-
            self.command,
-
            self.env,
-
            self.sensitive_env
-
                .keys()
-
                .map(|k| (k.to_string(), "***".to_string()))
-
                .collect::<HashMap<String, String>>()
-
        )
-
    }
-
}
-

-
#[derive(Serialize, Deserialize)]
-
pub struct Adapter {
+
#[derive(Debug, Serialize, Deserialize)]
+
pub struct AdapterConfig {
    pub command: PathBuf,
    pub env: HashMap<String, String>,
    #[serde(default)]
    pub sensitive_env: HashMap<String, Sensitive>,
}

-
impl Adapter {
+
impl AdapterConfig {
    pub fn envs(&self) -> &HashMap<String, String> {
        &self.env
    }
@@ -123,6 +103,14 @@ impl Adapter {
    }
}

+
impl From<&AdapterConfig> for Adapter {
+
    fn from(config: &AdapterConfig) -> Self {
+
        Self::new(&config.command)
+
            .with_environment(config.envs())
+
            .with_sensitive_environment(config.sensitive_envs())
+
    }
+
}
+

/// All possible errors from configuration handling.
#[derive(Debug, thiserror::Error)]
pub enum ConfigError {
@@ -137,6 +125,10 @@ pub enum ConfigError {
    /// Can't convert configuration into JSON.
    #[error("failed to convert configuration into JSON")]
    ToJson(#[source] serde_json::Error),
+

+
    /// No default adapter.
+
    #[error("the default adapter is not defined in the configuration")]
+
    NoDefaultAdapter,
}

#[cfg(test)]