Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
feat: add optional max_run_time field to configuration file
Lars Wirzenius committed 1 year ago
commit 55e1e7dba593cc2f771001c1ac265dbcd93b5b66
parent 30435e200124dd04bc20877e432fd4b486cd7753
6 files changed +137 -22
modified Cargo.lock
@@ -191,6 +191,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d902e3d592a523def97af8f317b08ce16b7ab854c1985a0c671e6f15cebc236"

[[package]]
+
name = "arrayvec"
+
version = "0.7.6"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
+

+
[[package]]
name = "as-slice"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -358,6 +364,15 @@ dependencies = [
]

[[package]]
+
name = "chrono"
+
version = "0.4.38"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
+
dependencies = [
+
 "num-traits",
+
]
+

+
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -651,6 +666,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"

[[package]]
+
name = "duration-str"
+
version = "0.11.2"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "709d653e7c92498eb29fb86a2a6f0f3502b97530f33aedb32ef848d4d28b31a3"
+
dependencies = [
+
 "chrono",
+
 "rust_decimal",
+
 "serde",
+
 "thiserror",
+
 "time",
+
 "winnow",
+
]
+

+
[[package]]
name = "ec25519"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1712,6 +1741,7 @@ dependencies = [
 "clap",
 "ctor",
 "culpa",
+
 "duration-str",
 "html-page",
 "radicle",
 "radicle-git-ext",
@@ -1978,6 +2008,16 @@ dependencies = [
]

[[package]]
+
name = "rust_decimal"
+
version = "1.36.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555"
+
dependencies = [
+
 "arrayvec",
+
 "num-traits",
+
]
+

+
[[package]]
name = "rustix"
version = "0.38.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3083,6 +3123,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"

[[package]]
+
name = "winnow"
+
version = "0.6.20"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "36c1fec1a2bb5866f07c25f68c26e565c4c200aebb96d7e55710c19d3e8ac49b"
+
dependencies = [
+
 "memchr",
+
]
+

+
[[package]]
name = "xattr"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -13,6 +13,7 @@ categories = ["development-tools::build-utils"]
[dependencies]
anyhow = "1.0.86"
clap = { version = "4.5.11", features = ["derive", "wrap_help"] }
+
duration-str = "0.11.2"
html-page = "0.4.0"
radicle-git-ext = "0.8.0"
radicle-surf = { version = "0.22.0", default-features = false, features = ["serde"] }
modified src/adapter.rs
@@ -58,11 +58,12 @@ impl Adapter {
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
+
        max_run_time: Duration,
    ) -> Result<(), AdapterError> {
        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

-
        let x = self.run_helper(trigger, run, db, run_notification);
+
        let x = self.run_helper(trigger, run, db, run_notification, max_run_time);

        run.set_state(RunState::Finished);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;
@@ -76,13 +77,14 @@ impl Adapter {
        run: &mut Run,
        db: &Db,
        run_notification: &NotificationSender,
+
        max_run_time: Duration,
    ) -> Result<(), AdapterError> {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
        let mut cmd = Command::new(&self.bin);
        cmd.envs(self.envs());
-
        let mut child = TimeoutCommand::new(Duration::from_secs(1000));
+
        let mut child = TimeoutCommand::new(max_run_time);
        child.feed_stdin(trigger.to_string().as_bytes());
        let child = child.spawn(cmd).map_err(|err| match err {
            TimeoutError::Spawn(_, err) => AdapterError::SpawnAdapter(self.bin.clone(), err),
@@ -251,7 +253,7 @@ pub enum AdapterError {

#[cfg(test)]
mod test {
-
    use std::{fs::write, io::ErrorKind};
+
    use std::{fs::write, io::ErrorKind, time::Duration};

    use tempfile::{tempdir, NamedTempFile};

@@ -267,6 +269,8 @@ mod test {
        test::{mock_adapter, trigger_request, TestResult},
    };

+
    const MAX: Duration = Duration::from_secs(10);
+

    fn db() -> anyhow::Result<Db> {
        let tmp = NamedTempFile::new()?;
        let db = Db::new(tmp.path())?;
@@ -302,7 +306,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender)?;
+
        Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX)?;
        assert_eq!(run.result(), Some(&RunResult::Success));

        Ok(())
@@ -324,7 +328,7 @@ echo '{"response":"finished","result":"failure"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);

        match x {
            Ok(_) => (),
@@ -353,7 +357,7 @@ exit 1
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(x.is_err());
        assert_eq!(run.result(), Some(&RunResult::Failure));
@@ -375,7 +379,7 @@ kill -9 $BASHPID
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoFirstMessage)));

@@ -398,7 +402,7 @@ kill -9 $BASHPID
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

@@ -420,7 +424,7 @@ echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::NoSecondMessage)));

@@ -444,7 +448,7 @@ kill -9 $BASHPID
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(x, Err(AdapterError::Signal)));

