Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
feat: switch to structured logging only
Merged liw opened 1 year ago

Replace all use of the `log` and `env_logger` crates with structured logging using the `slog` and `slog-json` crates.

Change the recently introduced logger module to use the `slog-scope` crate. This avoids having to pass a reference to `logger::Logger` to every place that needs to do any logging.

Signed-off-by: Lars Wirzenius liw@liw.fi

13 files changed +310 -293 f0a99590 85414e86
modified Cargo.lock
@@ -179,6 +179,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d1d046238990b9cf5bcde22a3fb3584ee5cf65fb2765f454ed428c7a0063da"

[[package]]
+
name = "arc-swap"
+
version = "1.7.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
+

+
[[package]]
name = "arraydeque"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -501,6 +507,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3b7eb4404b8195a9abb6356f4ac07d8ba267045c8d6d220ac4dc992e6cc75df"

[[package]]
+
name = "ctor"
+
version = "0.2.8"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f"
+
dependencies = [
+
 "quote",
+
 "syn 2.0.75",
+
]
+

+
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -703,16 +719,6 @@ dependencies = [
]

[[package]]
-
name = "env_filter"
-
version = "0.1.2"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab"
-
dependencies = [
-
 "log",
-
 "regex",
-
]
-

-
[[package]]
name = "env_logger"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -726,19 +732,6 @@ dependencies = [
]

[[package]]
-
name = "env_logger"
-
version = "0.11.5"
-
source = "registry+https://github.com/rust-lang/crates.io-index"
-
checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d"
-
dependencies = [
-
 "anstream",
-
 "anstyle",
-
 "env_filter",
-
 "humantime",
-
 "log",
-
]
-

