Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement inventory sync command
Alexis Sellier committed 3 years ago
commit 7087288cfbc762d92757768973987096a23d66f5
parent 8c7c2242cc47cf093078728a6b9f640c88970b96
7 files changed +88 -33
modified radicle-node/src/control.rs
@@ -62,8 +62,6 @@ enum CommandError {
    InvalidCommandArg(String, Box<dyn std::error::Error>),
    #[error("invalid command arguments `{0:?}`")]
    InvalidCommandArgs(Vec<String>),
-
    #[error("unknown command `{0}`")]
-
    UnknownCommand(String),
    #[error("serialization failed: {0}")]
    Serialization(#[from] json::Error),
    #[error("runtime error: {0}")]
@@ -89,6 +87,9 @@ fn command<H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>>(
    let cmd: Command = json::from_str(input)?;

    match cmd.name {
+
        CommandName::Connect => {
+
            todo!();
+
        }
        CommandName::Fetch => {
            let (rid, nid): (Id, NodeId) = parse::args(cmd)?;
            fetch(rid, nid, LineWriter::new(stream), handle)?;
@@ -162,6 +163,14 @@ fn command<H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>>(
            }
            CommandResult::ok().to_writer(writer).ok();
        }
+
        CommandName::SyncInventory => match handle.sync_inventory() {
+
            Ok(updated) => {
+
                CommandResult::Okay { updated }.to_writer(writer)?;
+
            }
+
            Err(e) => {
+
                return Err(CommandError::Runtime(e));
+
            }
+
        },
        CommandName::Status => {
            CommandResult::ok().to_writer(writer).ok();
        }
@@ -184,9 +193,6 @@ fn command<H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>>(
        CommandName::Shutdown => {
            return Err(CommandError::Shutdown);
        }
-
        _ => {
-
            return Err(CommandError::UnknownCommand(line));
-
        }
    }
    Ok(())
}
modified radicle-node/src/runtime/handle.rs
@@ -159,6 +159,12 @@ impl<G: Signer + EcSign + 'static> radicle::node::Handle for Handle<G> {
        self.command(service::Command::AnnounceRefs(id))
    }

+
    fn sync_inventory(&mut self) -> Result<bool, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::SyncInventory(sender))?;
+
        receiver.recv().map_err(Error::from)
+
    }
+

    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
        let (sender, receiver) = chan::unbounded();
        let query: Arc<QueryState> = Arc::new(move |state| {
modified radicle-node/src/service.rs
@@ -1,5 +1,6 @@
#![allow(clippy::too_many_arguments)]
#![allow(clippy::collapsible_match)]
+
#![allow(clippy::collapsible_if)]
pub mod config;
pub mod filter;
pub mod message;
@@ -103,6 +104,8 @@ pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Se
pub enum Command {
    /// Announce repository references for given repository to peers.
    AnnounceRefs(Id),
+
    /// Announce local inventory to peers.
+
    SyncInventory(chan::Sender<bool>),
    /// Connect to node with the given address.
    Connect(NodeId, Address),
    /// Lookup seeds for the given repository in the routing table.
@@ -125,6 +128,7 @@ impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({id})"),
+
            Self::SyncInventory(_) => write!(f, "SyncInventory(..)"),
            Self::Connect(id, addr) => write!(f, "Connect({id}, {addr})"),
            Self::Seeds(id, _) => write!(f, "Seeds({id})"),
            Self::Fetch(id, node, _) => write!(f, "Fetch({id}, {node})"),
@@ -396,7 +400,11 @@ where
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            if self.out_of_sync {
-
                if let Err(err) = self.announce_inventory() {
+
                if let Err(err) = self
+
                    .storage
+
                    .inventory()
+
                    .and_then(|i| self.announce_inventory(i))
+
                {
                    error!("Error announcing inventory: {}", err);
                }
            }
@@ -480,6 +488,12 @@ where
                    error!("Error announcing refs: {}", err);
                }
            }
+
            Command::SyncInventory(resp) => {
+
                let updated = self
+
                    .sync_and_announce_inventory()
+
                    .expect("Service::command: error syncing and announcing inventory");
+
                resp.send(updated).ok();
+
            }
            Command::QueryState(query, sender) => {
                sender.send(query(self)).ok();
            }
@@ -735,16 +749,16 @@ where
                    return Ok(false);
                }

-
                if let Err(err) = self.sync_inventory(
-
                    message.inventory.as_slice(),
-
                    *announcer,
-
                    &message.timestamp,
-
                ) {
-
                    error!("Error processing inventory from {}: {}", announcer, err);
-

-
                    // There's not much we can do if the peer sending us this message isn't the
-
                    // origin of it.
-
                    return Ok(false);
+
                match self.sync_routing(&message.inventory, *announcer, message.timestamp) {
+
                    Ok(updated) => {
+
                        if !updated {
+
                            return Ok(false);
+
                        }
+
                    }
+
                    Err(e) => {
+
                        error!("Error processing inventory from {}: {}", announcer, e);
+
                        return Ok(false);
+
                    }
                }
                return Ok(relay);
            }
@@ -995,19 +1009,32 @@ where
        Ok(())
    }

+
    /// Sync, and if needed, announce our local inventory.
