Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement graceful shutdown
Alexis Sellier committed 3 years ago
commit b9484b6832f660b102f80c7a4a2efbaa3487d1ca
parent 7ec903d712e634f37df3d129ee9a940f5b720205
6 files changed +71 -29
modified radicle-node/src/client.rs
@@ -112,7 +112,7 @@ impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static>

        let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<G>>();
        let pool = WorkerPool::with(
-
            10,
+
            8,
            time::Duration::from_secs(9),
            storage,
            worker_recv,
@@ -120,7 +120,7 @@ impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static>
        );
        let wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
-
        let handle = Handle::from(reactor.controller());
+
        let handle = Handle::new(home, reactor.controller());
        let control = thread::spawn({
            let handle = handle.clone();
            move || control::listen(node_sock, handle)
@@ -129,7 +129,6 @@ impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static>
        let mut local_addrs = Vec::new();

        for addr in listen {
-
            // TODO: Once the API supports it, we can pass an opaque type here.
            let listener = NetAccept::bind(&addr)?;
            let local_addr = listener.local_addr();

@@ -153,8 +152,10 @@ impl<G: crypto::Signer + EcSign<Pk = NodeId, Sig = Signature> + Clone + 'static>
        log::info!("Running node {}..", self.id);

        self.pool.run().unwrap();
-
        self.control.join().unwrap()?;
        self.reactor.join().unwrap();
+
        self.control.join().unwrap()?;
+

+
        log::debug!("Node shutdown completed for {}", self.id);

        Ok(())
    }
modified radicle-node/src/client/handle.rs
@@ -1,9 +1,12 @@
+
use std::io::Write;
+
use std::os::unix::net::UnixStream;
use std::sync::Arc;

use crossbeam_channel as chan;
use thiserror::Error;

use crate::identity::Id;
+
use crate::profile::Home;
use crate::service;
use crate::service::{CommandError, FetchLookup, QueryState};
use crate::service::{NodeId, Sessions};
@@ -47,20 +50,22 @@ impl<T> From<chan::SendError<T>> for Error {
}

pub struct Handle<T: reactor::Handler> {
+
    pub(crate) home: Home,
    pub(crate) controller: reactor::Controller<T>,
}

impl<T: reactor::Handler> Clone for Handle<T> {
    fn clone(&self) -> Self {
        Self {
+
            home: self.home.clone(),
            controller: self.controller.clone(),
        }
    }
}

-
impl<T: reactor::Handler> From<reactor::Controller<T>> for Handle<T> {
-
    fn from(controller: reactor::Controller<T>) -> Handle<T> {
-
        Handle { controller }
+
impl<T: reactor::Handler> Handle<T> {
+
    pub fn new(home: Home, controller: reactor::Controller<T>) -> Self {
+
        Self { home, controller }
    }
}

@@ -166,6 +171,13 @@ impl<T: reactor::Handler<Command = service::Command>> radicle::node::Handle for
    }

    fn shutdown(self) -> Result<(), Error> {
+
        // Send a shutdown request to our own control socket. This is the only way to kill the
+
        // control thread gracefully. Since the control thread may have called this function,
+
        // the control socket may already be disconnected. Ignore errors.
+
        UnixStream::connect(self.home.socket())
+
            .and_then(|mut sock| sock.write_all(b"shutdown"))
+
            .ok();
+

        self.controller.shutdown().map_err(|_| Error::NotConnected)
    }
}
modified radicle-node/src/control.rs
@@ -47,8 +47,15 @@ pub fn listen<
        match incoming {
            Ok(mut stream) => {
                if let Err(e) = drain(&stream, &mut handle) {
-
                    log::error!("Received {} on control socket", e);
+
                    log::debug!("Received {} on control socket", e);

+
                    if let DrainError::Shutdown = e {
+
                        log::debug!("Shutdown requested..");
+
                        // Channel might already be disconnected if shutdown
+
                        // came from somewhere else. Ignore errors.
+
                        handle.shutdown().ok();
+
                        break;
+
                    }
                    writeln!(stream, "error: {}", e).ok();

                    stream.flush().ok();
@@ -57,9 +64,10 @@ pub fn listen<
                    writeln!(stream, "ok").ok();
                }
            }
-
            Err(e) => log::error!("Failed to open control socket stream: {}", e),
+
            Err(e) => log::error!("Failed to accept incoming connection: {}", e),
        }
    }
+
    log::debug!("Exiting control loop..");

    Ok(())
}
@@ -74,6 +82,8 @@ enum DrainError {
    Client(#[from] client::handle::Error),
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
+
    #[error("shutdown requested")]
+
    Shutdown,
}

fn drain<H: Handle<Error = client::handle::Error, FetchLookup = FetchLookup>>(
@@ -199,6 +209,9 @@ fn drain<H: Handle<Error = client::handle::Error, FetchLookup = FetchLookup>>(
                    }
                    Err(e) => return Err(DrainError::Client(e)),
                },
+
                "shutdown" => {
+
                    return Err(DrainError::Shutdown);
+
                }
                _ => {
                    return Err(DrainError::UnknownCommand(line));
                }
modified radicle-node/src/test/logger.rs
@@ -22,9 +22,9 @@ impl Log for Logger {
            target => {
                if self.enabled(record.metadata()) {
                    let current = std::thread::current();
-
                    let msg = format!("{:<8} {}", format!("{}:", target), record.args());
+
                    let msg = format!("{:>12} {}", format!("{}:", target), record.args());
                    let s = if let Some(name) = current.name() {
-
                        format!("{:<8} {msg}", name)
+
                        format!("{:<16} {msg}", name)
                    } else {
                        msg
                    };
modified radicle-node/src/tests/e2e.rs
@@ -1,3 +1,4 @@
+
use std::mem::ManuallyDrop;
use std::path::Path;
use std::{
    collections::{BTreeMap, BTreeSet},
@@ -9,7 +10,7 @@ use radicle::crypto::test::signer::MockSigner;
use radicle::crypto::Signer;
use radicle::git::refname;
use radicle::identity::Id;
-
use radicle::node::Handle;
+
use radicle::node::Handle as _;
use radicle::profile::Home;
use radicle::storage::{ReadStorage, WriteStorage};
use radicle::test::fixtures;
@@ -22,7 +23,7 @@ use crate::service::{routing, FetchLookup, FetchResult};
use crate::storage::git::transport;
use crate::test::logger;
use crate::wire::Wire;
-
use crate::{client, client::Runtime, service};
+
use crate::{client, client::handle::Handle, client::Runtime, service};

/// A node that can be run.
struct Node {
@@ -36,9 +37,22 @@ struct NodeHandle {
    id: NodeId,
    storage: Storage,
    addr: net::SocketAddr,
-
    #[allow(dead_code)]
-
    thread: thread::JoinHandle<Result<(), client::Error>>,
-
    handle: client::handle::Handle<Wire<routing::Table, address::Book, Storage, MockSigner>>,
+
    thread: ManuallyDrop<thread::JoinHandle<Result<(), client::Error>>>,
+
    handle: ManuallyDrop<Handle<Wire<routing::Table, address::Book, Storage, MockSigner>>>,
+
}
+

+
impl Drop for NodeHandle {
+
    fn drop(&mut self) {
+
        log::debug!(target: "test", "Node {} shutting down..", self.id);
+

+
        unsafe { ManuallyDrop::take(&mut self.handle) }
+
            .shutdown()
+
            .unwrap();
+
        unsafe { ManuallyDrop::take(&mut self.thread) }
+
            .join()
+
            .unwrap()
+
            .unwrap();
+
    }
}

impl NodeHandle {
@@ -92,13 +106,15 @@ impl Node {
        let listen = vec![([0, 0, 0, 0], 0).into()];
        let proxy = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 9050);
        let rt = Runtime::with(self.home, config, listen, proxy, self.signer.clone()).unwrap();
-
        let handle = rt.handle.clone();
        let addr = *rt.local_addrs.first().unwrap();
        let id = *self.signer.public_key();
-
        let thread = thread::Builder::new()
-
            .name(id.to_string())
-
            .spawn(move || rt.run())
-
            .unwrap();
+
        let handle = ManuallyDrop::new(rt.handle.clone());
+
        let thread = ManuallyDrop::new(
+
            thread::Builder::new()
+
                .name(id.to_string())
+
                .spawn(move || rt.run())
+
                .unwrap(),
+
        );

        NodeHandle {
            id,
@@ -140,7 +156,7 @@ impl Node {

/// Checks whether the nodes have converged in their routing tables.
#[track_caller]
-
fn check<'a>(nodes: impl IntoIterator<Item = &'a NodeHandle>) -> BTreeSet<(Id, NodeId)> {
+
fn converge<'a>(nodes: impl IntoIterator<Item = &'a NodeHandle>) -> BTreeSet<(Id, NodeId)> {
    let nodes = nodes.into_iter().collect::<Vec<_>>();

    let mut all_routes = BTreeSet::<(Id, NodeId)>::new();
@@ -195,7 +211,7 @@ fn test_inventory_sync_basic() {

    alice.connect(&bob);

-
    let routes = check([&alice, &bob]);
+
    let routes = converge([&alice, &bob]);
    assert_eq!(routes.len(), 2);
}

@@ -223,7 +239,7 @@ fn test_inventory_sync_bridge() {
    alice.connect(&bob);
    bob.connect(&eve);

-
    let routes = check([&alice, &bob, &eve]);
+
    let routes = converge([&alice, &bob, &eve]);
    assert_eq!(routes.len(), 3);
}

@@ -258,7 +274,7 @@ fn test_inventory_sync_ring() {
    eve.connect(&carol);
    carol.connect(&alice);

-
    let routes = check([&alice, &bob, &eve, &carol]);
+
    let routes = converge([&alice, &bob, &eve, &carol]);
    assert_eq!(routes.len(), 4);
}

@@ -298,7 +314,7 @@ fn test_inventory_sync_star() {
    carol.connect(&alice);
    dave.connect(&alice);

-
    let routes = check([&alice, &bob, &eve, &carol, &dave]);
+
    let routes = converge([&alice, &bob, &eve, &carol, &dave]);
    assert_eq!(routes.len(), 5);
}

@@ -316,7 +332,7 @@ fn test_replication() {
    let bob = bob.spawn(service::Config::default());

    alice.connect(&bob);
-
    check([&alice, &bob]);
+
    converge([&alice, &bob]);

    let inventory = alice.handle.inventory().unwrap();
    assert!(inventory.try_iter().next().is_none());
modified radicle-node/src/worker.rs
@@ -272,9 +272,9 @@ impl WorkerPool {
    ///
    /// Blocks until all worker threads have exited.
    pub fn run(self) -> thread::Result<()> {
-
        for worker in self.pool {
+
        for (i, worker) in self.pool.into_iter().enumerate() {
            if let Err(err) = worker.join()? {
-
                log::error!(target: "pool", "Worker failed: {err}");
+
                log::debug!(target: "pool", "Worker {i} exited: {err}");
            }
        }
        log::debug!(target: "pool", "Worker pool shutting down..");