Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
fix(ci-broker.md): avoid race condition by waiting up to a limit
Merged liw opened 1 year ago

Previously we did “sleep 0.1” to wait for the mock node daemon (synthetic-events) to delete the control socket, when it ends. This is too short for some environments where the scenario gets run.

Fix this by waiting for the socket to be deleted for some reasonably large time limit, but checking for it being removed every second.

Signed-off-by: Lars Wirzenius liw@liw.fi

2 files changed +139 -71 58ceb336 e82503bf
modified ci-broker.md
@@ -581,27 +581,26 @@ scenarios.

_Who:_ `cib-devs`

-
The following scenario may only work on Linux, as it's using `pgrep`
-
and `nc` and those may not be portable. If so, this may need to be
-
changed for other platforms.
+
We use the `synthetic-events --client` option to connect to the daemon
+
and wait for the daemon to delete the socket file. This is more
+
easily portable than using a generic tool such as `nc`, which has many
+
variants across operating systems.

-
Note also that the version of `nc` must support the `-U` option, which
-
in Debian means using the `netcat-openbsd` package.
-

-
We sleep for a very short time to make sure the `synthetic-events`
-
daemon has time to remove the socket file before we check that it's
-
been deleted.
+
We wait for up to ten seconds the `synthetic-events` daemon to remove
+
the socket file before we check that it's been deleted, but checking
+
for that every second. This avoids the trap of waiting for a fixed
+
time: if the time is too short, the scenario fails spuriously, and if
+
it's very long, the scenario takes longer than necessary.

~~~scenario
given an installed synthetic-events

then file synt.sock does not exist

-
when I run synthetic-events synt.sock
+
when I run synthetic-events --log daemon.log synt.sock
then file synt.sock exists

-
when I run nc -U synt.sock
-
when I run sleep 0.1
+
when I run synthetic-events --client --log daemon.log synt.sock
then file synt.sock does not exist
~~~

modified src/bin/synthetic-events.rs
@@ -10,42 +10,103 @@ use std::{
    fs::{read, remove_file, File, OpenOptions},
    io::{Read, Write},
    net::Shutdown,
-
    os::unix::net::UnixListener,
+
    os::unix::net::{UnixListener, UnixStream},
    path::PathBuf,
    process::{Command, Stdio},
    thread::sleep,
-
    time::Duration,
+
    time::{Duration, Instant},
};

use clap::Parser;

use radicle::node::Event;

+
const MAX_WAIT: Duration = Duration::from_secs(10);
+

fn log(file: &mut File, msg: String) {
    file.write_all(msg.as_bytes()).ok();
}

