Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: construct a collection of all adapters that knows the default
Lars Wirzenius committed 1 year ago
commit ee34af59f16c692f4bf1db5bde35d541a22c843f
parent 7ebe081334f233906291a716cd578b571487c03c
4 files changed +66 -16
modified src/adapter.rs
@@ -18,6 +18,7 @@ use std::{
//use tracing::Instrument;

use crate::{
+
    config::AdapterConfig,
    db::{Db, DbError},
    logger,
    msg::{MessageError, Request, Response},
@@ -29,6 +30,43 @@ use crate::{

const NOT_EXITED: i32 = 999;

+
/// The set of all configured adapters.
+
#[derive(Clone)]
+
pub struct Adapters {
+
    adapters: HashMap<String, Adapter>,
+
    default_adapter: String,
+
}
+

+
impl Adapters {
+
    pub fn new(
+
        adapters: &HashMap<String, AdapterConfig>,
+
        default_adapter: &str,
+
    ) -> Result<Self, AdapterError> {
+
        if !adapters.contains_key(default_adapter) {
+
            Err(AdapterError::NoDefaultAdapter)
+
        } else {
+
            Ok(Self {
+
                adapters: HashMap::from_iter(
+
                    adapters
+
                        .iter()
+
                        .map(|(k, v)| (k.to_string(), Adapter::from(v))),
+
                ),
+
                default_adapter: default_adapter.into(),
+
            })
+
        }
+
    }
+

+
    pub fn default_adapter(&self) -> &Adapter {
+
        // We KNOW the default adapter is in the map so the unwrap is safe.
+
        #[allow(clippy::unwrap_used)]
+
        self.adapters.get(&self.default_adapter).unwrap()
+
    }
+

+
    pub fn get(&self, name: &str) -> Option<&Adapter> {
+
        self.adapters.get(name)
+
    }
+
}
+

/// An external executable that runs CI on request.
#[derive(Debug, Default, Clone, Eq, PartialEq)]
pub struct Adapter {
@@ -227,6 +265,10 @@ impl MaybeResult {

#[derive(Debug, thiserror::Error)]
pub enum AdapterError {
+
    /// No default adapter.
+
    #[error("the default adapter is not defined in the configuration")]
+
    NoDefaultAdapter,
+

    /// Error from [`TimeoutCommand`] or [`RunningProcess`].
    #[error(transparent)]
    TimeoutCommand(#[from] crate::timeoutcmd::TimeoutError),
modified src/bin/cib.rs
@@ -13,6 +13,7 @@ use clap::Parser;
use radicle::Profile;

use radicle_ci_broker::{
+
    adapter::AdapterError,
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
@@ -171,7 +172,7 @@ impl QueuedCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let profile = Profile::load().map_err(CibError::profile)?;

-
        let adapter = config.default_adapter()?;
+
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;
        logger::adapter_config(config);

        let broker =
@@ -194,7 +195,7 @@ impl QueuedCmd {
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
-
            .default_adapter(&adapter)
+
            .adapters(&adapters)
            .build()
            .map_err(CibError::process_queue)?;
        let thread = processor.process_in_thread();
@@ -260,7 +261,7 @@ impl ProcessEventsCmd {
            false,
        );

-
        let adapter = config.default_adapter()?;
+
        let adapters = config.to_adapters().map_err(CibError::Adapters)?;

        let broker =
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
@@ -272,7 +273,7 @@ impl ProcessEventsCmd {
            .queue_len_interval(config.queue_len_interval())
            .broker(broker)
            .filters(config.filters())
-
            .default_adapter(&adapter)
+
            .adapters(&adapters)
            .build()
            .map_err(CibError::process_queue)?;
        let processor = processor.process_in_thread();
@@ -324,8 +325,8 @@ enum CibError {
    #[error("failed to add events to queue")]
    AddEvents(#[source] AdderError),

-
    #[error(transparent)]
-
    DefaultAdapter(#[from] ConfigError),
+
    #[error("could not construct list of adapters from configuration")]
+
    Adapters(#[source] AdapterError),

    #[error("programming error: failed to set up inter-thread notification channel")]
    Notification(#[source] NotificationError),
modified src/config.rs
@@ -9,7 +9,12 @@ use std::{
use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};

-
use crate::{adapter::Adapter, filter::EventFilter, logger, sensitive::Sensitive};
+
use crate::{
+
    adapter::{Adapter, AdapterError, Adapters},
+
    filter::EventFilter,
+
    logger,
+
    sensitive::Sensitive,
+
};

const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;
@@ -51,6 +56,10 @@ impl Config {
        self.report_dir.as_deref()
    }

+
    pub fn to_adapters(&self) -> Result<Adapters, AdapterError> {
+
        Adapters::new(&self.adapters, &self.default_adapter)
+
    }
+

    pub fn filters(&self) -> &[EventFilter] {
        &self.filters
    }
modified src/queueproc.rs
@@ -11,7 +11,7 @@ use std::{
use radicle::Profile;

use crate::{
-
    adapter::Adapter,
+
    adapter::{Adapter, Adapters},
    broker::{Broker, BrokerError},
    ci_event::{CiEvent, CiEventV1},
    db::{Db, DbError, QueueId, QueuedCiEvent},
@@ -26,7 +26,7 @@ pub struct QueueProcessorBuilder {
    db: Option<Db>,
    broker: Option<Broker>,
    filters: Option<Vec<EventFilter>>,
-
    default_adapter: Option<Adapter>,
+
    adapters: Option<Adapters>,
    events_rx: Option<NotificationReceiver>,
    run_tx: Option<NotificationSender>,
    queue_len_interval: Option<Duration>,
@@ -41,9 +41,7 @@ impl QueueProcessorBuilder {
            profile: Profile::load().map_err(QueueError::Profile)?,
            broker: self.broker.ok_or(QueueError::Missing("broker"))?,
            filters: self.filters.ok_or(QueueError::Missing("filters"))?,
-
            default_adapter: self
-
                .default_adapter
-
                .ok_or(QueueError::Missing("default_adapter"))?,
+
            adapters: self.adapters.ok_or(QueueError::Missing("adapters"))?,
            events_rx: self.events_rx.ok_or(QueueError::Missing("events_rx"))?,
            run_tx: self.run_tx.ok_or(QueueError::Missing("run_tx"))?,
            queue_len_interval: self
@@ -83,8 +81,8 @@ impl QueueProcessorBuilder {
        self
    }

-
    pub fn default_adapter(mut self, adapter: &Adapter) -> Self {
-
        self.default_adapter = Some(adapter.clone());
+
    pub fn adapters(mut self, adapters: &Adapters) -> Self {
+
        self.adapters = Some(adapters.clone());
        self
    }
}
@@ -94,7 +92,7 @@ pub struct QueueProcessor {
    profile: Profile,
    filters: Vec<EventFilter>,
    broker: Broker,
-
    default_adapter: Adapter,
+
    adapters: Adapters,
    events_rx: NotificationReceiver,
    run_tx: NotificationSender,
    queue_len_interval: Duration,
@@ -190,7 +188,7 @@ impl QueueProcessor {
    fn pick_adapters(&self, e: &CiEvent) -> Option<Vec<Adapter>> {
        for filter in self.filters.iter() {
            if filter.allows(e) {
-
                return Some(vec![self.default_adapter.clone()]);
+
                return Some(vec![self.adapters.default_adapter().clone()]);
            }
        }
        None