Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Rename `client` module to `runtime`
Alexis Sellier committed 3 years ago
commit 23cf44f4c7ee46d68d1312be19f394b87ff4a661
parent 02be3341441eb9dac2df3b6e6945f136ff81423a
11 files changed +511 -514
deleted radicle-node/src/client.rs
@@ -1,251 +0,0 @@
-
use std::io::{BufRead, BufReader};
-
use std::os::unix::net::UnixListener;
-
use std::path::PathBuf;
-
use std::{fs, io, net, thread, time};
-

-
use crossbeam_channel as chan;
-
use cyphernet::{Cert, EcSign};
-
use netservices::resource::NetAccept;
-
use radicle::profile::Home;
-
use radicle::Storage;
-
use reactor::poller::popol;
-
use reactor::Reactor;
-
use thiserror::Error;
-

-
use crate::address;
-
use crate::control;
-
use crate::crypto::{Signature, Signer};
-
use crate::node::NodeId;
-
use crate::service::{routing, tracking};
-
use crate::wire;
-
use crate::wire::Wire;
-
use crate::worker::{WorkerPool, WorkerReq};
-
use crate::{crypto, service, LocalTime};
-

-
pub mod handle;
-
use handle::Handle;
-

-
/// Directory in `$RAD_HOME` under which node-specific files are stored.
-
pub const NODE_DIR: &str = "node";
-
/// Filename of routing table database under [`NODE_DIR`].
-
pub const ROUTING_DB_FILE: &str = "routing.db";
-
/// Filename of address database under [`NODE_DIR`].
-
pub const ADDRESS_DB_FILE: &str = "addresses.db";
-
/// Filename of tracking table database under [`NODE_DIR`].
-
pub const TRACKING_DB_FILE: &str = "tracking.db";
-

