Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add track/untrack support
Slack Coder committed 3 years ago
commit d273cd02475f26f8af80964f324c133a22cc2cf0
parent 8fbcb519636f3066fc90686593fc5323d73e4e80
5 files changed +117 -7
modified node/src/client/handle.rs
@@ -54,6 +54,25 @@ pub struct Handle<R: Reactor> {
}

impl<R: Reactor> traits::Handle for Handle<R> {
+
    /// Retrieve or update the project from network.
+
    fn fetch(&self, _id: ProjId) -> Result<(), Error> {
+
        todo!()
+
    }
+

+
    /// Start tracking the given project. Doesn't do anything if the project is already tracked.
+
    fn track(&self, id: ProjId) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.commands.send(protocol::Command::Track(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

+
    /// Untrack the given project and delete it from storage.
+
    fn untrack(&self, id: ProjId) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.commands.send(protocol::Command::Untrack(id, sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

    /// Notify the client that a project has been updated.
    fn updated(&self, id: ProjId) -> Result<(), Error> {
        self.command(protocol::Command::AnnounceRefsUpdate(id))
@@ -80,6 +99,13 @@ pub mod traits {
    use super::*;

    pub trait Handle {
+
        /// Retrieve or update the project from network.
+
        fn fetch(&self, id: ProjId) -> Result<(), Error>;
+
        /// Start tracking the given project. Doesn't do anything if the project is already
+
        /// tracked.
+
        fn track(&self, id: ProjId) -> Result<bool, Error>;
+
        /// Untrack the given project and delete it from storage.
+
        fn untrack(&self, id: ProjId) -> Result<bool, Error>;
        /// Notify the client that a project has been updated.
        fn updated(&self, id: ProjId) -> Result<(), Error>;
        /// Send a command to the command channel, and wake up the event loop.
modified node/src/control.rs
@@ -60,8 +60,36 @@ enum DrainError {
fn drain<H: Handle>(stream: &UnixStream, handle: &H) -> Result<(), DrainError> {
    let mut reader = BufReader::new(stream);

+
    // TODO: refactor to include helper
    for line in reader.by_ref().lines().flatten() {
        match line.split_once(' ') {
+
            Some(("fetch", arg)) => {
+
                if let Ok(id) = arg.parse() {
+
                    if let Err(e) = handle.fetch(id) {
+
                        return Err(DrainError::Client(e));
+
                    }
+
                } else {
+
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
+
                }
+
            }
+
            Some(("track", arg)) => {
+
                if let Ok(id) = arg.parse() {
+
                    if let Err(e) = handle.track(id) {
+
                        return Err(DrainError::Client(e));
+
                    }
+
                } else {
+
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
+
                }
+
            }
+
            Some(("untrack", arg)) => {
+
                if let Ok(id) = arg.parse() {
+
                    if let Err(e) = handle.untrack(id) {
+
                        return Err(DrainError::Client(e));
+
                    }
+
                } else {
+
                    return Err(DrainError::InvalidCommandArg(arg.to_owned()));
+
                }
+
            }
            Some(("update", arg)) => {
                if let Ok(id) = arg.parse() {
                    if let Err(e) = handle.updated(id) {
modified node/src/protocol.rs
@@ -86,9 +86,11 @@ pub enum FetchResult {
/// Commands sent to the protocol by the operator.
#[derive(Debug)]
pub enum Command {
+
    AnnounceRefsUpdate(ProjId),
    Connect(net::SocketAddr),
    Fetch(ProjId, chan::Sender<FetchLookup>),
-
    AnnounceRefsUpdate(ProjId),
+
    Track(ProjId, chan::Sender<bool>),
+
    Untrack(ProjId, chan::Sender<bool>),
}

/// Command-related errors.
@@ -412,6 +414,12 @@ where
                    }
                }
            }
+
            Command::Track(proj, resp) => {
+
                resp.send(self.track(proj)).ok();
+
            }
+
            Command::Untrack(proj, resp) => {
+
                resp.send(self.untrack(proj)).ok();
+
            }
            Command::AnnounceRefsUpdate(proj) => {
                let user = *self.storage.user_id();
                let repo = self.storage.repository(&proj).unwrap();
modified node/src/test/handle.rs
@@ -1,26 +1,39 @@
use std::sync::{Arc, Mutex};

-
use crate::client::handle;
use crate::client::handle::traits;
-
use crate::{client, identity, protocol};
+
use crate::client::handle::Error;
+
use crate::identity::ProjId;
+
use crate::protocol;

#[derive(Default, Clone)]
pub struct Handle {
-
    pub updates: Arc<Mutex<Vec<identity::ProjId>>>,
+
    pub updates: Arc<Mutex<Vec<ProjId>>>,
}

impl traits::Handle for Handle {
-
    fn updated(&self, id: identity::ProjId) -> Result<(), handle::Error> {
+
    fn fetch(&self, _id: ProjId) -> Result<(), Error> {
+
        Ok(())
+
    }
+

+
    fn track(&self, _id: ProjId) -> Result<bool, Error> {
+
        Ok(true)
+
    }
+

+
    fn untrack(&self, _id: ProjId) -> Result<bool, Error> {
+
        Ok(true)
+
    }
+

+
    fn updated(&self, id: ProjId) -> Result<(), Error> {
        self.updates.lock().unwrap().push(id);

        Ok(())
    }

-
    fn command(&self, _cmd: protocol::Command) -> Result<(), handle::Error> {
+
    fn command(&self, _cmd: protocol::Command) -> Result<(), Error> {
        Ok(())
    }

-
    fn shutdown(self) -> Result<(), client::handle::Error> {
+
    fn shutdown(self) -> Result<(), Error> {
        Ok(())
    }
}
modified node/src/test/tests.rs
@@ -1,6 +1,7 @@
use std::io;
use std::sync::Arc;

+
use crossbeam_channel as chan;
use nakamoto_net as nakamoto;
use nakamoto_net::simulator;
use nakamoto_net::simulator::{Peer as _, Simulation};
@@ -168,6 +169,40 @@ fn test_inventory_sync() {
}

#[test]
+
fn test_tracking() {
+
    let mut alice = Peer::config(
+
        "alice",
+
        Config {
+
            project_tracking: ProjectTracking::Allowed(HashSet::default()),
+
            ..Config::default()
+
        },
+
        [7, 7, 7, 7],
+
        vec![],
+
        MockStorage::empty(),
+
        fastrand::Rng::new(),
+
    );
+
    let proj_id: identity::ProjId = test::arbitrary::gen(1);
+

+
    let (sender, receiver) = chan::bounded(1);
+
    alice.command(Command::Track(proj_id.clone(), sender));
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(client::handle::Error::from)
+
        .unwrap();
+
    assert!(policy_change);
+
    assert!(alice.config().is_tracking(&proj_id));
+

+
    let (sender, receiver) = chan::bounded(1);
+
    alice.command(Command::Untrack(proj_id.clone(), sender));
+
    let policy_change = receiver
+
        .recv()
+
        .map_err(client::handle::Error::from)
+
        .unwrap();
+
    assert!(policy_change);
+
    assert!(!alice.config().is_tracking(&proj_id));
+
}
+

+
#[test]
fn test_inventory_relay_bad_timestamp() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());