Radish alpha
r
Radicle CI broker
Radicle
Git (anonymous pull)
Log in to clone via SSH
fix(ci-broker.md): mock node control socket handling in helper
Lars Wirzenius committed 1 year ago
commit e82503bf951280285bf5f7decb5af9968bccfce2
parent 58ceb33620b9b2dbeb8642f3cbf70b85d167d1ca
2 files changed +139 -71
modified ci-broker.md
@@ -581,27 +581,26 @@ scenarios.

_Who:_ `cib-devs`

-
The following scenario may only work on Linux, as it's using `pgrep`
-
and `nc` and those may not be portable. If so, this may need to be
-
changed for other platforms.
+
We use the `synthetic-events --client` option to connect to the daemon
+
and wait for the daemon to delete the socket file. This is more
+
easily portable than using a generic tool such as `nc`, which has many
+
variants across operating systems.

-
Note also that the version of `nc` must support the `-U` option, which
-
in Debian means using the `netcat-openbsd` package.
-

-
We sleep for a very short time to make sure the `synthetic-events`
-
daemon has time to remove the socket file before we check that it's
-
been deleted.
+
We wait for up to ten seconds the `synthetic-events` daemon to remove
+
the socket file before we check that it's been deleted, but checking
+
for that every second. This avoids the trap of waiting for a fixed
+
time: if the time is too short, the scenario fails spuriously, and if
+
it's very long, the scenario takes longer than necessary.

~~~scenario
given an installed synthetic-events

then file synt.sock does not exist

-
when I run synthetic-events synt.sock
+
when I run synthetic-events --log daemon.log synt.sock
then file synt.sock exists

-
when I run nc -U synt.sock
-
when I run sleep 0.1
+
when I run synthetic-events --client --log daemon.log synt.sock
then file synt.sock does not exist
~~~

modified src/bin/synthetic-events.rs
@@ -10,42 +10,103 @@ use std::{
    fs::{read, remove_file, File, OpenOptions},
    io::{Read, Write},
    net::Shutdown,
-
    os::unix::net::UnixListener,
+
    os::unix::net::{UnixListener, UnixStream},
    path::PathBuf,
    process::{Command, Stdio},
    thread::sleep,
-
    time::Duration,
+
    time::{Duration, Instant},
};

use clap::Parser;

use radicle::node::Event;

+
const MAX_WAIT: Duration = Duration::from_secs(10);
+

fn log(file: &mut File, msg: String) {
    file.write_all(msg.as_bytes()).ok();
}