-
/// A client error.
-
#[derive(Error, Debug)]
-
pub enum Error {
-
    /// A routing database error.
-
    #[error("routing database error: {0}")]
-
    Routing(#[from] routing::Error),
-
    /// An address database error.
-
    #[error("address database error: {0}")]
-
    Addresses(#[from] address::Error),
-
    /// A tracking database error.
-
    #[error("tracking database error: {0}")]
-
    Tracking(#[from] tracking::Error),
-
    /// An I/O error.
-
    #[error("i/o error: {0}")]
-
    Io(#[from] io::Error),
-
    /// A control socket error.
-
    #[error("control socket error: {0}")]
-
    Control(#[from] control::Error),
-
    /// Another node is already running.
-
    #[error(
-
        "another node appears to be running; \
-
        if this isn't the case, delete the socket file at '{0}' \
-
        and restart the node"
-
    )]
-
    AlreadyRunning(PathBuf),
-
}
-

-
/// Holds join handles to the client threads, as well as a client handle.
-
pub struct Runtime<G: Signer + EcSign> {
-
    pub id: NodeId,
-
    pub home: Home,
-
    pub handle: Handle<G>,
-
    pub control: thread::JoinHandle<Result<(), control::Error>>,
-
    pub storage: Storage,
-
    pub reactor: Reactor<wire::Control<G>>,
-
    pub daemon: net::SocketAddr,
-
    pub pool: WorkerPool,
-
    pub local_addrs: Vec<net::SocketAddr>,
-
}
-

-
impl<G: Signer + EcSign> Runtime<G> {
-
    /// Initialize the runtime.
-
    ///
-
    /// This function spawns threads.
-
    pub fn with(
-
        home: Home,
-
        config: service::Config,
-
        listen: Vec<net::SocketAddr>,
-
        proxy: net::SocketAddr,
-
        daemon: net::SocketAddr,
-
        signer: G,
-
    ) -> Result<Runtime<G>, Error>
-
    where
-
        G: crypto::Signer + EcSign<Sig = Signature, Pk = NodeId> + Clone + 'static,
-
    {
-
        let id = *signer.public_key();
-
        let node_sock = home.socket();
-
        let node_dir = home.node();
-
        let network = config.network;
-
        let rng = fastrand::Rng::new();
-
        let clock = LocalTime::now();
-
        let storage = Storage::open(home.storage())?;
-
        let address_db = node_dir.join(ADDRESS_DB_FILE);
-
        let routing_db = node_dir.join(ROUTING_DB_FILE);
-
        let tracking_db = node_dir.join(TRACKING_DB_FILE);
-

-
        log::info!(target: "node", "Opening address book {}..", address_db.display());
-
        let addresses = address::Book::open(address_db)?;
-

-
        log::info!(target: "node", "Opening routing table {}..", routing_db.display());
-
        let routing = routing::Table::open(routing_db)?;
-

-
        log::info!(target: "node", "Opening tracking policy table {}..", tracking_db.display());
-
        let tracking = tracking::Config::open(tracking_db)?;
-

-
        log::info!(target: "node", "Initializing service ({:?})..", network);
-
        let service = service::Service::new(
-
            config,
-
            clock,
-
            routing,
-
            storage.clone(),
-
            addresses,
-
            tracking,
-
            signer.clone(),
-
            rng,
-
        );
-

-
        let cert = Cert {
-
            pk: id,
-
            sig: EcSign::sign(&signer, id.as_slice()),
-
        };
-

-
        let (worker_send, worker_recv) = chan::unbounded::<WorkerReq<G>>();
-
        let mut wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
-
        let mut local_addrs = Vec::new();
-

-
        for addr in listen {
-
            let listener = NetAccept::bind(&addr)?;
-
            let local_addr = listener.local_addr();
-

-
            local_addrs.push(local_addr);
-
            wire.listen(listener);
-

-
            log::info!(target: "node", "Listening on {local_addr}..");
-
        }
-
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
-
        let handle = Handle::new(home.clone(), reactor.controller());
-

-
        log::info!(target: "node", "Binding control socket {}..", node_sock.display());
-

-
        // TODO: Move this stuff to `run` function.
-

-
        let listener = match UnixListener::bind(&node_sock) {
-
            Ok(sock) => sock,
-
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => {
-
                return Err(Error::AlreadyRunning(node_sock));
-
            }
-
            Err(err) => {
-
                return Err(err.into());
-
            }
-
        };
-
        let control = thread::spawn({
-
            let handle = handle.clone();
-
            move || control::listen(listener, handle)
-
        });
-

-
        let pool = WorkerPool::with(
-
            8,
-
            time::Duration::from_secs(9),
-
            storage.clone(),
-
            daemon,
-
            worker_recv,
-
            handle.clone(),
-
            id.to_human(),
-
        );
-

-
        Ok(Runtime {
-
            id,
-
            home,
-
            control,
-
            storage,
-
            reactor,
-
            daemon,
-
            handle,
-
            pool,
-
            local_addrs,
-
        })
-
    }
-

-
    pub fn run(self) -> Result<(), Error> {
-
        log::info!(target: "node", "Running node {}..", self.id);
-
        log::info!(target: "node", "Spawning git daemon at {}..", self.storage.path().display());
-

-
        let mut daemon = daemon::spawn(self.storage.path(), self.daemon)?;
-
        thread::Builder::new().name(self.id.to_human()).spawn({
-
            let stderr = daemon.stderr.take().unwrap();
-
            || {
-
                for line in BufReader::new(stderr).lines().flatten() {
-
                    log::debug!(target: "daemon", "{line}");
-
                }
-
            }
-
        })?;
-

-
        self.pool.run().unwrap();
-
        self.reactor.join().unwrap();
-
        self.control.join().unwrap()?;
-

-
        daemon.kill().ok(); // Ignore error if daemon has already exited, for whatever reason.
-
        daemon.wait()?;
-

-
        fs::remove_file(self.home.socket()).ok();
-

-
        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
-

-
        Ok(())
-
    }
-
}
-

-
pub mod daemon {
-
    use std::path::Path;
-
    use std::process::{Child, Command, Stdio};
-
    use std::{env, io, net};
-

-
    pub fn spawn(storage: &Path, addr: net::SocketAddr) -> io::Result<Child> {
-
        let storage = storage.canonicalize()?;
-
        let listen = format!("--listen={}", addr.ip());
-
        let port = format!("--port={}", addr.port());
-
        let child = Command::new("git")
-
            .env_clear()
-
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT")))
-
            .env("GIT_PROTOCOL", "version=2")
-
            .current_dir(storage)
-
            .arg("daemon")
-
            // Make all git directories available.
-
            .arg("--export-all")
-
            .arg("--verbose")
-
            // The git "root". Should be our storage path.
-
            .arg("--base-path=.")
-
            // Timeout (in seconds) between the moment the connection is established
-
            // and the client request is received (typically a rather low value,
-
            // since that should be basically immediate).
-
            .arg("--init-timeout=3")
-
            // Timeout (in seconds) for specific client sub-requests.
-
            // This includes the time it takes for the server to process the sub-request
-
            // and the time spent waiting for the next client’s request.
-
            .arg("--timeout=9")
-
            .arg("--log-destination=stderr")
-
            .arg(listen)
-
            .arg(port)
-
            .stderr(Stdio::piped())
-
            .spawn()?;
-

-
        Ok(child)
-
    }
-
}
deleted radicle-node/src/client/handle.rs
@@ -1,222 +0,0 @@
-
use std::fmt;
-
use std::io::{self, Write};
-
use std::os::unix::net::UnixStream;
-
use std::sync::atomic::{AtomicBool, Ordering};
-
use std::sync::Arc;
-

-
use crossbeam_channel as chan;
-
use cyphernet::EcSign;
-
use thiserror::Error;
-

-
use crate::crypto::Signer;
-
use crate::identity::Id;
-
use crate::node::FetchLookup;
-
use crate::profile::Home;
-
use crate::service;
-
use crate::service::{CommandError, QueryState};
-
use crate::service::{NodeId, Sessions};
-
use crate::wire;
-
use crate::worker::WorkerResp;
-

-
/// An error resulting from a handle method.
-
#[derive(Error, Debug)]
-
pub enum Error {
-
    /// The command channel is no longer connected.
-
    #[error("command channel is not connected")]
-
    NotConnected,
-
    /// The command returned an error.
-
    #[error("command failed: {0}")]
-
    Command(#[from] CommandError),
-
    /// The operation timed out.
-
    #[error("the operation timed out")]
-
    Timeout,
-
    /// An I/O error occured.
-
    #[error(transparent)]
-
    Io(#[from] std::io::Error),
-
}
-

-
impl From<chan::RecvError> for Error {
-
    fn from(_: chan::RecvError) -> Self {
-
        Self::NotConnected
-
    }
-
}
-

-
impl From<chan::RecvTimeoutError> for Error {
-
    fn from(err: chan::RecvTimeoutError) -> Self {
-
        match err {
-
            chan::RecvTimeoutError::Timeout => Self::Timeout,
-
            chan::RecvTimeoutError::Disconnected => Self::NotConnected,
-
        }
-
    }
-
}
-

-
impl<T> From<chan::SendError<T>> for Error {
-
    fn from(_: chan::SendError<T>) -> Self {
-
        Self::NotConnected
-
    }
-
}
-

-
pub struct Handle<G: Signer + EcSign> {
-
    pub(crate) home: Home,
-
    pub(crate) controller: reactor::Controller<wire::Control<G>>,
-

-
    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
-
    shutdown: Arc<AtomicBool>,
-
}
-

-
impl<G: Signer + EcSign> fmt::Debug for Handle<G> {
-
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-
        f.debug_struct("Handle").field("home", &self.home).finish()
-
    }
-
}
-

-
impl<G: Signer + EcSign> Clone for Handle<G> {
-
    fn clone(&self) -> Self {
-
        Self {
-
            home: self.home.clone(),
-
            controller: self.controller.clone(),
-
            shutdown: self.shutdown.clone(),
-
        }
-
    }
-
}
-

-
impl<G: Signer + EcSign + 'static> Handle<G> {
-
    pub fn new(home: Home, controller: reactor::Controller<wire::Control<G>>) -> Self {
-
        Self {
-
            home,
-
            controller,
-
            shutdown: Arc::default(),
-
        }
-
    }
-

-
    pub fn worker_result(&mut self, resp: WorkerResp<G>) -> Result<(), Error> {
-
        match self.controller.cmd(wire::Control::Worker(resp)) {
-
            Ok(()) => {}
-
            Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Err(Error::NotConnected),
-
            Err(err) => return Err(err.into()),
-
        }
-
        Ok(())
-
    }
-

-
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.controller.cmd(wire::Control::User(cmd))?;
-
        Ok(())
-
    }
-
}
-

-
impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
-
    type Sessions = Sessions;
-
    type Error = Error;
-

-
    fn is_running(&self) -> bool {
-
        true
-
    }
-

-
    fn connect(&mut self, node: NodeId, addr: radicle::node::Address) -> Result<(), Error> {
-
        self.command(service::Command::Connect(node, addr))?;
-

-
        Ok(())
-
    }
-

-
    fn fetch(&mut self, id: Id) -> Result<FetchLookup, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::Fetch(id, sender))?;
-
        receiver.recv().map_err(Error::from)
-
    }