fn main() -> anyhow::Result<()> {
    let args = Args::parse();
-
    let exe = current_exe()?;

-
    if args.daemon {
-
        let mut f = OpenOptions::new()
-
            .create(true)
-
            .append(true)
-
            .open(args.log.unwrap())?;
+
    if let Err(e) = {
+
        if args.client {
+
            client(&args)
+
        } else if args.daemon {
+
            daemon(&args)
+
        } else {
+
            launch_daemon(&args)
+
        }
+
    } {
+
        eprintln!("ERROR: {e}");
+
        let mut e = e.source();
+
        while let Some(underlying) = e {
+
            eprintln!("caused by: {underlying}");
+
            e = underlying.source();
+
        }
+
    }

-
        log(&mut f, "daemon starts\n".into());
+
    Ok(())
+
}
+

+
#[derive(Debug, Parser)]
+
struct Args {
+
    #[clap(long)]
+
    client: bool,
+
    #[clap(long)]
+
    daemon: bool,
+
    #[clap(long)]
+
    log: Option<PathBuf>,
+
    #[clap(long)]
+
    repeat: Option<usize>,
+
    socket: PathBuf,
+
    events: Vec<PathBuf>,
+
}
+

+
fn client(args: &Args) -> anyhow::Result<()> {
+
    println!("connect to {}", args.socket.display());
+
    {
+
        let mut stream = UnixStream::connect(&args.socket).unwrap();
+
        stream.shutdown(Shutdown::Write)?;
+
        let mut buf = vec![];
+
        stream.read_to_end(&mut buf)?;
+
    }
+

+
    println!("wait for daemon to remove the socket file");
+
    let started = Instant::now();
+
    while args.socket.exists() {
+
        let elapsed = started.elapsed();
+
        println!(
+
            "still waiting; it's been {} milliseconds",
+
            elapsed.as_millis()
+
        );
+
        if elapsed > MAX_WAIT {
+
            if let Some(filename) = args.log.as_ref() {
+
                let log = std::fs::read(filename)?;
+
                let log = String::from_utf8_lossy(&log);
+
                eprintln!("daemon log {}:\n====\n{log}\n----\n", filename.display());
+
            }
+
            panic!(
+
                "waited too long for {} to be deleted",
+
                args.socket.display()
+
            );
+
        }
+
        sleep(Duration::from_secs(1));
+
    }
+
    Ok(())
+
}
+

+
fn daemon(args: &Args) -> anyhow::Result<()> {
+
    fn helper(args: &Args, f: &mut File) -> anyhow::Result<()> {
+
        log(f, "daemon starts\n".into());

        let repeat = args.repeat.unwrap_or(1);
-
        log(&mut f, format!("repeat={repeat}\n"));
+
        log(f, format!("repeat={repeat}\n"));

        let mut events = vec![];
        for filename in args.events.iter() {
-
            log(
-
                &mut f,
-
                format!("daemon reads file {}\n", filename.display()),
-
            );
+
            log(f, format!("daemon reads file {}\n", filename.display()));

            let data = read(filename)?;
            let e: Event = serde_json::from_slice(&data)?;
@@ -54,71 +115,79 @@ fn main() -> anyhow::Result<()> {
            events.push(e);
        }

-
        log(
-
            &mut f,
-
            format!("daemon connects to {}\n", args.socket.display()),
-
        );
+
        log(f, format!("daemon connects to {}\n", args.socket.display()));
        let listener = UnixListener::bind(&args.socket)?;

        if let Some(stream) = listener.incoming().next() {
-
            log(&mut f, "daemon accepted connection\n".into());
+
            log(f, format!("daemon accepted connection: {stream:?}\n"));
            let mut stream = stream?;
+
            log(f, format!("unwrapped stream: {stream:?}"));
            for e in events.iter() {
+
                log(f, "iterated to an event\n".into());
                for _ in 0..repeat {
                    stream.write_all(e.as_bytes())?;
                }
-
                log(&mut f, "daemon sent a message\n".into());
+
                log(f, "daemon sent a message\n".into());
            }
-
            log(&mut f, "daemon sent all messages\n".into());
+
            log(f, "daemon sent all messages\n".into());

+
            log(f, "daemon shutting down write end\n".into());
            stream.shutdown(Shutdown::Write)?;
-
            log(&mut f, "daemon shutdown write\n".into());
+
            log(f, "daemon shutdown write\n".into());

-
            stream.set_read_timeout(Some(Duration::from_millis(1000)))?;
+
            // log(f, "daemon set socket read timeout\n".into());
+
            // stream.set_read_timeout(Some(Duration::from_millis(1000)))?;
            let mut buf = vec![];
+
            log(f, "daemon read from socket\n".into());
            stream.read_to_end(&mut buf).ok();
-
            log(&mut f, "daemon finished reading\n".into());
+
            log(f, "daemon finished reading\n".into());
        }

+
        log(f, "daemon removes socket file\n".into());
        remove_file(&args.socket)?;
-
        log(&mut f, "daemon ends\n".into());
-
    } else {
-
        if args.socket.exists() {
-
            eprintln!("removing socket {}", args.socket.display());
-
            remove_file(&args.socket)?;
-
        }
-
        eprintln!("launching daemon");
-
        let repeat = args.repeat.unwrap_or(1);
-
        Command::new(exe)
-
            .arg("--daemon")
-
            .arg("--log")
-
            .arg(args.log.unwrap_or(PathBuf::from("/dev/null")))
-
            .arg(&format!("--repeat={repeat}"))
-
            .arg(&args.socket)
-
            .args(&args.events)
-
            .stdin(Stdio::null())
-
            .stdout(Stdio::null())
-
            .stderr(Stdio::null())
-
            .spawn()?;
-
        eprintln!("waiting for daemon to create socket");
-
        while !args.socket.exists() {
-
            eprintln!("no socket yet");
-
            sleep(Duration::from_millis(100));
-
        }
-
        eprintln!("there is a socket now");
+

+
        log(f, "daemon ends\n".into());
+

+
        Ok(())
+
    }
+

+
    let mut f = OpenOptions::new()
+
        .create(true)
+
        .append(true)
+
        .open(args.log.as_ref().unwrap())?;
+

+
    if let Err(e) = helper(args, &mut f) {
+
        log(&mut f, format!("ERROR: {e}"));
    }

    Ok(())
}

-
#[derive(Debug, Parser)]
-
struct Args {
-
    #[clap(long)]
-
    daemon: bool,
-
    #[clap(long)]
-
    log: Option<PathBuf>,
-
    #[clap(long)]
-
    repeat: Option<usize>,
-
    socket: PathBuf,
-
    events: Vec<PathBuf>,
+
fn launch_daemon(args: &Args) -> anyhow::Result<()> {
+
    if args.socket.exists() {
+
        println!("removing socket {}", args.socket.display());
+
        remove_file(&args.socket)?;
+
    }
+
    println!("launching daemon");
+
    let repeat = args.repeat.unwrap_or(1);
+
    let exe = current_exe()?;
+
    Command::new(exe)
+
        .arg("--daemon")
+
        .arg("--log")
+
        .arg(args.log.clone().unwrap_or(PathBuf::from("/dev/null")))
+
        .arg(&format!("--repeat={repeat}"))
+
        .arg(&args.socket)
+
        .args(&args.events)
+
        .stdin(Stdio::null())
+
        .stdout(Stdio::null())
+
        .stderr(Stdio::null())
+
        .spawn()?;
+
    println!("waiting for daemon to create socket");
+
    while !args.socket.exists() {
+
        println!("no socket yet");
+
        sleep(Duration::from_millis(100));
+
    }
+
    println!("there is a socket now");
+

+
    Ok(())
}