+
    fn sync_and_announce_inventory(&mut self) -> Result<bool, Error> {
+
        let inventory = self.storage.inventory()?;
+
        let updated = self.sync_routing(&inventory, self.node_id(), self.clock.as_secs())?;
+

+
        if updated {
+
            self.announce_inventory(inventory)?;
+
        }
+
        Ok(updated)
+
    }
+

    /// Process a peer inventory announcement by updating our routing table.
    /// This function expects the peer's full inventory, and prunes entries that are not in the
    /// given inventory.
-
    fn sync_inventory(
+
    fn sync_routing(
        &mut self,
        inventory: &[Id],
        from: NodeId,
-
        timestamp: &Timestamp,
-
    ) -> Result<(), Error> {
+
        timestamp: Timestamp,
+
    ) -> Result<bool, Error> {
+
        let mut updated = false;
        let mut included = HashSet::new();
+

        for proj_id in inventory {
            included.insert(proj_id);
-
            if self.routing.insert(*proj_id, from, *timestamp)? {
+
            if self.routing.insert(*proj_id, from, timestamp)? {
                info!(target: "service", "Routing table updated for {proj_id} with seed {from}");

                if self
@@ -1018,14 +1045,17 @@ where
                    // TODO: We should fetch here if we're already connected, case this seed has
                    // refs we don't have.
                }
+
                updated = true;
            }
        }
        for id in self.routing.get_resources(&from)?.into_iter() {
            if !included.contains(&id) {
-
                self.routing.remove(&id, &from)?;
+
                if self.routing.remove(&id, &from)? {
+
                    updated = true;
+
                }
            }
        }
-
        Ok(())
+
        Ok(updated)
    }

    /// Announce local refs for given id.
@@ -1088,13 +1118,9 @@ where
    ////////////////////////////////////////////////////////////////////////////

    /// Announce our inventory to all connected peers.
-
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
-
        let inventory = self.storage().inventory()?;
-
        let inv = Message::inventory(
-
            gossip::inventory(self.clock.as_secs(), inventory),
-
            &self.signer,
-
        );
-

+
    fn announce_inventory(&mut self, inventory: Vec<Id>) -> Result<(), storage::Error> {
+
        let time = self.clock.as_secs();
+
        let inv = Message::inventory(gossip::inventory(time, inventory), &self.signer);
        for id in self.sessions.negotiated().map(|(id, _)| id) {
            self.reactor.write(*id, inv.clone());
        }
modified radicle-node/src/test/gossip.rs
@@ -26,7 +26,7 @@ pub fn messages(count: usize, now: LocalTime, delta: LocalDuration) -> Vec<Messa

        msgs.push(Message::inventory(
            InventoryAnnouncement {
-
                inventory: arbitrary::gen(3),
+
                inventory: arbitrary::vec(3).try_into().unwrap(),
                timestamp: time.as_secs(),
            },
            &signer,
modified radicle-node/src/test/handle.rs
@@ -60,6 +60,10 @@ impl radicle::node::Handle for Handle {
        Ok(())
    }

+
    fn sync_inventory(&mut self) -> Result<bool, Self::Error> {
+
        unimplemented!()
+
    }
+

    fn routing(&self) -> Result<chan::Receiver<(Id, service::NodeId)>, Self::Error> {
        unimplemented!();
    }
modified radicle-node/src/tests.rs
@@ -598,7 +598,7 @@ fn test_inventory_relay() {
    let mut alice = Peer::new("alice", [7, 7, 7, 7]);
    let bob = Peer::new("bob", [8, 8, 8, 8]);
    let eve = Peer::new("eve", [9, 9, 9, 9]);
-
    let inv = BoundedVec::new();
+
    let inv = BoundedVec::try_from(arbitrary::vec(1)).unwrap();
    let now = LocalTime::now().as_secs();

    // Inventory from Bob relayed to Eve.
modified radicle/src/node.rs
@@ -108,6 +108,8 @@ impl From<net::SocketAddr> for Address {
pub enum CommandName {
    /// Announce repository references for given repository to peers.
    AnnounceRefs,
+
    /// Sync local inventory with node.
+
    SyncInventory,
    /// Connect to node with the given address.
    Connect,
    /// Lookup seeds for the given repository in the routing table.
@@ -261,9 +263,11 @@ pub trait Handle {
    fn untrack_repo(&mut self, id: Id) -> Result<bool, Self::Error>;
    /// Untrack the given node.
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error>;
-
    /// Notify the client that a project has been updated.
+
    /// Notify the service that a project has been updated.
    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error>;
-
    /// Ask the client to shutdown.
+
    /// Notify the service that our inventory was updated.
+
    fn sync_inventory(&mut self) -> Result<bool, Self::Error>;
+
    /// Ask the service to shutdown.
    fn shutdown(self) -> Result<(), Self::Error>;
    /// Query the routing table entries.
    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Self::Error>;
@@ -403,6 +407,15 @@ impl Handle for Node {
        Ok(())
    }

+
    fn sync_inventory(&mut self) -> Result<bool, Error> {
+
        let mut line = self.call::<&str, _>(CommandName::SyncInventory, [])?;
+
        let response: CommandResult = line.next().ok_or(Error::EmptyResponse {
+
            cmd: CommandName::SyncInventory,
+
        })??;
+

+
        response.into()
+
    }
+

    fn routing(&self) -> Result<chan::Receiver<(Id, NodeId)>, Error> {
        todo!();
    }