-

-
    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::TrackNode(id, alias, sender))?;
-
        receiver.recv().map_err(Error::from)
-
    }
-

-
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::UntrackNode(id, sender))?;
-
        receiver.recv().map_err(Error::from)
-
    }
-

-
    fn track_repo(&mut self, id: Id) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::TrackRepo(id, sender))?;
-
        receiver.recv().map_err(Error::from)
-
    }
-

-
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Error> {
-
        let (sender, receiver) = chan::bounded(1);
-
        self.command(service::Command::UntrackRepo(id, sender))?;
-
        receiver.recv().map_err(Error::from)
-
    }
-

-
    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
-
        self.command(service::Command::AnnounceRefs(id))
-
    }
-

-
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
-
        let (sender, receiver) = chan::unbounded();
-
        let query: Arc<QueryState> = Arc::new(move |state| {
-
            for (id, node) in state.routing().entries()? {
-
                if sender.send((id, node)).is_err() {
-
                    break;
-
                }
-
            }
-
            Ok(())
-
        });
-
        let (err_sender, err_receiver) = chan::bounded(1);
-
        self.command(service::Command::QueryState(query, err_sender))?;
-
        err_receiver.recv()??;
-

-
        Ok(receiver)
-
    }
-

-
    fn sessions(&self) -> Result<Self::Sessions, Error> {
-
        let (sender, receiver) = chan::unbounded();
-
        let query: Arc<QueryState> = Arc::new(move |state| {
-
            sender.send(state.sessions().clone()).ok();
-
            Ok(())
-
        });
-
        let (err_sender, err_receiver) = chan::bounded(1);
-
        self.command(service::Command::QueryState(query, err_sender))?;
-
        err_receiver.recv()??;
-

-
        let sessions = receiver.recv()?;
-

-
        Ok(sessions)
-
    }
-

-
    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
-
        let (sender, receiver) = chan::unbounded();
-
        let query: Arc<QueryState> = Arc::new(move |state| {
-
            for id in state.inventory()?.iter() {
-
                if sender.send(*id).is_err() {
-
                    break;
-
                }
-
            }
-
            Ok(())
-
        });
-
        let (err_sender, err_receiver) = chan::bounded(1);
-
        self.command(service::Command::QueryState(query, err_sender))?;
-
        err_receiver.recv()??;
-

-
        Ok(receiver)
-
    }
-