fn main() -> anyhow::Result<()> {
    let args = Args::parse();
-
    let exe = current_exe()?;

-
    if args.daemon {
-
        let mut f = OpenOptions::new()
-
            .create(true)
-
            .append(true)
-
            .open(args.log.unwrap())?;
+
    if let Err(e) = {
+
        if args.client {
+
            client(&args)
+
        } else if args.daemon {
+
            daemon(&args)
+
        } else {
+
            launch_daemon(&args)
+
        }
+
    } {
+
        eprintln!("ERROR: {e}");
+
        let mut e = e.source();
+
        while let Some(underlying) = e {
+
            eprintln!("caused by: {underlying}");
+
            e = underlying.source();
+
        }
+
    }

-
        log(&mut f, "daemon starts\n".into());
+
    Ok(())
+
}
+

+
#[derive(Debug, Parser)]
+
struct Args {
+
    #[clap(long)]
+
    client: bool,
+
    #[clap(long)]
+
    daemon: bool,
+
    #[clap(long)]
+
    log: Option<PathBuf>,
+
    #[clap(long)]
+
    repeat: Option<usize>,
+
    socket: PathBuf,
+
    events: Vec<PathBuf>,
+
}
+

+
fn client(args: &Args) -> anyhow::Result<()> {
+
    println!("connect to {}", args.socket.display());
+
    {
+
        let mut stream = UnixStream::connect(&args.socket).unwrap();
+
        stream.shutdown(Shutdown::Write)?;
+
        let mut buf = vec![];
+
        stream.read_to_end(&mut buf)?;
+
    }
+

+
    println!("wait for daemon to remove the socket file");
+
    let started = Instant::now();
+
    while args.socket.exists() {
+
        let elapsed = started.elapsed();
+
        println!(
+
            "still waiting; it's been {} milliseconds",
+
            elapsed.as_millis()
+
        );
+
        if elapsed > MAX_WAIT {
+
            if let Some(filename) = args.log.as_ref() {
+
                let log = std::fs::read(filename)?;
+
                let log = String::from_utf8_lossy(&log);
+
                eprintln!("daemon log {}:\n====\n{log}\n----\n", filename.display());
+
            }
+
            panic!(
+
                "waited too long for {} to be deleted",
+
                args.socket.display()
+
            );
+
        }
+
        sleep(Duration::from_secs(1));
+
    }
+
    Ok(())
+
}
+

+
fn daemon(args: &Args) -> anyhow::Result<()> {
+
    fn helper(args: &Args, f: &mut File) -> anyhow::Result<()> {
+
        log(f, "daemon starts\n".into());

        let repeat = args.repeat.unwrap_or(1);
-
        log(&mut f, format!("repeat={repeat}\n"));
+
        log(f, format!("repeat={repeat}\n"));

        let mut events = vec![];
        for filename in args.events.iter() {
-
            log(
-
                &mut f,
-
                format!("daemon reads file {}\n", filename.display()),
-
            );
+
            log(f, format!("daemon reads file {}\n", filename.display()));

            let data = read(filename)?;
            let e: Event = serde_json::from_slice(&data)?;
@@ -54,71 +115,79 @@ fn main() -> anyhow::Result<()> {
            events.push(e);
        }

-
        log(
-
            &mut f,
-
            format!("daemon connects to {}\n", args.socket.display()),
-
        );
+
        log(f, format!("daemon connects to {}\n", args.socket.display()));
        let listener = UnixListener::bind(&args.socket)?;

        if let Some(stream) = listener.incoming().next() {
-
            log(&mut f, "daemon accepted connection\n".into());
+
            log(f, format!("daemon accepted connection: {stream:?}\n"));
            let mut stream = stream?;
+
            log(f, format!("unwrapped stream: {stream:?}"));
            for e in events.iter() {
+
                log(f, "iterated to an event\n".into());
                for _ in 0..repeat {
                    stream.write_all(e.as_bytes())?;
                }
-
                log(&mut f, "daemon sent a message\n".into());
+
                log(f, "daemon sent a message\n".into());
            }
-
            log(&mut f, "daemon sent all messages\n".into());
+
            log(f, "daemon sent all messages\n".into());

+
            log(f, "daemon shutting down write end\n".into());
            stream.shutdown(Shutdown::Write)?;
-
            log(&mut f, "daemon shutdown write\n".into());
+
            log(f, "daemon shutdown write\n".into());

-
            stream.set_read_timeout(Some(Duration::from_millis(1000)))?;
+
            // log(f, "daemon set socket read timeout\n".into());
+
            // stream.set_read_timeout(Some(Duration::from_millis(1000)))?;
            let mut buf = vec![];
+
            log(f, "daemon read from socket\n".into());
            stream.read_to_end(&mut buf).ok();
-
            log(&mut f, "daemon finished reading\n".into());
+
            log(f, "daemon finished reading\n".into());
        }

+
        log(f, "daemon removes socket file\n".into());
        remove_file(&args.socket)?;
-
        log(&mut f, "daemon ends\n".into());
-
    } else {
-
        if args.socket.exists() {
-
            eprintln!("removing socket {}", args.socket.display());
-
            remove_file(&args.socket)?;
-
        }
-
        eprintln!("launching daemon");
-
        let repeat = args.repeat.unwrap_or(1);
-
        Command::new(exe)
-
            .arg("--daemon")
-
            .arg("--log")
-
            .arg(args.log.unwrap_or(PathBuf::from("/dev/null")))
-
            .arg(&format!("--repeat={repeat}"))
-
            .arg(&args.socket)
-
            .args(&args.events)
-
            .stdin(Stdio::null())
-
            .stdout(Stdio::null())
-
            .stderr(Stdio::null())
-
            .spawn()?;
-
        eprintln!("waiting for daemon to create socket");
-
        while !args.socket.exists() {
-
            eprintln!("no socket yet");
-
            sleep(Duration::from_millis(100));
-
        }
-
        eprintln!("there is a socket now");
+

+
        log(f, "daemon ends\n".into());
+

+
        Ok(())
+
    }
+

+
    let mut f = OpenOptions::new()
+
        .create(true)
+
        .append(true)
+
        .open(args.log.as_ref().unwrap())?;
+

+
    if let Err(e) = helper(args, &mut f) {
+
        log(&mut f, format!("ERROR: {e}"));
    }

    Ok(())
}

-
#[derive(Debug, Parser)]
-
struct Args {
-
    #[clap(long)]
-
    daemon: bool,
-
    #[clap(long)]
-
    log: Option<PathBuf>,
-
    #[clap(long)]
-
    repeat: Option<usize>,
-
    socket: PathBuf,
-
    events: Vec<PathBuf>,
+
fn launch_daemon(args: &Args) -> anyhow::Result<()> {
+
    if args.socket.exists() {
+
        println!("removing socket {}", args.socket.display());
+
        remove_file(&args.socket)?;
+
    }
+
    println!("launching daemon");
+
    let repeat = args.repeat.unwrap_or(1);
+
    let exe = current_exe()?;
+
    Command::new(exe)
+
        .arg("--daemon")
+
        .arg("--log")
+
        .arg(args.log.clone().unwrap_or(PathBuf::from("/dev/null")))
+
        .arg(&format!("--repeat={repeat}"))
+
        .arg(&args.socket)
+
        .args(&args.events)
+
        .stdin(Stdio::null())
+
        .stdout(Stdio::null())
+
        .stderr(Stdio::null())
+
        .spawn()?;
+
    println!("waiting for daemon to create socket");
+
    while !args.socket.exists() {
+
        println!("no socket yet");
+
        sleep(Duration::from_millis(100));
+
    }
+
    println!("there is a socket now");
+

+
    Ok(())
}