Radish alpha
r
rad:zwTxygwuz5LDGBq255RA2CbNGrz8
Radicle CI broker
Radicle
Git
radicle-ci-broker src bin synthetic-events.rs
//! Read node events, expressed as JSON, from files, and output them as
//! single-line JSON objects to a Unix domain socket.
//!
//! This is a helper tool for testing of the CI broker as a whole.
//!
//! This program forks and runs itself in the background.

use std::{
    env::current_exe,
    fs::{File, OpenOptions, read, remove_file},
    io::{Read, Write},
    net::Shutdown,
    os::unix::net::{UnixListener, UnixStream},
    path::PathBuf,
    process::{Command, Stdio},
    thread::sleep,
    time::{Duration, Instant},
};

use clap::Parser;

use radicle::node::Event;

const MAX_WAIT: Duration = Duration::from_secs(10);

fn log(file: &mut File, msg: String) {
    file.write_all(msg.as_bytes()).ok();
}

fn main() {
    let args = Args::parse();

    if let Err(e) = {
        if args.client {
            client(&args)
        } else if args.daemon {
            daemon(&args)
        } else {
            launch_daemon(&args)
        }
    } {
        eprintln!("ERROR: {e}");
        let mut e = e.source();
        while let Some(underlying) = e {
            eprintln!("caused by: {underlying}");
            e = underlying.source();
        }
    }
}

#[derive(Debug, Parser)]
struct Args {
    #[clap(long)]
    client: bool,
    #[clap(long)]
    daemon: bool,
    #[clap(long)]
    log: Option<PathBuf>,
    #[clap(long)]
    repeat: Option<usize>,
    socket: PathBuf,
    events: Vec<PathBuf>,
}

fn client(args: &Args) -> Result<(), Box<dyn std::error::Error>> {
    println!("connect to {}", args.socket.display());
    {
        let mut stream = UnixStream::connect(&args.socket).unwrap();
        stream.shutdown(Shutdown::Write)?;
        let mut buf = vec![];
        stream.read_to_end(&mut buf)?;
    }

    println!("wait for daemon to remove the socket file");
    let started = Instant::now();
    while args.socket.exists() {
        let elapsed = started.elapsed();
        println!(
            "still waiting; it's been {} milliseconds",
            elapsed.as_millis()
        );
        if elapsed > MAX_WAIT {
            if let Some(filename) = args.log.as_ref() {
                let log = std::fs::read(filename)?;
                let log = String::from_utf8_lossy(&log);
                eprintln!("daemon log {}:\n====\n{log}\n----\n", filename.display());
            }
            panic!(
                "waited too long for {} to be deleted",
                args.socket.display()
            );
        }
        sleep(Duration::from_secs(1));
    }
    Ok(())
}

fn daemon(args: &Args) -> Result<(), Box<dyn std::error::Error>> {
    fn helper(args: &Args, f: &mut File) -> Result<(), Box<dyn std::error::Error>> {
        log(f, "daemon starts\n".into());

        let repeat = args.repeat.unwrap_or(1);
        log(f, format!("repeat={repeat}\n"));

        let mut events = vec![];
        for filename in args.events.iter() {
            log(f, format!("daemon reads file {}\n", filename.display()));

            let data = read(filename)?;
            let e: Event = serde_json::from_slice(&data)?;
            let mut e = serde_json::to_string(&e)?;
            e.push('\n');
            events.push(e);
        }

        log(f, format!("daemon connects to {}\n", args.socket.display()));
        let listener = UnixListener::bind(&args.socket)?;

        if let Some(stream) = listener.incoming().next() {
            log(f, format!("daemon accepted connection: {stream:?}\n"));
            let mut stream = stream?;
            log(f, format!("unwrapped stream: {stream:?}"));
            for e in events.iter() {
                log(f, "iterated to an event\n".into());
                for _ in 0..repeat {
                    stream.write_all(e.as_bytes())?;
                }
                log(f, "daemon sent a message\n".into());
            }
            log(f, "daemon sent all messages\n".into());

            log(f, "daemon shutting down write end\n".into());
            stream.shutdown(Shutdown::Write)?;
            log(f, "daemon shutdown write\n".into());

            // log(f, "daemon set socket read timeout\n".into());
            // stream.set_read_timeout(Some(Duration::from_millis(1000)))?;
            let mut buf = vec![];
            log(f, "daemon read from socket\n".into());
            stream.read_to_end(&mut buf).ok();
            log(f, "daemon finished reading\n".into());
        }

        log(f, "daemon removes socket file\n".into());
        remove_file(&args.socket)?;

        log(f, "daemon ends\n".into());

        Ok(())
    }

    let mut f = OpenOptions::new()
        .create(true)
        .append(true)
        .open(args.log.as_ref().unwrap())?;

    if let Err(e) = helper(args, &mut f) {
        log(&mut f, format!("ERROR: {e}"));
    }

    Ok(())
}

fn launch_daemon(args: &Args) -> Result<(), Box<dyn std::error::Error>> {
    if args.socket.exists() {
        println!("removing socket {}", args.socket.display());
        remove_file(&args.socket)?;
    }
    println!("launching daemon");
    let repeat = args.repeat.unwrap_or(1);
    let exe = current_exe()?;
    Command::new(exe)
        .arg("--daemon")
        .arg("--log")
        .arg(args.log.clone().unwrap_or(PathBuf::from("/dev/null")))
        .arg(format!("--repeat={repeat}"))
        .arg(&args.socket)
        .args(&args.events)
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .spawn()?;
    println!("waiting for daemon to create socket");
    while !args.socket.exists() {
        println!("no socket yet");
        sleep(Duration::from_millis(100));
    }
    println!("there is a socket now");

    Ok(())
}