-
    fn shutdown(self) -> Result<(), Error> {
-
        // If the current value is `false`, set it to `true`, otherwise error.
-
        if self
-
            .shutdown
-
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
-
            .is_err()
-
        {
-
            return Ok(());
-
        }
-
        // Send a shutdown request to our own control socket. This is the only way to kill the
-
        // control thread gracefully. Since the control thread may have called this function,
-
        // the control socket may already be disconnected. Ignore errors.
-
        UnixStream::connect(self.home.socket())
-
            .and_then(|mut sock| sock.write_all(b"shutdown"))
-
            .ok();
-

-
        self.controller.shutdown().map_err(|_| Error::NotConnected)
-
    }
-
}
modified radicle-node/src/control.rs
@@ -9,10 +9,10 @@ use std::{io, net};

use radicle::node::Handle;

-
use crate::client;
use crate::identity::Id;
use crate::node;
use crate::node::FetchLookup;
+
use crate::runtime;

#[derive(thiserror::Error, Debug)]
pub enum Error {
@@ -23,7 +23,7 @@ pub enum Error {
}

/// Listen for commands on the control socket, and process them.
-
pub fn listen<H: Handle<Error = client::handle::Error>>(
+
pub fn listen<H: Handle<Error = runtime::HandleError>>(
    listener: UnixListener,
    mut handle: H,
) -> Result<(), Error> {
@@ -62,15 +62,15 @@ enum DrainError {
    InvalidCommandArg(String, Box<dyn std::error::Error>),
    #[error("unknown command `{0}`")]
    UnknownCommand(String),
-
    #[error("client error: {0}")]
-
    Client(#[from] client::handle::Error),
+
    #[error("runtime error: {0}")]
+
    Runtime(#[from] runtime::HandleError),
    #[error("i/o error: {0}")]
    Io(#[from] io::Error),
    #[error("shutdown requested")]
    Shutdown,
}

-
fn drain<H: Handle<Error = client::handle::Error>>(
+
fn drain<H: Handle<Error = runtime::HandleError>>(
    stream: &UnixStream,
    handle: &mut H,
) -> Result<(), DrainError> {
@@ -104,7 +104,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
                    }
                }
                Err(e) => {
-
                    return Err(DrainError::Client(e));
+
                    return Err(DrainError::Runtime(e));
                }
            },
            Err(err) => {
@@ -121,7 +121,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
                    }
                }
                Err(e) => {
-
                    return Err(DrainError::Client(e));
+
                    return Err(DrainError::Runtime(e));
                }
            },
            Err(err) => {
@@ -144,7 +144,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
                        }
                    }
                    Err(e) => {
-
                        return Err(DrainError::Client(e));
+
                        return Err(DrainError::Runtime(e));
                    }
                },
                Err(err) => {
@@ -165,7 +165,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
                    }
                }
                Err(e) => {
-
                    return Err(DrainError::Client(e));
+
                    return Err(DrainError::Runtime(e));
                }
            },
            Err(err) => {
@@ -175,7 +175,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
        Some(("announce-refs", arg)) => match arg.parse() {
            Ok(id) => {
                if let Err(e) = handle.announce_refs(id) {
-
                    return Err(DrainError::Client(e));
+
                    return Err(DrainError::Runtime(e));
                }
                writeln!(writer, "{}", node::RESPONSE_OK)?;
            }
@@ -197,7 +197,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
                        writeln!(writer, "{id} {seed}",)?;
                    }
                }
-
                Err(e) => return Err(DrainError::Client(e)),