@@ -467,7 +471,7 @@ echo '{"response":"finished","result":"success","bad":"field"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);

        match x {
            Err(AdapterError::ParseResponse(MessageError::DeserializeResponse(_))) => (),
@@ -492,7 +496,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -521,7 +525,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        assert!(matches!(
            x,
@@ -542,7 +546,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -571,7 +575,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
@@ -604,7 +608,7 @@ echo '{"response":"finished","result":"success"}'
        let mut run = run()?;
        let mut channel = NotificationChannel::new_run();
        let sender = channel.tx()?;
-
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender);
+
        let x = Adapter::new(&bin).run(&trigger_request()?, &mut run, &db, &sender, MAX);
        eprintln!("{x:#?}");
        match x {
            Err(AdapterError::SpawnAdapter(filename, e)) => {
modified src/bin/cib.rs
@@ -148,7 +148,8 @@ impl QueuedCmd {
    fn run(&self, args: &Args, config: &Config) -> Result<(), CibError> {
        let profile = Profile::load().map_err(CibError::profile)?;

-
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let mut broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
        let spec =
            config
                .adapter(&config.default_adapter)
@@ -235,7 +236,8 @@ impl ProcessEventsCmd {
            false,
        );

-
        let mut broker = Broker::new(config.db()).map_err(CibError::new_broker)?;
+
        let mut broker =
+
            Broker::new(config.db(), config.max_run_time()).map_err(CibError::new_broker)?;
        let spec =
            config
                .adapter(&config.default_adapter)
modified src/broker.rs
@@ -6,6 +6,7 @@
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
+
    time::Duration,
};

use time::{macros::format_description, OffsetDateTime};
@@ -29,16 +30,18 @@ use crate::{
pub struct Broker {
    default_adapter: Option<Adapter>,
    adapters: HashMap<RepoId, Adapter>,
+
    max_run_time: Duration,
    db: Db,
}

impl Broker {
    #[allow(clippy::result_large_err)]
-
    pub fn new(db_filename: &Path) -> Result<Self, BrokerError> {
+
    pub fn new(db_filename: &Path, max_run_time: Duration) -> Result<Self, BrokerError> {
        logger::broker_db(db_filename);
        Ok(Self {
            default_adapter: None,
            adapters: HashMap::new(),
+
            max_run_time,
            db: Db::new(db_filename)?,
        })
    }
@@ -107,7 +110,13 @@ impl Broker {
                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
-
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db, run_notification) {
+
                    if let Err(e) = adapter.run(
+
                        trigger,
+
                        &mut run,
+
                        &self.db,
+
                        run_notification,
+
                        self.max_run_time,
+
                    ) {
                        logger::error("failed to run adapter or it failed to run CI", &e);
                    }

@@ -172,7 +181,7 @@ pub enum BrokerError {

#[cfg(test)]
mod test {
-
    use std::path::Path;
+
    use std::{path::Path, time::Duration};
    use tempfile::tempdir;

    use super::{Adapter, Broker, RepoId};
@@ -184,7 +193,7 @@ mod test {
    };

    fn broker(filename: &Path) -> anyhow::Result<Broker> {
-
        Ok(Broker::new(filename)?)
+
        Ok(Broker::new(filename, Duration::from_secs(1))?)
    }

    fn rid() -> anyhow::Result<RepoId> {
modified src/config.rs
@@ -4,24 +4,34 @@ use std::{
    collections::HashMap,
    fmt,
    path::{Path, PathBuf},
+
    time::Duration,
};

+
use duration_str::deserialize_duration;
use serde::{Deserialize, Serialize};

use crate::filter::EventFilter;

+
const DEFAULT_MAX_RUN_TIME: Duration = Duration::from_secs(3600);
const DEFAULT_STATUS_PAGE_UPDATE_INTERVAL: u64 = 10;

#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
    pub default_adapter: String,
    pub adapters: HashMap<String, Adapter>,
+
    #[serde(deserialize_with = "deserialize_duration")]
+
    #[serde(default = "default_max_run_time")]
+
    pub max_run_time: Duration,
    pub filters: Vec<EventFilter>,
    pub report_dir: Option<PathBuf>,
    pub status_update_interval_seconds: Option<u64>,
    pub db: PathBuf,
}

+
fn default_max_run_time() -> Duration {
+
    DEFAULT_MAX_RUN_TIME
+
}
+

impl Config {
    pub fn load(filename: &Path) -> Result<Self, ConfigError> {
        let config =
@@ -38,6 +48,10 @@ impl Config {
            .unwrap_or(DEFAULT_STATUS_PAGE_UPDATE_INTERVAL)
    }

+
    pub fn max_run_time(&self) -> Duration {
+
        self.max_run_time
+
    }
+

    pub fn db(&self) -> &Path {
        &self.db
    }
@@ -93,3 +107,39 @@ pub enum ConfigError {
    #[error("failed to convert configuration into JSON")]
    ToJson(#[source] serde_json::Error),
}
+

+
#[cfg(test)]
+
mod test {
+
    use super::*;
+

+
    #[test]
+
    #[allow(clippy::unwrap_used)]
+
    fn parse_config_yaml() {
+
        const YAML: &str = r#"---
+
default_adapter: foo
+
adapters: {}
+
filters: []
+
db: "foo.db"
+
max_run_time: 1min
+
...
+
"#;
+

+
        let cfg: Config = serde_yml::from_str(YAML).unwrap();
+
        assert_eq!(cfg.max_run_time(), Duration::from_secs(60));
+
    }
+

+
    #[test]
+
    #[allow(clippy::unwrap_used)]
+
    fn parse_config_yaml_without_max_run_time() {
+
        const YAML: &str = r#"---
+
default_adapter: foo
+
adapters: {}
+
filters: []
+
db: "foo.db"
+
...
+
"#;
+

+
        let cfg: Config = serde_yml::from_str(YAML).unwrap();
+
        assert_eq!(cfg.max_run_time(), DEFAULT_MAX_RUN_TIME);
+
    }
+
}