-
[[package]]
name = "equivalent"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1729,10 +1722,9 @@ version = "0.4.0"
dependencies = [
 "anyhow",
 "clap",
+
 "ctor",
 "culpa",
-
 "env_logger 0.11.5",
 "html-page",
-
 "log",
 "radicle",
 "radicle-git-ext",
 "radicle-surf",
@@ -1742,6 +1734,7 @@ dependencies = [
 "serde_yml",
 "slog",
 "slog-json",
+
 "slog-scope",
 "sqlite",
 "sqlite3-sys",
 "subplot-build",
@@ -2215,6 +2208,17 @@ dependencies = [
]

[[package]]
+
name = "slog-scope"
+
version = "4.4.0"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2f95a4b4c3274cd2869549da82b57ccc930859bdbf5bcea0424bc5f140b3c786"
+
dependencies = [
+
 "arc-swap",
+
 "lazy_static",
+
 "slog",
+
]
+

+
[[package]]
name = "slug"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2371,7 +2375,7 @@ dependencies = [
 "anyhow",
 "base64 0.22.1",
 "clap",
-
 "env_logger 0.10.2",
+
 "env_logger",
 "file_diff",
 "git-testament",
 "html-escape",
modified Cargo.toml
@@ -13,9 +13,7 @@ categories = ["development-tools::build-utils"]
[dependencies]
anyhow = "1.0.86"
clap = { version = "4.5.11", features = ["derive", "wrap_help"] }
-
env_logger = "0.11.5"
html-page = "0.4.0"
-
log = "0.4.22"
radicle-git-ext = "0.7.0"
radicle-surf = { version = "0.18.0", default-features = false, features = ["serde"] }
regex = "1.10.5"
@@ -24,6 +22,7 @@ serde_json = "1.0.121"
serde_yml = "0.0.11"
slog = "2.7.0"
slog-json = "2.6.1"
+
slog-scope = "4.4.0"
sqlite = "0.32.0"
sqlite3-sys = "0.15.0"
subplotlib = "0.10.0"
@@ -36,6 +35,7 @@ version = "0.12.0"
features = ["default", "test"]

[dev-dependencies]
+
ctor = "0.2.8"
culpa = "1.0.2"
tempfile = { version = "3.10.1" }

modified src/adapter.rs
@@ -15,10 +15,9 @@ use std::{
    process::{Command, Stdio},
};

-
use log::{debug, error};
-

use crate::{
    db::{Db, DbError},
+
    logger,
    msg::{MessageError, Request, Response},
    run::{Run, RunState},
};
@@ -52,8 +51,6 @@ impl Adapter {
    }

    pub fn run(&self, trigger: &Request, run: &mut Run, db: &Db) -> Result<(), AdapterError> {
-
        debug!("running adapter");
-

        run.set_state(RunState::Triggered);
        db.update_run(run).map_err(AdapterError::UpdateRun)?;

@@ -69,7 +66,6 @@ impl Adapter {
        assert!(matches!(trigger, Request::Trigger { .. }));

        // Spawn the adapter sub-process.
-
        debug!("spawn adapter sub-process: {:?}", self.bin);
        let mut child = Command::new(&self.bin)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
@@ -81,7 +77,6 @@ impl Adapter {
        // Write the request to trigger a run to the child's stdin.
        // Then close the pipe to prevent the child from trying to
        // read another message that will never be sent.
-
        debug!("send request to sub-process stdin");
        {
            let stdin = child.stdin.take().ok_or(AdapterError::StdinHandle)?;
            trigger
@@ -98,7 +93,6 @@ impl Adapter {
        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
-
            debug!("first response {resp:#?}");
            match resp {
                Response::Triggered { run_id, info_url } => {
                    run.set_state(RunState::Running);
@@ -111,13 +105,12 @@ impl Adapter {
                _ => return Err(AdapterError::NotTriggered(resp)),
            }
        } else {
-
            debug!("no first response message");
+
            logger::adapter_no_first_response();
        }

        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
-
            debug!("second response {resp:#?}");
            match resp {
                Response::Finished { result } => {
                    run.set_result(result);
@@ -126,22 +119,18 @@ impl Adapter {
                _ => return Err(AdapterError::NotFinished(resp)),
            }
        } else {
-
            debug!("no second response message");
+
            logger::adapter_no_second_response();
        }

        if let Some(line) = lines.next() {
            let line = line.map_err(AdapterError::ReadLine)?;
            let resp = Response::from_str(&line).map_err(AdapterError::ParseResponse)?;
-
            debug!("third response is too many {resp:#?}");
+
            logger::adapter_too_many_responses();
            return Err(AdapterError::TooMany(resp));
-
        } else {
-
            debug!("no further response messages");
        }

        let wait = child.wait().map_err(AdapterError::Wait)?;
-
        debug!("adapter wait result {wait:#?}");
        if let Some(exit) = wait.code() {
-
            debug!("adapter exit was {exit}");
            if exit != 0 {
                let mut stderr = child.stderr.take().ok_or(AdapterError::StderrHandle)?;
                let mut buf = vec![];
@@ -149,11 +138,11 @@ impl Adapter {
                    .read_to_end(&mut buf)
                    .map_err(AdapterError::ReadStderr)?;
                let stderr = String::from_utf8_lossy(&buf);
-
                debug!("adapter stderr: {stderr:?}");
+
                logger::adapter_result(exit, &stderr.to_string());
                return Err(AdapterError::Failed(exit));
            }
        } else {
-
            error!("adapter sub-process did not exit voluntarily");
+
            logger::adapter_did_not_exit_voluntarioly();
            return Err(AdapterError::Failed(NOT_EXITED));
        }

@@ -233,7 +222,7 @@ mod test {
        adapter::AdapterError,
        msg::{MessageError, Response, RunResult},
        run::Whence,
-
        test::{log_in_tests, mock_adapter, trigger_request, TestResult},
+
        test::{mock_adapter, trigger_request, TestResult},
    };

    fn db() -> anyhow::Result<Db> {
@@ -257,8 +246,6 @@ mod test {

    #[test]
    fn adapter_reports_success() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -279,8 +266,6 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_reports_failure() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -306,8 +291,6 @@ echo '{"response":"finished","result":"failure"}'

    #[test]
    fn adapter_exits_nonzero() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -332,8 +315,6 @@ exit 1

    #[test]
    fn adapter_is_killed_before_any_messages() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
kill -9 $BASHPID
"#;
@@ -356,8 +337,6 @@ kill -9 $BASHPID

    #[test]
    fn adapter_is_killed_after_first_message() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -379,8 +358,6 @@ kill -9 $BASHPID

    #[test]
    fn adapter_is_killed_after_second_message() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -403,8 +380,6 @@ kill -9 $BASHPID

    #[test]
    fn adapter_produces_as_bad_message() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -429,8 +404,6 @@ echo '{"response":"finished","result":"success","bad":"field"}'

    #[test]
    fn adapter_first_message_isnt_triggered() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"finished","result":"success"}'
@@ -456,8 +429,6 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_outputs_too_many_messages() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -485,8 +456,6 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_does_not_exist() -> TestResult<()> {
-
        log_in_tests();
-

        let tmp = tempdir()?;
        let bin = tmp.path().join("adapter.sh");

@@ -507,8 +476,6 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_is_not_executable() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
@@ -536,8 +503,6 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_has_bad_interpreter() -> TestResult<()> {
-
        log_in_tests();
-

        // We test this with a shebang. However, the same kind of code
        // paths and errors should happen when a binary can't be
        // loaded due to missing dynamic linker or library or such.
modified src/bin/cib.rs
@@ -3,14 +3,12 @@
#![allow(clippy::result_large_err)]

use std::{
-
    error::Error,
    fs::write,
    path::{Path, PathBuf},
    process::exit,
};

use clap::Parser;
-
use log::{debug, error, info};

use radicle::Profile;

@@ -19,7 +17,7 @@ use radicle_ci_broker::{
    broker::{Broker, BrokerError},
    config::{Config, ConfigError},
    db::{Db, DbError},
-
    logger::Logger,
+
    logger,
    notif::NotificationChannel,
    pages::{PageBuilder, PageError},
    queueadd::{AdderError, QueueAdderBuilder},
@@ -27,31 +25,22 @@ use radicle_ci_broker::{
};

fn main() {
-
    let logger = Logger::default();
-
    logger.start_cib();
+
    let _logger = logger::open();

+
    logger::start_cib();
    if let Err(e) = fallible_main() {
-
        error!("ERROR: {}", e);
-
        let mut e = e.source();
-
        while let Some(source) = e {
-
            error!("caused by: {}", source);
-
            e = source.source();
-
        }
-
        logger.end_cib_in_error();
+
        logger::error("ERROR", &e);
+
        logger::end_cib_in_error();
        exit(1);
    }
-

-
    logger.end_cib_successfully();
+
    logger::end_cib_successfully();
}

fn fallible_main() -> Result<(), CibError> {
    let args = Args::parse();

-
    env_logger::init_from_env("RADICLE_CI_BROKER_LOG");
-
    info!("Radicle CI broker starts");
-

    let config = Config::load(&args.config).map_err(|e| CibError::read_config(&args.config, e))?;
-
    debug!("loaded configuration: {:#?}", config);
+
    logger::loaded_config(&config);

    args.run(&config)?;

@@ -131,7 +120,6 @@ impl InsertCmd {
            .join()
            .expect("wait for thread to finish")
            .map_err(CibError::add_events)?;
-
        debug!("cib ends");
        Ok(())
    }
}
@@ -153,7 +141,7 @@ impl QueuedCmd {
        let adapter = Adapter::new(&spec.command)
            .with_environment(spec.envs())
            .with_environment(spec.sensitive_envs());
-
        debug!("default adapter: {adapter:?}");
+
        logger::debug2(format!("default adapter: {adapter:?}"));
        broker.set_default_adapter(&adapter);

        let mut event_notifications = NotificationChannel::default();
@@ -174,7 +162,6 @@ impl QueuedCmd {
            .expect("wait for thread to finish")
            .map_err(CibError::process_queue)?;

-
        debug!("cib ends");
        Ok(())
    }
}
@@ -221,7 +208,7 @@ impl ProcessEventsCmd {
        let adapter = Adapter::new(&spec.command)
            .with_environment(spec.envs())
            .with_environment(spec.sensitive_envs());
-
        debug!("default adapter: {adapter:?}");
+
        logger::debug2(format!("default adapter: {adapter:?}"));
        broker.set_default_adapter(&adapter);

        let processor = QueueProcessorBuilder::default()
@@ -236,18 +223,14 @@ impl ProcessEventsCmd {
            .join()
            .expect("wait for processor thread to finish")
            .map_err(CibError::process_queue)?;
-
        info!("event processing thread has ended");

        // The page updating thread ends when the channel for run
        // notifications is closed by the processor thread ending.
-
        info!("wait for page updater thread to end");
        page_updater
            .join()
            .expect("wait for page updater thread to finish")
            .expect("page updater thread succeeded");
-
        info!("page updater thread has ended");

-
        debug!("cib ends");
        Ok(())
    }
}
modified src/bin/cibtool.rs
@@ -54,11 +54,8 @@ fn main() {

#[allow(clippy::result_large_err)]
fn fallible_main() -> Result<(), CibToolError> {
-
    env_logger::init_from_env("RADICLE_CI_BROKER_LOG");
-

    let args = Args::parse();
    args.cmd.run(&args)?;
-

    Ok(())
}

modified src/broker.rs
@@ -5,11 +5,9 @@

use std::{
    collections::HashMap,
-
    error::Error,
    path::{Path, PathBuf},
};

-
use log::{debug, error, info};
use time::{macros::format_description, OffsetDateTime};

use radicle::prelude::RepoId;
@@ -17,6 +15,7 @@ use radicle::prelude::RepoId;
use crate::{
    adapter::Adapter,
    db::{Db, DbError},
+
    logger,
    msg::{PatchEvent, PushEvent, Request},
    run::{Run, Whence},
};
@@ -35,7 +34,7 @@ pub struct Broker {
impl Broker {
    #[allow(clippy::result_large_err)]
    pub fn new(db_filename: &Path) -> Result<Self, BrokerError> {
-
        debug!("broker database in {}", db_filename.display());
+
        logger::broker_db(db_filename);
        Ok(Self {
            default_adapter: None,
            adapters: HashMap::new(),
@@ -66,8 +65,7 @@ impl Broker {

    #[allow(clippy::result_large_err)]
    pub fn execute_ci(&mut self, trigger: &Request) -> Result<Run, BrokerError> {
-
        info!("Start CI run");
-
        debug!("Start CI run on {trigger:#?}");
+
        logger::broker_start_run(trigger);
        let run = match trigger {
            Request::Trigger {
                common,
@@ -104,24 +102,17 @@ impl Broker {
                    // We run the adapter, but if that fails, we just
                    // log the error. The `Run` value records the
                    // result of the run.
-
                    debug!("broker runs adapter");
                    if let Err(e) = adapter.run(trigger, &mut run, &self.db) {
-
                        error!("failed to run adapter or it failed to run CI: {e}");
-
                        let mut e = e.source();
-
                        while let Some(source) = e {
-
                            error!("caused by: {}", source);
-
                            e = source.source();
-
                        }
+
                        logger::error("failed to run adapter or it failed to run CI", &e);
                    }

-
                    debug!("broker run {run:#?}");
                    run
                } else {
                    return Err(BrokerError::NoAdapter(*rid));
                }
            }
        };
-
        info!("Finish CI run: {run:?}");
+
        logger::broker_end_run(&run);

        self.db.update_run(&run)?;

@@ -188,7 +179,7 @@ mod test {
    use crate::{
        msg::{RunId, RunResult},
        run::RunState,
-
        test::{log_in_tests, mock_adapter, trigger_request, TestResult},
+
        test::{mock_adapter, trigger_request, TestResult},
    };

    fn broker(filename: &Path) -> anyhow::Result<Broker> {
@@ -309,8 +300,6 @@ echo '{"response":"finished","result":"success"}'

    #[test]
    fn adapter_fails() -> TestResult<()> {
-
        log_in_tests();
-

        const ADAPTER: &str = r#"#!/bin/bash
read
echo '{"response":"triggered","run_id":{"id":"xyzzy"}}'
modified src/db.rs
@@ -16,7 +16,6 @@ use std::{
    time::Duration,
};

-
use log::{debug, trace};
use sqlite::{Connection, State, Statement};
use time::{macros::format_description, OffsetDateTime};
use uuid::Uuid;
@@ -38,7 +37,6 @@ impl Db {
    /// If it is created, tables are created.
    pub fn new<P: AsRef<Path>>(filename: P) -> Result<Self, DbError> {
        let filename = filename.as_ref();
-
        debug!("open database {}", filename.display());
        let mut db = Db {
            filename: filename.into(),
            conn: sqlite::open(filename).map_err(|e| DbError::open(filename, e))?,
@@ -46,7 +44,6 @@ impl Db {

        debug_assert!(MAX_WAIT.as_millis() < usize::MAX as u128); // safe conversion
        let ms = MAX_WAIT.as_millis() as usize;
-
        trace!("set busy timeout to {ms} milliseconds");
        db.conn
            .set_busy_timeout(ms)
            .map_err(|e| DbError::busy_timer(filename, e))?;
@@ -96,7 +93,6 @@ impl Db {

    // Prepare a statement for execution.
    fn prepare<'a>(&'a self, sql: &str) -> Result<Stmt<'a>, DbError> {
-
        trace!("prepare {}", sql);
        match self.conn.prepare(sql) {
            Ok(stmt) => Ok(Stmt::new(sql, stmt)),
            Err(e) => Err(DbError::prepare(sql, &self.filename, e)),
@@ -106,7 +102,6 @@ impl Db {
    // Execute a statement that doesn't return any rows with values.
    // This means basically any statement except SELECT.
    fn execute_valueless(stmt: &mut Stmt) -> Result<(), DbError> {
-
        trace!("execute {}", stmt.sql);
        stmt.stmt.reset().map_err(DbError::reset)?;
        match stmt.stmt.next() {
            Ok(_) => Ok(()),
modified src/event.rs
@@ -27,7 +27,6 @@
//! let e = Filters::try_from(filters).unwrap();
//! ```

-
use log::{debug, error, info, trace};
use radicle::{
    node::{Event, Handle, NodeId},
    prelude::RepoId,
@@ -43,6 +42,8 @@ use std::{
    time,
};

+
use crate::logger;
+

/// Source of events from the local Radicle node.
///
/// The events are filtered. Only events allowed by at least one
@@ -57,30 +58,18 @@ impl NodeEventSource {
    /// Create a new source of node events, for a given Radicle
    /// profile.
    pub fn new(profile: &Profile) -> Result<Self, NodeEventError> {
-
        info!("subscribing to local node events");
        let socket = profile.socket();
-
        info!(
-
            "profile socket exists? {:#?} {}",
-
            socket.display(),
-
            socket.exists()
-
        );
        if !socket.exists() {
            return Err(NodeEventError::NoControlSocket(socket));
        }
-
        debug!("about to Node::new()");
        let node = radicle::Node::new(socket.clone());
-
        debug!("{node:#?}");
-
        debug!("about to node.subscribe()");
        match node.subscribe(time::Duration::MAX) {
-
            Ok(events) => {
-
                trace!("subscribed OK");
-
                Ok(Self {
-
                    events: Box::new(events.into_iter()),
-
                    allowed: vec![],
-
                })
-
            }
+
            Ok(events) => Ok(Self {
+
                events: Box::new(events.into_iter()),
+
                allowed: vec![],
+
            }),
            Err(err) => {
-
                info!("failed to subscribe to node events");
+
                logger::error("failed to subscribe to node events", &err);
                Err(NodeEventError::CannotSubscribe(socket.clone(), err))
            }
        }
@@ -94,11 +83,9 @@ impl NodeEventSource {
    fn allowed(&self, event: &BrokerEvent) -> Result<bool, NodeEventError> {
        for filter in self.allowed.iter() {
            if !event.is_allowed(filter)? {
-
                trace!("event is not allowed");
                return Ok(false);
            }
        }
-
        trace!("event is allowed");
        Ok(true)
    }

@@ -107,45 +94,34 @@ impl NodeEventSource {
    /// no more events from this source, or there's an error.
    pub fn event(&mut self) -> Result<Vec<BrokerEvent>, NodeEventError> {
        loop {
-
            debug!("getting next event from local node");
            if let Some(event) = self.events.next() {
-
                debug!("got event from local node");
                match event {
                    Ok(event) => {
-
                        debug!("got node event {:#?}", event);
                        let mut result = vec![];
                        if let Some(broker_events) = BrokerEvent::from_event(&event) {
-
                            info!("node event became {} broker events", broker_events.len());
                            for e in broker_events {
                                if self.allowed(&e)? {
-
                                    info!("allowed event {e:?}");
                                    result.push(e);
                                }
                            }
                            if !result.is_empty() {
-
                                info!("got {} allowed broker events from node event", result.len());
                                return Ok(result);
-
                            } else {
-
                                info!("got no allowed broker events from node event, ignoring it");
                            }
                        }
-
                        debug!(
-
                            "got event, but did not result in broker events, or none were allowed"
-
                        );
                    }
                    Err(radicle::node::Error::Io(err))
                        if err.kind() == std::io::ErrorKind::ConnectionReset =>
                    {
-
                        error!("connection to node control socket broke");
+
                        logger::event_disconnected();
                        return Err(NodeEventError::BrokenConnection);
                    }
                    Err(err) => {
-
                        error!("error reading event from node: {err}");
+
                        logger::error("error reading event from node", &err);
                        return Err(NodeEventError::Node(err));
                    }
                }
            } else {
-
                info!("no more node events from control socket: iterator ended");
+
                logger::event_end();
                return Ok(vec![]);
            }
        }
@@ -353,15 +329,12 @@ impl BrokerEvent {
                    RefUpdate::Skipped { name, oid }
                        if name.as_str() == "shutdown" && oid.is_zero() =>
                    {
-
                        info!("received shutdown event from node");
                        events.push(Self::shutdown());
                    }
                    RefUpdate::Created { name, oid } => {
-
                        info!("ref created {name} {oid}");
                        events.push(Self::new(rid, name, oid, None));
                    }
                    RefUpdate::Updated { name, old, new } => {
-
                        info!("ref updated {name} {old} => {new}");
                        events.push(Self::new(rid, name, new, Some(*old)));
                    }
                    _ => (),
@@ -375,37 +348,20 @@ impl BrokerEvent {

    /// Is this broker event allowed by a filter?
    fn is_allowed(&self, filter: &EventFilter) -> Result<bool, NodeEventError> {
-
        debug!("is_allowed called: filter={filter:?}");
-
        let res = self.is_allowed_helper(filter, 0)?;
-
        debug!("is_allowed: res={res}");
+
        let res = self.is_allowed_helper(filter)?;
        Ok(res)
    }

-
    fn is_allowed_helper(
-
        &self,
-
        filter: &EventFilter,
-
        level: usize,
-
    ) -> Result<bool, NodeEventError> {
-
        let prefix = format!("{:width$}", " ", width = level * 4);
-

-
        trace!("is_allowed: {prefix} called {self:?}");
-

+
    fn is_allowed_helper(&self, filter: &EventFilter) -> Result<bool, NodeEventError> {
        let allowed = match self {
            Self::Shutdown => true,
            Self::RefChanged {
                rid,
                name,
-
                oid,
-
                old,
+
                oid: _,
+
                old: _,
            } => {
-
                trace!("is_allowed: {prefix} rid={rid:?}");
-
                trace!("is_allowed: {prefix} name={name:?}");
-
                trace!("is_allowed: {prefix} oid={oid:?}");
-
                trace!("is_allowed: {prefix} old={old:?}");
-
                trace!("is_allowed: {prefix} filter={filter:?}");
-

                let parsed = parse_ref(name)?;
-
                trace!("is_allowed: {prefix} parsed={parsed:?}");

                match filter {
                    EventFilter::Repository(wanted) => rid == wanted,
@@ -426,18 +382,17 @@ impl BrokerEvent {
                    }
                    EventFilter::And(conds) => conds
                        .iter()
-
                        .all(|cond| self.is_allowed_helper(cond, level + 1).unwrap_or(false)),
+
                        .all(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
                    EventFilter::Or(conds) => conds
                        .iter()
-
                        .any(|cond| self.is_allowed_helper(cond, level + 1).unwrap_or(false)),
+
                        .any(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
                    EventFilter::Not(conds) => !conds
                        .iter()
-
                        .any(|cond| self.is_allowed_helper(cond, level + 1).unwrap_or(false)),
+
                        .any(|cond| self.is_allowed_helper(cond).unwrap_or(false)),
                }
            }
        };

-
        trace!("is_allowed: {prefix} allowed={allowed}");
        Ok(allowed)
    }

@@ -564,18 +519,13 @@ pub enum ParsedRef {
/// }
/// ```
pub fn parse_ref(s: &str) -> Result<Option<ParsedRef>, NodeEventError> {
-
    trace!("parse_ref: s={s:?}");
-

    const PAT_PATCH: &str = r"^refs/namespaces/[^/]+/refs/heads/patches/([^/]+)$";
    let patch_re = Regex::new(PAT_PATCH).map_err(|e| NodeEventError::Regex(PAT_PATCH, e))?;
    if let Some(patch_captures) = patch_re.captures(s) {
-
        trace!("parse_ref: patch_captures={patch_captures:?}");
        if let Some(patch_id) = patch_captures.get(1) {
-
            trace!("parse_ref: patch_id={patch_id:?}");
            let patch_id_str = patch_id.as_str();
            let oid = Oid::try_from(patch_id_str)
                .map_err(|e| NodeEventError::ParseOid(patch_id_str.into(), e))?;
-
            trace!("parse_ref: patch oid={oid:?}");
            return Ok(Some(ParsedRef::Patch(oid)));
        }
    }
@@ -583,40 +533,26 @@ pub fn parse_ref(s: &str) -> Result<Option<ParsedRef>, NodeEventError> {
    const PAT_BRANCH: &str = r"^refs/namespaces/[^/]+/refs/heads/(.+)$";
    let push_re = Regex::new(PAT_BRANCH).map_err(|e| NodeEventError::Regex(PAT_BRANCH, e))?;
    if let Some(push_captures) = push_re.captures(s) {
-
        trace!("parse_ref: push_captures={push_captures:?}");
        if let Some(branch) = push_captures.get(1) {
-
            trace!("parse_ref: branch={branch:?}");
            return Ok(Some(ParsedRef::Push(branch.as_str().to_string())));
        }
    }

-
    trace!("parse_ref: neither push nor patch");
    Ok(None)
}

#[cfg(test)]
-
fn log_init() {
-
    let _ = env_logger::builder()
-
        .is_test(true)
-
        .format_timestamp(None)
-
        .filter_level(log::LevelFilter::Trace)
-
        .try_init();
-
}
-

-
#[cfg(test)]
mod test_parse_ref {
-
    use super::{log_init, parse_ref, Oid, ParsedRef};
+
    use super::{parse_ref, Oid, ParsedRef};

    #[test]
    fn plain_branch_name_is_none() -> anyhow::Result<()> {
-
        log_init();
        assert_eq!(parse_ref("main")?, None);
        Ok(())
    }

    #[test]
    fn namespaced_branch() -> anyhow::Result<()> {
-
        log_init();
        assert_eq!(
            parse_ref(
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/main"
@@ -628,7 +564,6 @@ mod test_parse_ref {

    #[test]
    fn namespaced_branch_with_slashes() -> anyhow::Result<()> {
-
        log_init();
        assert_eq!(
            parse_ref(
                "refs/namespaces/z6MkfBU2cwcZfaE6Z1dLqb7Ve7u4pdgbSo9tP6qUVsqFn2xv/refs/heads/liw/cob/draft/v2"
@@ -640,7 +575,6 @@ mod test_parse_ref {

    #[test]
    fn namespaced_patch() -> anyhow::Result<()> {
-
        log_init();
        const SHA: &str = "0a4c69183fc8b8d849f5ab977d70f2a1f4788bca";
        assert_eq!(
            parse_ref(&format!("refs/namespaces/NID/refs/heads/patches/{SHA}"))?,
@@ -672,11 +606,10 @@ pub fn push_branch(name: &str) -> String {

#[cfg(test)]
mod test {
-
    use super::{is_patch_update, log_init, parse_ref, push_branch, Oid, ParsedRef};
+
    use super::{is_patch_update, parse_ref, push_branch, Oid, ParsedRef};

    #[test]
    fn test_parse_patch_ref() -> anyhow::Result<()> {
-
        log_init();
        let patch_ref =
            "refs/namespaces/NID/refs/heads/patches/9183ed6232687d3105482960cecb01a53018b80a";

@@ -691,7 +624,6 @@ mod test {

    #[test]
    fn test_parse_push_ref() -> anyhow::Result<()> {
-
        log_init();
        let push_ref =
            "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main";
        let parsed_ref = parse_ref(push_ref)?;
@@ -708,7 +640,6 @@ mod test {

    #[test]
    fn test_parse_invalid_ref() -> anyhow::Result<()> {
-
        log_init();
        let invalid_ref = "invalid_ref";
        let parsed_ref = parse_ref(invalid_ref)?;
        assert!(parsed_ref.is_none());
@@ -717,7 +648,6 @@ mod test {

    #[test]
    fn branch_is_not_patch_update() {
-
        log_init();
        assert_eq!(
            is_patch_update(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/main"
@@ -728,7 +658,6 @@ mod test {

    #[test]
    fn patch_branch_is_not_patch_update() {
-
        log_init();
        assert_eq!(
            is_patch_update(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/patches/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
@@ -739,7 +668,6 @@ mod test {

    #[test]
    fn patch_update() {
-
        log_init();
        assert_eq!(
            is_patch_update(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/cobs/xyz.radicle.patch/bbb54a2c9314a528a4fff9d6c2aae874ed098433"
@@ -750,7 +678,6 @@ mod test {

    #[test]
    fn get_push_branch() {
-
        log_init();
        assert_eq!(
            push_branch(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/heads/branch_name"
@@ -761,7 +688,6 @@ mod test {

    #[test]
    fn get_no_push_branch() {
-
        log_init();
        assert_eq!(
            push_branch(
                "refs/namespaces/z6MkuhvCnrcow7vzkyQzkuFixzpTa42iC2Cfa4DA8HRLCmys/refs/rad/sigrefs"
modified src/logger.rs
@@ -1,34 +1,224 @@
//! A logger abstraction for the CI broker.

-
use std::sync::Mutex;
+
#[cfg(test)]
+
use std::sync::Once;
+
use std::{path::Path, sync::Mutex, time::Duration};

-
use slog::{info, o, Drain};
+
use radicle::{git::raw::Oid, identity::RepoId};
+
use slog::{debug, error, info, o, warn, Drain};
+
use slog_scope::GlobalLoggerGuard;

-
pub struct Logger {
-
    root: slog::Logger,
+
use crate::{config::Config, db::QueueId, event::BrokerEvent, msg::Request, run::Run};
+

+
pub fn open() -> GlobalLoggerGuard {
+
    let logger = slog_json::Json::new(std::io::stderr())
+
        .add_default_keys()
+
        .set_flush(true)
+
        .set_newlines(true)
+
        .build();
+
    let log = slog::Logger::root(Mutex::new(logger).fuse(), o!());
+
    slog_scope::set_global_logger(log)
}

-
impl Default for Logger {
-
    fn default() -> Self {
-
        Self {
-
            root: slog::Logger::root(
-
                Mutex::new(slog_json::Json::default(std::io::stderr())).map(slog::Fuse),
-
                o!(),
-
            ),
-
        }
-
    }
+
// Set up structured logging for tests.
+
//
+
// We have to keep the GlobalLoggerGuard we get when we initialize
+
// `slog-scope` alive as long as we may need to do any logging. We can
+
// only create that once per process. For tests, we do that here.
+
//
+
// We can't do the same thing for non-test processes, as that would
+
// interfere with use of `radicle-ci-broker` as a library. Libraries
+
// should not interfere with global state, unless they're specifically
+
// intended to do that.
+
//
+
// This is for tests only; non-test processes must call `open`
// themselves and keep the returned guard alive.
+

+
// Keeps the global logger guard alive for the whole test process.
// `OnceLock` guarantees at-most-once initialization without `static mut`
// or `unsafe` (the original Once + `static mut Option<…>` pattern trips
// the `static_mut_refs` lint on newer Rust). Requires Rust 1.70+.
#[cfg(test)]
static LOGGER: std::sync::OnceLock<GlobalLoggerGuard> = std::sync::OnceLock::new();

// Set up structured logging before any test runs (via `ctor`).
#[cfg(test)]
#[ctor::ctor]
fn open_for_tests() {
    // `get_or_init` runs `open` at most once per process; storing the
    // guard in LOGGER keeps it from being dropped until process exit.
    let _ = LOGGER.get_or_init(open);
}

-
impl Logger {
-
    pub fn start_cib(&self) {
-
        info!(&self.root, "CI broker starts");
-
    }
+
pub fn start_cib() {
+
    info!(slog_scope::logger(), "CI broker starts");
+
}

-
    pub fn end_cib_successfully(&self) {
-
        info!(&self.root, "CI broker ends successfully");
-
    }
+
pub fn end_cib_successfully() {
+
    info!(slog_scope::logger(), "CI broker ends successfully");
+
}
+

+
pub fn end_cib_in_error() {
+
    info!(
+
        slog_scope::logger(),
+
        "CI broker ends in unrecoverable error"
+
    );
+
}
+

+
pub fn loaded_config(config: &Config) {
+
    debug!(slog_scope::logger(), "loaded configuration {config:#?}");
+
}
+

+
pub fn queueproc_start() {
+
    info!(
+
        slog_scope::logger(),
+
        "start thread to process events until a shutdown event"
+
    );
+
}
+

+
pub fn queueproc_end() {
+
    info!(slog_scope::logger(), "start thread to process events ends");
+
}
+

+
pub fn queueproc_channel_disconnect() {
+
    info!(
+
        slog_scope::logger(),
+
        "event notification channel disconnected"
+
    );
+
}
+

+
pub fn queueproc_remove_event(id: &QueueId) {
+
    info!(slog_scope::logger(), "remove event from queue: {id}");
+
}
+

+
pub fn queueproc_action_run(rid: &RepoId, oid: &Oid) {
+
    info!(slog_scope::logger(), "Action: run: {rid} {oid}");
+
}
+

+
pub fn queueproc_action_shutdown() {
+
    info!(slog_scope::logger(), "Action: shutdown");
+
}
+

+
pub fn queueadd_start() {
+
    info!(
+
        slog_scope::logger(),
+
        "start thread to add events from node to event queue"
+
    );
+
}
+

+
pub fn queueadd_control_socket_close() {
+
    info!(
+
        slog_scope::logger(),
+
        "no more events from node control sockets"
+
    );
+
}
+

+
pub fn queueadd_push_event(e: &BrokerEvent) {
+
    debug!(
+
        slog_scope::logger(),
+
        "insert broker event into queue: {e:?}"
+
    );
+
}
+

+
pub fn queueadd_end() {
+
    info!(slog_scope::logger(), "start thread to process events ends");
+
}
+

+
pub fn pages_directory_unset() {
+
    warn!(
+
        slog_scope::logger(),
+
        "not writing HTML report pages as output directory has not been set"
+
    );
+
}
+

+
pub fn pages_interval(interval: Duration) {
+
    info!(
+
        slog_scope::logger(),
+
        "wait about {} seconds to update HTML report pages again",
+
        interval.as_secs()
+
    );
+
}
+

+
pub fn pages_disconnected() {
+
    info!(
+
        slog_scope::logger(),
+
        "page updater: run notification channel disconnected"
+
    );
+
}
+

+
pub fn pages_start() {
+
    info!(slog_scope::logger(), "start page updater thread");
+
}
+

+
pub fn pages_end() {
+
    info!(slog_scope::logger(), "end page updater thread");
+
}
+

+
pub fn event_disconnected() {
+
    info!(
+
        slog_scope::logger(),
+
        "connection to node control socket broke"
+
    );
+
}
+

+
pub fn event_end() {
+
    info!(
+
        slog_scope::logger(),
+
        "no more node events from control socket: iterator ended"
+
    );
+
}
+

+
pub fn broker_db(filename: &Path) {
+
    info!(
+
        slog_scope::logger(),
+
        "broker database: {}",
+
        filename.display()
+
    );
+
}
+

+
pub fn broker_start_run(trigger: &Request) {
+
    info!(slog_scope::logger(), "start CI run");
+
    debug!(slog_scope::logger(), "trigger event: {trigger:#?}");
+
}
+

+
pub fn broker_end_run(run: &Run) {
+
    info!(slog_scope::logger(), "Finish CI run");
+
    debug!(slog_scope::logger(), "finished CI run: {run:#?}");
+
}
+

+
pub fn adapter_no_first_response() {
+
    error!(slog_scope::logger(), "no first response message");
+
}
+

+
pub fn adapter_no_second_response() {
+
    error!(slog_scope::logger(), "no second response message");
+
}
+

+
pub fn adapter_too_many_responses() {
+
    error!(slog_scope::logger(), "too many response messages");
+
}
+

+
pub fn adapter_result(exit: i32, stderr: &String) {
+
    debug!(slog_scope::logger(), "adapter exit code: {exit}");
+
    debug!(slog_scope::logger(), "adapter stderr: {stderr}");
+
}
+

+
pub fn adapter_did_not_exit_voluntarioly() {
+
    warn!(slog_scope::logger(), "adapter did not exit voluntarily");
+
}
+

+
pub fn debug(msg: &str) {
+
    debug!(slog_scope::logger(), "{msg}");
+
}
+

+
pub fn debug2(msg: String) {
+
    debug!(slog_scope::logger(), "{msg}");
+
}

-
    pub fn end_cib_in_error(&self) {
-
        info!(&self.root, "CI broker ends in unrecoverable error");
+
pub fn error(msg: &str, e: &impl std::error::Error) {
+
    error!(slog_scope::logger(), "{msg}: {e}");
+
    let mut e = e.source();
+
    while let Some(source) = e {
+
        error!(slog_scope::logger(), "caused by: {}", source);
+
        e = source.source();
    }
}
modified src/pages.rs
@@ -17,7 +17,6 @@ use std::{
};

use html_page::{Element, HtmlPage, Tag};
-
use log::{debug, info, trace, warn};
use serde::Serialize;
use time::{macros::format_description, OffsetDateTime};

@@ -30,6 +29,7 @@ use radicle::{
use crate::{
    db::{Db, DbError, QueuedEvent},
    event::{parse_ref, BrokerEvent, ParsedRef},
+
    logger,
    msg::RunId,
    notif::NotificationReceiver,
    run::{Run, RunState, Whence},
@@ -96,7 +96,7 @@ impl PageBuilder {
        for run in self.runs.iter() {
            runs.insert(run.adapter_run_id().unwrap().clone(), run.clone());
        }
-
        debug!("broker database has {} CI runs", runs.len());
+
        logger::debug2(format!("broker database has {} CI runs", runs.len()));

        Ok(StatusPage::new())
    }
@@ -525,15 +525,15 @@ impl StatusPage {
        db: Db,
        once: bool,
    ) -> JoinHandle<Result<(), PageError>> {
+
        logger::pages_start();
+

        if self.dirname.is_none() {
-
            warn!("not writing HTML report pages as output directory has not been set");
+
            logger::pages_directory_unset();
        }

        self.node_alias = profile.config.alias().to_string();
-
        info!(
-
            "wait about {} seconds to update HTML report pages again",
-
            UPDATE_INTERVAL.as_secs()
-
        );
+
        logger::pages_interval(UPDATE_INTERVAL);
+

        spawn(move || {
            'processing_loop: loop {
                self.update_and_write(&profile, &db)?;
@@ -541,35 +541,26 @@ impl StatusPage {
                    return Ok(());
                }

-
                trace!("wait for run notification or timeout");
                match run_rx.recv_timeout(UPDATE_INTERVAL) {
-
                    Ok(_) => trace!("page updater: got a run notification"),
-
                    Err(RecvTimeoutError::Timeout) => {
-
                        trace!("page updater: timeout on run notification channel")
-
                    }
+
                    Ok(_) => (),
+
                    Err(RecvTimeoutError::Timeout) => (),
                    Err(RecvTimeoutError::Disconnected) => {
-
                        debug!("page updater: run notification channel disconnected");
+
                        logger::pages_disconnected();
                        break 'processing_loop;
                    }
                }
            }

            // Make sure we update reports and status JSON at least once.
-
            info!("make sure we update reports and status JSON at least once");
            self.update_and_write(&profile, &db)?;

-
            info!("end page updater thread");
+
            logger::pages_end();
            Ok(())
        })
    }

    fn update_and_write(&mut self, profile: &Profile, db: &Db) -> Result<(), PageError> {
        if let Some(dirname) = &self.dirname {
-
            info!(
-
                "write HTML report pages and {STATUS_JSON} to {}",
-
                dirname.display()
-
            );
-

            let runs = db.get_all_runs()?;

            // Create list of events, except ones for private
@@ -655,7 +646,6 @@ impl StatusPage {
    }

    fn write_file(filename: &Path, text: &str) -> Result<(), PageError> {
-
        debug!("write file {}", filename.display());
        write(filename, text).map_err(|e| PageError::Write(filename.into(), e))?;
        Ok(())
    }
modified src/queueadd.rs
@@ -2,11 +2,10 @@ use std::thread::{spawn, JoinHandle};

use radicle::Profile;

-
use log::{debug, info};
-

use crate::{
    db::{Db, DbError},
    event::{BrokerEvent, EventFilter, NodeEventError, NodeEventSource},
+
    logger,
    notif::NotificationSender,
};

@@ -62,31 +61,26 @@ impl QueueAdder {
    }

    pub fn add_events(&self) -> Result<(), AdderError> {
-
        info!("start thread to add events from node to event queue");
+
        logger::queueadd_start();

        let profile = Profile::load()?;
-
        debug!("loaded profile {profile:#?}");

        let mut source = NodeEventSource::new(&profile)?;
-
        debug!("created node event source");

        for filter in self.filters.iter() {
            source.allow(filter.clone());
        }
-
        debug!("added filters to node event source");

        // This loop ends when there's an error, e.g., failure to read an
        // event from the node.
        'event_loop: loop {
-
            debug!("waiting for event from node");
            let events = source.event()?;
            if events.is_empty() {
-
                info!("no more events from node control sockets");
+
                logger::queueadd_control_socket_close();
                break 'event_loop;
            } else {
                for e in events {
-
                    debug!("got event {e:#?}");
-
                    info!("insert broker event into queue: {e:?}");
+
                    logger::queueadd_push_event(&e);
                    self.push_event(e)?;
                }
            }
@@ -95,10 +89,10 @@ impl QueueAdder {
        // Add a shutdown event to the queue so that the queue
        // processing thread knows to stop.
        if self.push_shutdown {
-
            info!("push a shutdown event into queue");
            self.push_event(BrokerEvent::Shutdown)?;
        }

+
        logger::queueadd_end();
        Ok(())
    }

modified src/queueproc.rs
@@ -8,13 +8,13 @@ use std::{
    time::Duration,
};

-
use log::{debug, info};
use radicle::Profile;

use crate::{
    broker::{Broker, BrokerError},
    db::{Db, DbError, QueueId, QueuedEvent},
    event::BrokerEvent,
+
    logger,
    msg::{MessageError, RequestBuilder},
    notif::{NotificationReceiver, NotificationSender},
};
@@ -75,38 +75,30 @@ impl QueueProcessor {
    }

    fn process_until_shutdown(&mut self) -> Result<(), QueueError> {
-
        info!("start thread to process events until a shutdown event");
+
        logger::queueproc_start();
        let mut done = false;
        while !done {
-
            info!("Looking for an event to process");
            while let Some(qe) = self.pick_event()? {
-
                info!("picked event from queue: {}: {:?}", qe.id(), qe.event());
                done = self.process_event(qe.event())?;
                self.drop_event(qe.id())?;
                self.run_tx.send(()).map_err(|_| QueueError::NotifyRun)?;
            }
-
            info!("waiting for next event from node");
            match self.events_rx.recv_timeout(RECV_TIMEOUT) {
-
                Ok(_) => {
-
                    info!("got event notification");
-
                }
-
                Err(RecvTimeoutError::Timeout) => {
-
                    info!("timed out on event notification");
-
                }
+
                Ok(_) => {}
+
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => {
-
                    info!("event notification channel disconnected");
+
                    logger::queueproc_channel_disconnect();
                    done = true;
                }
            }
        }

-
        info!("thread to process events ends");
+
        logger::queueproc_end();
        Ok(())
    }

    fn pick_event(&self) -> Result<Option<QueuedEvent>, QueueError> {
        let ids = self.db.queued_events().map_err(QueueError::db)?;
-
        debug!("event queue has {} events", ids.len());

        let mut queue = vec![];
        for id in ids.iter() {
@@ -131,7 +123,7 @@ impl QueueProcessor {
                oid,
                old: _,
            } => {
-
                info!("Action: run: {rid} {oid}");
+
                logger::queueproc_action_run(rid, oid);

                let trigger = RequestBuilder::default()
                    .profile(&self.profile)
@@ -144,14 +136,14 @@ impl QueueProcessor {
                Ok(false)
            }
            BrokerEvent::Shutdown => {
-
                info!("Action: shutdown");
+
                logger::queueproc_action_shutdown();
                Ok(true)
            }
        }
    }

    fn drop_event(&mut self, id: &QueueId) -> Result<(), QueueError> {
-
        debug!("remove event {id}");
+
        logger::queueproc_remove_event(id);
        self.db.remove_queued_event(id).map_err(QueueError::db)?;
        Ok(())
    }
modified src/test.rs
@@ -90,11 +90,3 @@ pub fn mock_adapter(filename: &Path, script: &str) -> TestResult<Adapter> {

    Ok(Adapter::new(filename))
}
-

-
pub fn log_in_tests() {
-
    env_logger::builder()
-
        .filter_level(log::LevelFilter::Debug)
-
        .is_test(true)
-
        .try_init()
-
        .ok();
-
}