+
                Err(e) => return Err(DrainError::Runtime(e)),
            },
            "inventory" => match handle.inventory() {
                Ok(c) => {
@@ -205,7 +205,7 @@ fn drain<H: Handle<Error = client::handle::Error>>(
                        writeln!(writer, "{id}")?;
                    }
                }
-
                Err(e) => return Err(DrainError::Client(e)),
+
                Err(e) => return Err(DrainError::Runtime(e)),
            },
            "shutdown" => {
                return Err(DrainError::Shutdown);
@@ -218,14 +218,14 @@ fn drain<H: Handle<Error = client::handle::Error>>(
    Ok(())
}

-
fn fetch<W: Write, H: Handle<Error = client::handle::Error>>(
+
fn fetch<W: Write, H: Handle<Error = runtime::HandleError>>(
    id: Id,
    mut writer: W,
    handle: &mut H,
) -> Result<(), DrainError> {
    match handle.fetch(id) {
        Err(e) => {
-
            return Err(DrainError::Client(e));
+
            return Err(DrainError::Runtime(e));
        }
        Ok(FetchLookup::Found { seeds, results }) => {
            let seeds = Vec::from(seeds);
modified radicle-node/src/lib.rs
@@ -1,10 +1,10 @@
pub mod address;
pub mod bounded;
-
pub mod client;
pub mod clock;
pub mod control;
pub mod deserializer;
pub mod logger;
+
pub mod runtime;
pub mod service;
pub mod sql;
#[cfg(any(test, feature = "test"))]
@@ -17,6 +17,7 @@ pub mod worker;
pub use localtime::{LocalDuration, LocalTime};
pub use netservices::LinkDirection as Link;
pub use radicle::{collections, crypto, git, identity, node, profile, rad, storage};
+
pub use runtime::Runtime;

pub mod prelude {
    pub use crate::bounded::BoundedVec;
modified radicle-node/src/main.rs
@@ -5,9 +5,9 @@ use cyphernet::addr::PeerAddr;
use localtime::LocalDuration;

use radicle::profile;
-
use radicle_node::client::Runtime;
use radicle_node::crypto::ssh::keystore::{Keystore, MemorySigner};
use radicle_node::prelude::{Address, NodeId};
+
use radicle_node::Runtime;
use radicle_node::{logger, service};

#[derive(Debug)]
added radicle-node/src/runtime.rs
@@ -0,0 +1,253 @@
+
mod handle;
+

+
use std::io::{BufRead, BufReader};
+
use std::os::unix::net::UnixListener;
+
use std::path::PathBuf;
+
use std::{fs, io, net, thread, time};
+

+
use crossbeam_channel as chan;
+
use cyphernet::{Cert, EcSign};
+
use netservices::resource::NetAccept;
+
use radicle::profile::Home;
+
use radicle::Storage;
+
use reactor::poller::popol;
+
use reactor::Reactor;
+
use thiserror::Error;
+

+
use crate::address;
+
use crate::control;
+
use crate::crypto::{Signature, Signer};
+
use crate::node::NodeId;
+
use crate::service::{routing, tracking};
+
use crate::wire;
+
use crate::wire::Wire;
+
use crate::worker::{WorkerPool, WorkerReq};
+
use crate::{crypto, service, LocalTime};
+

+
pub use handle::Error as HandleError;
+
pub use handle::Handle;
+

+
/// Directory in `$RAD_HOME` under which node-specific files are stored.
+
pub const NODE_DIR: &str = "node";
+
/// Filename of routing table database under [`NODE_DIR`].
+
pub const ROUTING_DB_FILE: &str = "routing.db";
+
/// Filename of address database under [`NODE_DIR`].
+
pub const ADDRESS_DB_FILE: &str = "addresses.db";
+
/// Filename of tracking table database under [`NODE_DIR`].
+
pub const TRACKING_DB_FILE: &str = "tracking.db";
+

+
/// A client error.
+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// A routing database error.
+
    #[error("routing database error: {0}")]
+
    Routing(#[from] routing::Error),
+
    /// An address database error.
+
    #[error("address database error: {0}")]
+
    Addresses(#[from] address::Error),
+
    /// A tracking database error.
+
    #[error("tracking database error: {0}")]
+
    Tracking(#[from] tracking::Error),
+
    /// An I/O error.
+
    #[error("i/o error: {0}")]
+
    Io(#[from] io::Error),
+
    /// A control socket error.
+
    #[error("control socket error: {0}")]
+
    Control(#[from] control::Error),
+
    /// Another node is already running.
+
    #[error(
+
        "another node appears to be running; \
+
        if this isn't the case, delete the socket file at '{0}' \
+
        and restart the node"
+
    )]
+
    AlreadyRunning(PathBuf),
+
}
+

+
/// Holds join handles to the client threads, as well as a client handle.
+
pub struct Runtime<G: Signer + EcSign> {
+
    pub id: NodeId,
+
    pub home: Home,
+
    pub handle: Handle<G>,
+
    pub control: thread::JoinHandle<Result<(), control::Error>>,
+
    pub storage: Storage,
+
    pub reactor: Reactor<wire::Control<G>>,
+
    pub daemon: net::SocketAddr,
+
    pub pool: WorkerPool,
+
    pub local_addrs: Vec<net::SocketAddr>,
+
}
+

+
impl<G: Signer + EcSign> Runtime<G> {
+
    /// Initialize the runtime.
+
    ///
+
    /// This function spawns threads.
+
    pub fn with(
+
        home: Home,
+
        config: service::Config,
+
        listen: Vec<net::SocketAddr>,
+
        proxy: net::SocketAddr,
+
        daemon: net::SocketAddr,
+
        signer: G,
+
    ) -> Result<Runtime<G>, Error>
+
    where
+
        G: crypto::Signer + EcSign<Sig = Signature, Pk = NodeId> + Clone + 'static,
+
    {
+
        let id = *signer.public_key();
+
        let node_sock = home.socket();
+
        let node_dir = home.node();
+
        let network = config.network;
+
        let rng = fastrand::Rng::new();
+
        let clock = LocalTime::now();
+
        let storage = Storage::open(home.storage())?;
+
        let address_db = node_dir.join(ADDRESS_DB_FILE);
+
        let routing_db = node_dir.join(ROUTING_DB_FILE);
+
        let tracking_db = node_dir.join(TRACKING_DB_FILE);
+

+
        log::info!(target: "node", "Opening address book {}..", address_db.display());
+
        let addresses = address::Book::open(address_db)?;
+

+
        log::info!(target: "node", "Opening routing table {}..", routing_db.display());
+
        let routing = routing::Table::open(routing_db)?;
+

+
        log::info!(target: "node", "Opening tracking policy table {}..", tracking_db.display());
+
        let tracking = tracking::Config::open(tracking_db)?;
+

+
        log::info!(target: "node", "Initializing service ({:?})..", network);
+
        let service = service::Service::new(
+
            config,
+
            clock,
+
            routing,
+
            storage.clone(),
+
            addresses,
+
            tracking,
+
            signer.clone(),
+
            rng,
+
        );
+

+
        let cert = Cert {
+
            pk: id,
+
            sig: EcSign::sign(&signer, id.as_slice()),
+
        };
+

+
        let (worker_send, worker_recv) = chan::unbounded::<WorkerReq<G>>();
+
        let mut wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
+
        let mut local_addrs = Vec::new();
+

+
        for addr in listen {
+
            let listener = NetAccept::bind(&addr)?;
+
            let local_addr = listener.local_addr();
+

+
            local_addrs.push(local_addr);
+
            wire.listen(listener);
+

+
            log::info!(target: "node", "Listening on {local_addr}..");
+
        }
+
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
+
        let handle = Handle::new(home.clone(), reactor.controller());
+

+
        log::info!(target: "node", "Binding control socket {}..", node_sock.display());
+

+
        // TODO: Move this stuff to `run` function.
+

+
        let listener = match UnixListener::bind(&node_sock) {
+
            Ok(sock) => sock,
+
            Err(err) if err.kind() == io::ErrorKind::AddrInUse => {
+
                return Err(Error::AlreadyRunning(node_sock));
+
            }
+
            Err(err) => {
+
                return Err(err.into());
+
            }
+
        };
+
        let control = thread::spawn({
+
            let handle = handle.clone();
+
            move || control::listen(listener, handle)
+
        });
+

+
        let pool = WorkerPool::with(
+
            8,
+
            time::Duration::from_secs(9),
+
            storage.clone(),
+
            daemon,
+
            worker_recv,
+
            handle.clone(),
+
            id.to_human(),
+
        );
+

+
        Ok(Runtime {
+
            id,
+
            home,
+
            control,
+
            storage,
+
            reactor,
+
            daemon,
+
            handle,
+
            pool,
+
            local_addrs,
+
        })
+
    }
+

+
    pub fn run(self) -> Result<(), Error> {
+
        log::info!(target: "node", "Running node {}..", self.id);
+
        log::info!(target: "node", "Spawning git daemon at {}..", self.storage.path().display());
+

+
        let mut daemon = daemon::spawn(self.storage.path(), self.daemon)?;
+
        thread::Builder::new().name(self.id.to_human()).spawn({
+
            let stderr = daemon.stderr.take().unwrap();
+
            || {
+
                for line in BufReader::new(stderr).lines().flatten() {
+
                    log::debug!(target: "daemon", "{line}");
+
                }
+
            }
+
        })?;
+

+
        self.pool.run().unwrap();
+
        self.reactor.join().unwrap();
+
        self.control.join().unwrap()?;
+

+
        daemon.kill().ok(); // Ignore error if daemon has already exited, for whatever reason.
+
        daemon.wait()?;
+

+
        fs::remove_file(self.home.socket()).ok();
+

+
        log::debug!(target: "node", "Node shutdown completed for {}", self.id);
+

+
        Ok(())
+
    }
+
}
+

+
pub mod daemon {
+
    use std::path::Path;
+
    use std::process::{Child, Command, Stdio};
+
    use std::{env, io, net};
+

+
    pub fn spawn(storage: &Path, addr: net::SocketAddr) -> io::Result<Child> {
+
        let storage = storage.canonicalize()?;
+
        let listen = format!("--listen={}", addr.ip());
+
        let port = format!("--port={}", addr.port());
+
        let child = Command::new("git")
+
            .env_clear()
+
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT")))
+
            .env("GIT_PROTOCOL", "version=2")
+
            .current_dir(storage)
+
            .arg("daemon")
+
            // Make all git directories available.
+
            .arg("--export-all")
+
            .arg("--verbose")
+
            // The git "root". Should be our storage path.
+
            .arg("--base-path=.")
+
            // Timeout (in seconds) between the moment the connection is established
+
            // and the client request is received (typically a rather low value,
+
            // since that should be basically immediate).
+
            .arg("--init-timeout=3")
+
            // Timeout (in seconds) for specific client sub-requests.
+
            // This includes the time it takes for the server to process the sub-request
+
            // and the time spent waiting for the next client’s request.
+
            .arg("--timeout=9")
+
            .arg("--log-destination=stderr")
+
            .arg(listen)
+
            .arg(port)
+
            .stderr(Stdio::piped())
+
            .spawn()?;
+

+
        Ok(child)
+
    }
+
}
added radicle-node/src/runtime/handle.rs
@@ -0,0 +1,222 @@
+
use std::fmt;
+
use std::io::{self, Write};
+
use std::os::unix::net::UnixStream;
+
use std::sync::atomic::{AtomicBool, Ordering};
+
use std::sync::Arc;
+

+
use crossbeam_channel as chan;
+
use cyphernet::EcSign;
+
use thiserror::Error;
+

+
use crate::crypto::Signer;
+
use crate::identity::Id;
+
use crate::node::FetchLookup;
+
use crate::profile::Home;
+
use crate::service;
+
use crate::service::{CommandError, QueryState};
+
use crate::service::{NodeId, Sessions};
+
use crate::wire;
+
use crate::worker::WorkerResp;
+

+
/// An error resulting from a handle method.
+
#[derive(Error, Debug)]
+
pub enum Error {
+
    /// The command channel is no longer connected.
+
    #[error("command channel is not connected")]
+
    NotConnected,
+
    /// The command returned an error.
+
    #[error("command failed: {0}")]
+
    Command(#[from] CommandError),
+
    /// The operation timed out.
+
    #[error("the operation timed out")]
+
    Timeout,
+
    /// An I/O error occured.
+
    #[error(transparent)]
+
    Io(#[from] std::io::Error),
+
}
+

+
impl From<chan::RecvError> for Error {
+
    fn from(_: chan::RecvError) -> Self {
+
        Self::NotConnected
+
    }
+
}
+

+
impl From<chan::RecvTimeoutError> for Error {
+
    fn from(err: chan::RecvTimeoutError) -> Self {
+
        match err {
+
            chan::RecvTimeoutError::Timeout => Self::Timeout,
+
            chan::RecvTimeoutError::Disconnected => Self::NotConnected,
+
        }
+
    }
+
}
+

+
impl<T> From<chan::SendError<T>> for Error {
+
    fn from(_: chan::SendError<T>) -> Self {
+
        Self::NotConnected
+
    }
+
}
+

+
pub struct Handle<G: Signer + EcSign> {
+
    pub(crate) home: Home,
+
    pub(crate) controller: reactor::Controller<wire::Control<G>>,
+

+
    /// Whether a shutdown was initiated or not. Prevents attempting to shutdown twice.
+
    shutdown: Arc<AtomicBool>,
+
}
+

+
impl<G: Signer + EcSign> fmt::Debug for Handle<G> {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        f.debug_struct("Handle").field("home", &self.home).finish()
+
    }
+
}
+

+
impl<G: Signer + EcSign> Clone for Handle<G> {
+
    fn clone(&self) -> Self {
+
        Self {
+
            home: self.home.clone(),
+
            controller: self.controller.clone(),
+
            shutdown: self.shutdown.clone(),
+
        }
+
    }
+
}
+

+
impl<G: Signer + EcSign + 'static> Handle<G> {
+
    pub fn new(home: Home, controller: reactor::Controller<wire::Control<G>>) -> Self {
+
        Self {
+
            home,
+
            controller,
+
            shutdown: Arc::default(),
+
        }
+
    }
+

+
    pub fn worker_result(&mut self, resp: WorkerResp<G>) -> Result<(), Error> {
+
        match self.controller.cmd(wire::Control::Worker(resp)) {
+
            Ok(()) => {}
+
            Err(err) if err.kind() == io::ErrorKind::BrokenPipe => return Err(Error::NotConnected),
+
            Err(err) => return Err(err.into()),
+
        }
+
        Ok(())
+
    }
+

+
    fn command(&self, cmd: service::Command) -> Result<(), Error> {
+
        self.controller.cmd(wire::Control::User(cmd))?;
+
        Ok(())
+
    }
+
}
+

+
impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
+
    type Sessions = Sessions;
+
    type Error = Error;
+

+
    fn is_running(&self) -> bool {
+
        true
+
    }
+

+
    fn connect(&mut self, node: NodeId, addr: radicle::node::Address) -> Result<(), Error> {
+
        self.command(service::Command::Connect(node, addr))?;
+

+
        Ok(())
+
    }
+

+
    fn fetch(&mut self, id: Id) -> Result<FetchLookup, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::Fetch(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    fn track_node(&mut self, id: NodeId, alias: Option<String>) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::TrackNode(id, alias, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::UntrackNode(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    fn track_repo(&mut self, id: Id) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::TrackRepo(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::UntrackRepo(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
+
        self.command(service::Command::AnnounceRefs(id))
+
    }
+

+
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
+
        let (sender, receiver) = chan::unbounded();
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            for (id, node) in state.routing().entries()? {
+
                if sender.send((id, node)).is_err() {
+
                    break;
+
                }
+
            }
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        Ok(receiver)
+
    }
+

+
    fn sessions(&self) -> Result<Self::Sessions, Error> {
+
        let (sender, receiver) = chan::unbounded();
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            sender.send(state.sessions().clone()).ok();
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        let sessions = receiver.recv()?;
+

+
        Ok(sessions)
+
    }
+

+
    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
+
        let (sender, receiver) = chan::unbounded();
+
        let query: Arc<QueryState> = Arc::new(move |state| {
+
            for id in state.inventory()?.iter() {
+
                if sender.send(*id).is_err() {
+
                    break;
+
                }
+
            }
+
            Ok(())
+
        });
+
        let (err_sender, err_receiver) = chan::bounded(1);
+
        self.command(service::Command::QueryState(query, err_sender))?;
+
        err_receiver.recv()??;
+

+
        Ok(receiver)
+
    }
+

+
    fn shutdown(self) -> Result<(), Error> {
+
        // If the current value is `false`, set it to `true`, otherwise error.
+
        if self
+
            .shutdown
+
            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
+
            .is_err()
+
        {
+
            return Ok(());
+
        }
+
        // Send a shutdown request to our own control socket. This is the only way to kill the
+
        // control thread gracefully. Since the control thread may have called this function,
+
        // the control socket may already be disconnected. Ignore errors.
+
        UnixStream::connect(self.home.socket())
+
            .and_then(|mut sock| sock.write_all(b"shutdown"))
+
            .ok();
+

+
        self.controller.shutdown().map_err(|_| Error::NotConnected)
+
    }
+
}
modified radicle-node/src/test/handle.rs
@@ -3,9 +3,9 @@ use std::sync::{Arc, Mutex};

use crossbeam_channel as chan;

-
use crate::client::handle::Error;
use crate::identity::Id;
use crate::node::FetchLookup;
+
use crate::runtime::HandleError;
use crate::service;
use crate::service::NodeId;

@@ -17,56 +17,56 @@ pub struct Handle {
}

impl radicle::node::Handle for Handle {
-
    type Error = Error;
+
    type Error = HandleError;
    type Sessions = service::Sessions;

    fn is_running(&self) -> bool {
        true
    }

-
    fn connect(&mut self, _node: NodeId, _addr: radicle::node::Address) -> Result<(), Error> {
+
    fn connect(&mut self, _node: NodeId, _addr: radicle::node::Address) -> Result<(), Self::Error> {
        unimplemented!();
    }

-
    fn fetch(&mut self, _id: Id) -> Result<FetchLookup, Error> {
+
    fn fetch(&mut self, _id: Id) -> Result<FetchLookup, Self::Error> {
        Ok(FetchLookup::NotFound)
    }

-
    fn track_repo(&mut self, id: Id) -> Result<bool, Error> {
+
    fn track_repo(&mut self, id: Id) -> Result<bool, Self::Error> {
        Ok(self.tracking_repos.insert(id))
    }

-
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Error> {
+
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Self::Error> {
        Ok(self.tracking_repos.remove(&id))
    }

-
    fn track_node(&mut self, id: NodeId, _alias: Option<String>) -> Result<bool, Error> {
+
    fn track_node(&mut self, id: NodeId, _alias: Option<String>) -> Result<bool, Self::Error> {
        Ok(self.tracking_nodes.insert(id))
    }

-
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Error> {
+
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error> {
        Ok(self.tracking_nodes.remove(&id))
    }

-
    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
+
    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error> {
        self.updates.lock().unwrap().push(id);

        Ok(())
    }

-
    fn routing(&self) -> Result<chan::Receiver<(Id, service::NodeId)>, Error> {
+
    fn routing(&self) -> Result<chan::Receiver<(Id, service::NodeId)>, Self::Error> {
        unimplemented!();
    }

-
    fn sessions(&self) -> Result<Self::Sessions, Error> {
+
    fn sessions(&self) -> Result<Self::Sessions, Self::Error> {
        unimplemented!();
    }

-
    fn inventory(&self) -> Result<chan::Receiver<Id>, Error> {
+
    fn inventory(&self) -> Result<chan::Receiver<Id>, Self::Error> {
        unimplemented!();
    }

-
    fn shutdown(self) -> Result<(), Error> {
+
    fn shutdown(self) -> Result<(), Self::Error> {
        Ok(())
    }
}
modified radicle-node/src/tests.rs
@@ -33,7 +33,7 @@ use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
use crate::LocalTime;
-
use crate::{client, git, identity, rad, service, test};
+
use crate::{git, identity, rad, runtime, service, test};

// NOTE
//
@@ -343,19 +343,13 @@ fn test_tracking() {

    let (sender, receiver) = chan::bounded(1);
    alice.command(Command::TrackRepo(proj_id, sender));
-
    let policy_change = receiver
-
        .recv()
-
        .map_err(client::handle::Error::from)
-
        .unwrap();
+
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
    assert!(policy_change);
    assert!(alice.tracking().is_repo_tracked(&proj_id).unwrap());

    let (sender, receiver) = chan::bounded(1);
    alice.command(Command::UntrackRepo(proj_id, sender));
-
    let policy_change = receiver
-
        .recv()
-
        .map_err(client::handle::Error::from)
-
        .unwrap();
+
    let policy_change = receiver.recv().map_err(runtime::HandleError::from).unwrap();
    assert!(policy_change);
    assert!(!alice.tracking().is_repo_tracked(&proj_id).unwrap());
}
modified radicle-node/src/tests/e2e.rs
@@ -21,7 +21,7 @@ use radicle::{assert_matches, rad};
use crate::node::NodeId;
use crate::storage::git::transport;
use crate::test::logger;
-
use crate::{client, client::handle::Handle, client::Runtime, service};
+
use crate::{runtime, runtime::Handle, service, Runtime};

/// A node that can be run.
struct Node {
@@ -36,7 +36,7 @@ struct NodeHandle {
    storage: Storage,
    signer: MockSigner,
    addr: net::SocketAddr,
-
    thread: ManuallyDrop<thread::JoinHandle<Result<(), client::Error>>>,
+
    thread: ManuallyDrop<thread::JoinHandle<Result<(), runtime::Error>>>,
    handle: ManuallyDrop<Handle<MockSigner>>,
}

modified radicle-node/src/worker.rs
@@ -13,8 +13,8 @@ use radicle::storage::{ReadRepository, RefUpdate, WriteRepository, WriteStorage}
use radicle::{git, Storage};
use reactor::poller::popol;

-
use crate::client::handle::Handle;
use crate::node::{FetchError, FetchResult};
+
use crate::runtime::Handle;
use crate::service::reactor::Fetch;
use crate::wire::{WireReader, WireSession, WireWriter};