Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Implement node handle to control node
Alexis Sellier committed 3 years ago
commit c2b599f0683bca4ea16544502bc0999ab89225f3
parent f5bfb8d90a1a93ea2c888503e9948a91d9e9a8d2
6 files changed +151 -9
modified radicle-node/src/client/handle.rs
@@ -76,7 +76,7 @@ impl<W: Waker> traits::Handle for Handle<W> {
    }

    /// Notify the client that a project has been updated.
-
    fn updated(&self, id: Id) -> Result<(), Error> {
+
    fn announce_refs(&self, id: Id) -> Result<(), Error> {
        self.command(service::Command::AnnounceRefs(id))
    }

@@ -109,7 +109,7 @@ pub mod traits {
        /// Untrack the given project and delete it from storage.
        fn untrack(&self, id: Id) -> Result<bool, Error>;
        /// Notify the client that a project has been updated.
-
        fn updated(&self, id: Id) -> Result<(), Error>;
+
        fn announce_refs(&self, id: Id) -> Result<(), Error>;
        /// Send a command to the command channel, and wake up the event loop.
        fn command(&self, cmd: service::Command) -> Result<(), Error>;
        /// Ask the client to shutdown.
modified radicle-node/src/control.rs
@@ -94,9 +94,9 @@ fn drain<H: Handle>(stream: &UnixStream, handle: &H) -> Result<(), DrainError> {
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
                }
            }
-
            Some(("update", arg)) => {
+
            Some(("announce-refs", arg)) => {
                if let Ok(id) = arg.parse() {
-
                    if let Err(e) = handle.updated(id) {
+
                    if let Err(e) = handle.announce_refs(id) {
                        return Err(DrainError::Client(e));
                    }
                } else {
@@ -188,7 +188,7 @@ mod tests {
            }
        };
        for proj in &projs {
-
            writeln!(&stream, "update {}", proj).unwrap();
+
            writeln!(&stream, "announce-refs {}", proj).unwrap();
        }

        let mut buf = [0; 2];
modified radicle-node/src/test/handle.rs
@@ -24,7 +24,7 @@ impl traits::Handle for Handle {
        Ok(true)
    }

-
    fn updated(&self, id: Id) -> Result<(), Error> {
+
    fn announce_refs(&self, id: Id) -> Result<(), Error> {
        self.updates.lock().unwrap().push(id);

        Ok(())
modified radicle/src/lib.rs
@@ -3,6 +3,7 @@ pub mod crypto;
pub mod git;
pub mod hash;
pub mod identity;
+
pub mod node;
pub mod rad;
pub mod serde_ext;
pub mod storage;
added radicle/src/node.rs
@@ -0,0 +1,88 @@
+
use std::fmt;
+
use std::io;
+
use std::io::{BufRead, BufReader, Write};
+
use std::os::unix::net::UnixStream;
+
use std::path::Path;
+

+
use crate::identity::Id;
+

+
#[derive(thiserror::Error, Debug)]
+
pub enum Error {
+
    #[error("i/o error: {0}")]
+
    Io(#[from] io::Error),
+
}
+

+
pub trait Handle {
+
    /// Fetch a project from the network. Fails if the project isn't tracked.
+
    fn fetch(&self, id: &Id) -> Result<(), Error>;
+
    /// Start tracking the given project. Doesn't do anything if the project is already
+
    /// tracked.
+
    fn track(&self, id: &Id) -> Result<bool, Error>;
+
    /// Untrack the given project and delete it from storage.
+
    fn untrack(&self, id: &Id) -> Result<bool, Error>;
+
    /// Notify the network that we have new refs.
+
    fn announce_refs(&self, id: &Id) -> Result<(), Error>;
+
    /// Ask the node to shutdown.
+
    fn shutdown(self) -> Result<(), Error>;
+
}
+

+
/// Node control socket.
+
pub struct Socket {
+
    stream: UnixStream,
+
}
+

+
impl Socket {
+
    pub fn connect<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> {
+
        let stream = UnixStream::connect(path)?;
+

+
        Ok(Self { stream })
+
    }
+

+
    pub fn call<A: fmt::Display>(
+
        &self,
+
        cmd: &str,
+
        arg: &A,
+
    ) -> Result<impl Iterator<Item = Result<String, io::Error>> + '_, io::Error> {
+
        writeln!(&self.stream, "{cmd} {arg}")?;
+

+
        Ok(BufReader::new(&self.stream).lines())
+
    }
+
}
+

+
impl Handle for Socket {
+
    fn fetch(&self, id: &Id) -> Result<(), Error> {
+
        for line in self.call("fetch", id)? {
+
            let line = line?;
+
            log::info!("node: {}", line);
+
        }
+
        Ok(())
+
    }
+

+
    fn track(&self, id: &Id) -> Result<bool, Error> {
+
        for line in self.call("track", id)? {
+
            let line = line?;
+
            log::info!("node: {}", line);
+
        }
+
        Ok(true)
+
    }
+

+
    fn untrack(&self, id: &Id) -> Result<bool, Error> {
+
        for line in self.call("untrack", id)? {
+
            let line = line?;
+
            log::info!("node: {}", line);
+
        }
+
        Ok(true)
+
    }
+

+
    fn announce_refs(&self, id: &Id) -> Result<(), Error> {
+
        for line in self.call("announce-refs", id)? {
+
            let line = line?;
+
            log::info!("node: {}", line);
+
        }
+
        Ok(())
+
    }
+

+
    fn shutdown(self) -> Result<(), Error> {
+
        todo!();
+
    }
+
}
modified radicle/src/rad.rs
@@ -1,3 +1,4 @@
+
#![allow(clippy::let_unit_value)]
use std::io;
use std::path::Path;

@@ -6,6 +7,7 @@ use thiserror::Error;
use crate::crypto::{Signer, Verified};
use crate::git;
use crate::identity::Id;
+
use crate::node;
use crate::storage::refs::SignedRefs;
use crate::storage::{BranchName, ReadRepository as _, RemoteId, WriteRepository as _};
use crate::{identity, storage};
@@ -149,8 +151,8 @@ pub fn fork_remote<'r, G: Signer, S: storage::WriteStorage<'r>>(

pub fn fork<'r, G: Signer, S: storage::WriteStorage<'r>>(
    proj: &Id,
-
    signer: G,
-
    storage: S,
+
    signer: &G,
+
    storage: &S,
) -> Result<(), ForkError> {
    let me = signer.public_key();
    let repository = storage.repository(proj)?;
@@ -197,6 +199,57 @@ pub fn fork<'r, G: Signer, S: storage::WriteStorage<'r>>(
}

#[derive(Error, Debug)]
+
pub enum CloneError {
+
    #[error("node: {0}")]
+
    Node(#[from] node::Error),
+
    #[error("fork: {0}")]
+
    Fork(#[from] ForkError),
+
    #[error("checkout: {0}")]
+
    Checkout(#[from] CheckoutError),
+
}
+

+
pub fn clone<'r, P: AsRef<Path>, G: Signer, S: storage::WriteStorage<'r>, H: node::Handle>(
+
    proj: &Id,
+
    path: P,
+
    signer: &G,
+
    storage: &S,
+
    handle: &H,
+
) -> Result<git2::Repository, CloneError> {
+
    let _ = handle.fetch(proj)?;
+
    let _ = fork(proj, signer, storage)?;
+
    let working = checkout(proj, signer.public_key(), path, storage)?;
+

+
    Ok(working)
+
}
+

+
#[derive(Error, Debug)]
+
pub enum CloneUrlError {
+
    #[error("storage: {0}")]
+
    Storage(#[from] storage::Error),
+
    #[error("fetch: {0}")]
+
    Fetch(#[from] storage::FetchError),
+
    #[error("fork: {0}")]
+
    Fork(#[from] ForkError),
+
    #[error("checkout: {0}")]
+
    Checkout(#[from] CheckoutError),
+
}
+

+
pub fn clone_url<'r, P: AsRef<Path>, G: Signer, S: storage::WriteStorage<'r>>(
+
    proj: &Id,
+
    url: &git::Url,
+
    path: P,
+
    signer: &G,
+
    storage: &S,
+
) -> Result<git2::Repository, CloneUrlError> {
+
    let mut project = storage.repository(proj)?;
+
    let _updates = project.fetch(url)?;
+
    let _ = fork(proj, signer, storage)?;
+
    let working = checkout(proj, signer.public_key(), path, storage)?;
+

+
    Ok(working)
+
}
+

+
#[derive(Error, Debug)]
pub enum CheckoutError {
    #[error("git: {0}")]
    Git(#[from] git2::Error),
@@ -212,7 +265,7 @@ pub fn checkout<P: AsRef<Path>, S: storage::ReadStorage>(
    proj: &Id,
    remote: &RemoteId,
    path: P,
-
    storage: S,
+
    storage: &S,
) -> Result<git2::Repository, CheckoutError> {
    // TODO: Decide on whether we can use `clone_local`
    // TODO: Look into sharing object databases.