Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Add 'disconnect' command to handle
cloudhead committed 2 years ago
commit d77b5d5abe1032d48bd7aa0c1cb70272813674ac
parent 65aeebb2d14e3959d9a920b919e9643aa5bbda99
6 files changed +108 -0
modified radicle-node/src/control.rs
@@ -102,6 +102,12 @@ where
                }
            }
        }
+
        Command::Disconnect { nid } => match handle.disconnect(nid) {
+
            Err(e) => return Err(CommandError::Runtime(e)),
+
            Ok(()) => {
+
                CommandResult::ok().to_writer(writer).ok();
+
            }
+
        },
        Command::Fetch { rid, nid, timeout } => {
            fetch(rid, nid, timeout, writer, &mut handle)?;
        }
modified radicle-node/src/runtime/handle.rs
@@ -179,6 +179,20 @@ impl radicle::node::Handle for Handle {
            .map_err(Error::from)
    }

+
    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error> {
+
        let events = self.events();
+
        self.command(service::Command::Disconnect(node))?;
+
        events
+
            .wait(
+
                |e| match e {
+
                    Event::PeerDisconnected { nid, .. } if nid == &node => Some(()),
+
                    _ => None,
+
                },
+
                time::Duration::MAX,
+
            )
+
            .map_err(Error::from)
+
    }
+

    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error> {
        let (sender, receiver) = chan::bounded(1);
        self.command(service::Command::Seeds(id, sender))?;
modified radicle-node/src/test/environment.rs
@@ -232,6 +232,15 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
        self
    }

+
    pub fn disconnect(&mut self, remote: &NodeHandle<G>) {
+
        self.handle.disconnect(remote.id).unwrap();
+
    }
+

+
    /// Shutdown node.
+
    pub fn shutdown(self) {
+
        drop(self)
+
    }
+

    /// Get the full address of this node.
    pub fn address(&self) -> ConnectAddress {
        (self.id, node::Address::from(self.addr)).into()
@@ -301,6 +310,26 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
        }
    }

+
    /// Wait until this node has a repository.
+
    #[track_caller]
+
    pub fn has_repository(&self, rid: &RepoId) {
+
        log::debug!(target: "test", "Waiting for {} to have {rid}", self.id);
+
        let events = self.handle.events();
+

+
        loop {
+
            if self.storage.repository(*rid).is_ok() {
+
                log::debug!(target: "test", "Node {} has {rid}", self.id);
+
                break;
+
            }
+
            events
+
                .wait(
+
                    |e| matches!(e, Event::RefsFetched { .. }).then_some(()),
+
                    time::Duration::from_secs(6),
+
                )
+
                .unwrap();
+
        }
+
    }
+

    /// Wait until this node has the inventory of another node.
    #[track_caller]
    pub fn has_remote_of(&self, rid: &RepoId, nid: &NodeId) {
@@ -310,6 +339,7 @@ impl<G: Signer + cyphernet::Ecdh> NodeHandle<G> {
        loop {
            if let Ok(repo) = self.storage.repository(*rid) {
                if repo.remote(nid).is_ok() {
+
                    log::debug!(target: "test", "Node {} has {rid}/{nid}", self.id);
                    break;
                }
            }
modified radicle-node/src/test/handle.rs
@@ -48,6 +48,10 @@ impl radicle::node::Handle for Handle {
        unimplemented!();
    }

+
    fn disconnect(&mut self, _node: NodeId) -> Result<(), Self::Error> {
+
        unimplemented!();
+
    }
+

    fn seeds(&mut self, _id: RepoId) -> Result<Seeds, Self::Error> {
        unimplemented!();
    }
modified radicle-node/src/tests/e2e.rs
@@ -1265,3 +1265,43 @@ fn test_background_foreground_fetch() {
    assert_ne!(eves_refs_expected, old_refs);
    assert_eq!(eves_refs_expected, eves_refs);
}
+

+
#[test]
+
/// Alice is offline while Bob pushes some changes to the repo. When Alice reconnects,
+
/// she is made aware of the changes via the `subscribe` message, and fetches from the seed.
+
fn test_catchup_on_refs_announcements() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+
    let mut alice = Node::init(tmp.path(), Config::test(Alias::new("alice")));
+
    let bob = Node::init(tmp.path(), Config::test(Alias::new("bob")));
+
    let bob_id = bob.id;
+
    let seed = Node::init(tmp.path(), Config::test(Alias::new("seed")));
+
    let acme = alice.project("acme", "");
+

+
    let mut alice = alice.spawn();
+
    let mut bob = bob.spawn();
+
    let mut seed = seed.spawn();
+

+
    bob.handle.seed(acme, Scope::All).unwrap();
+
    seed.handle.seed(acme, Scope::All).unwrap();
+

+
    alice.connect(&seed);
+
    seed.has_repository(&acme);
+
    alice.disconnect(&seed);
+
    bob.connect(&seed);
+
    bob.has_repository(&acme);
+

+
    log::debug!(target: "test", "Bob creating his issue..");
+
    bob.issue(acme, "Bob's issue", "[..]");
+
    bob.handle.announce_refs(acme).unwrap();
+

+
    log::debug!(target: "test", "Waiting for seed to fetch Bob's refs from Bob..");
+
    seed.has_remote_of(&acme, &bob.id); // Seed fetches Bob's refs.
+
    bob.disconnect(&seed);
+
    bob.shutdown();
+

+
    log::debug!(target: "test", "Alice re-connects to the seed..");
+
    alice.connect(&seed);
+
    alice.has_remote_of(&acme, &bob_id);
+
}
modified radicle/src/node.rs
@@ -455,6 +455,10 @@ pub enum Command {
        opts: ConnectOptions,
    },

+
    /// Disconnect from a node.
+
    #[serde(rename_all = "camelCase")]
+
    Disconnect { nid: NodeId },
+

    /// Lookup seeds for the given repository in the routing table.
    #[serde(rename_all = "camelCase")]
    Seeds { rid: RepoId },
@@ -862,6 +866,8 @@ pub trait Handle: Clone + Sync + Send {
        addr: Address,
        opts: ConnectOptions,
    ) -> Result<ConnectResult, Self::Error>;
+
    /// Disconnect from a peer.
+
    fn disconnect(&mut self, node: NodeId) -> Result<(), Self::Error>;
    /// Lookup the seeds of a given repository in the routing table.
    fn seeds(&mut self, id: RepoId) -> Result<Seeds, Self::Error>;
    /// Fetch a repository from the network.
@@ -1069,6 +1075,14 @@ impl Handle for Node {
        Ok(result)
    }

+
    fn disconnect(&mut self, nid: NodeId) -> Result<(), Self::Error> {
+
        self.call::<ConnectResult>(Command::Disconnect { nid }, DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse)??;
+

+
        Ok(())
+
    }
+

    fn seeds(&mut self, rid: RepoId) -> Result<Seeds, Error> {
        let seeds = self
            .call::<Seeds>(Command::Seeds { rid }, DEFAULT_TIMEOUT)?