Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
radicle: return RefsAt for announce_refs
Fintan Halpenny committed 2 years ago
commit 85d452b2221e125923ea87d3041dff46384d681b
parent 63ffdcd46785a5e426dcccb9c26e9f6cba84527e
12 files changed +76 -39
modified radicle-cli/examples/rad-patch-pull-update.md
@@ -40,7 +40,7 @@ We wait for Alice to sync our fork.
``` ~bob
$ rad node events -n 2 --timeout 1
{"type":"refsAnnounced","nid":"z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi","rid":"rad:zhbMU4DUXrzB8xT6qAJh6yZ7bFMK","refs":[{"remote":"z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk","at":"161b775a3509c8098de67f57f750972bba015b31"}],"timestamp":[..]}
-
{"type":"refsSynced","remote":"z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi","rid":"rad:zhbMU4DUXrzB8xT6qAJh6yZ7bFMK"}
+
{"type":"refsSynced","remote":"z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi","rid":"rad:zhbMU4DUXrzB8xT6qAJh6yZ7bFMK","at":"161b775a3509c8098de67f57f750972bba015b31"}
```

Bob then opens a patch.
modified radicle-cli/src/commands/issue.rs
@@ -418,7 +418,7 @@ pub fn run(options: Options, ctx: impl term::Context) -> anyhow::Result<()> {

    if announce {
        match node.announce_refs(rid) {
-
            Ok(()) => {}
+
            Ok(_) => {}
            Err(e) if e.is_connection_err() => {}
            Err(e) => return Err(e.into()),
        }
modified radicle-cli/src/main.rs
@@ -279,7 +279,7 @@ fn run_other(exe: &str, args: &[OsString]) -> Result<(), Option<anyhow::Error>>
        ),
        other => {
            let exe = format!("{NAME}-{exe}");
-
            let status = process::Command::new(exe.clone()).args(args).status();
+
            let status = process::Command::new(exe).args(args).status();

            match status {
                Ok(status) => {
modified radicle-node/src/control.rs
@@ -153,10 +153,9 @@ where
            }
        },
        Command::AnnounceRefs { rid } => {
-
            if let Err(e) = handle.announce_refs(rid) {
-
                return Err(CommandError::Runtime(e));
-
            }
-
            CommandResult::ok().to_writer(writer).ok();
+
            let refs = handle.announce_refs(rid)?;
+

+
            json::to_writer(writer, &refs)?;
        }
        Command::AnnounceInventory => {
            if let Err(e) = handle.announce_inventory() {
@@ -265,7 +264,14 @@ mod tests {
            let stream = BufReader::new(stream);
            let line = stream.lines().next().unwrap().unwrap();

-
            assert_eq!(line, json::json!({}).to_string());
+
            assert_eq!(
+
                line,
+
                json::json!({
+
                    "remote": handle.nid().unwrap(),
+
                    "at":"0000000000000000000000000000000000000000"
+
                })
+
                .to_string()
+
            );
        }

        for rid in &rids {
modified radicle-node/src/runtime/handle.rs
@@ -5,6 +5,7 @@ use std::{fmt, io, time};

use crossbeam_channel as chan;
use radicle::node::{ConnectOptions, ConnectResult, Seeds};
+
use radicle::storage::refs::RefsAt;
use reactor::poller::popol::PopolWaker;
use thiserror::Error;

@@ -224,9 +225,10 @@ impl radicle::node::Handle for Handle {
        receiver.recv().map_err(Error::from)
    }

-
    fn announce_refs(&mut self, id: Id) -> Result<(), Error> {
-
        self.command(service::Command::AnnounceRefs(id))
-
            .map_err(Error::from)
+
    fn announce_refs(&mut self, id: Id) -> Result<RefsAt, Error> {
+
        let (sender, receiver) = chan::bounded(1);
+
        self.command(service::Command::AnnounceRefs(id, sender))?;
+
        receiver.recv().map_err(Error::from)
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
modified radicle-node/src/service.rs
@@ -137,7 +137,7 @@ pub type QueryState = dyn Fn(&dyn ServiceState) -> Result<(), CommandError> + Se
/// Commands sent to the service by the operator.
pub enum Command {
    /// Announce repository references for given repository to peers.
-
    AnnounceRefs(Id),
+
    AnnounceRefs(Id, chan::Sender<RefsAt>),
    /// Announce local repositories to peers.
    AnnounceInventory,
    /// Announce local inventory to peers.
@@ -167,7 +167,7 @@ pub enum Command {
impl fmt::Debug for Command {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
-
            Self::AnnounceRefs(id) => write!(f, "AnnounceRefs({id})"),
+
            Self::AnnounceRefs(id, _) => write!(f, "AnnounceRefs({id})"),
            Self::AnnounceInventory => write!(f, "AnnounceInventory"),
            Self::SyncInventory(_) => write!(f, "SyncInventory(..)"),
            Self::Connect(id, addr, opts) => write!(f, "Connect({id}, {addr}, {opts:?})"),
@@ -611,11 +611,17 @@ where
                    .expect("Service::command: error untracking node");
                resp.send(untracked).ok();
            }
-
            Command::AnnounceRefs(id) => {
-
                if let Err(err) = self.announce_refs(id, [self.node_id()]) {
-
                    error!("Error announcing refs: {}", err);
+
            Command::AnnounceRefs(id, resp) => match self.announce_refs(id, [self.node_id()]) {
+
                Ok(refs) => {
+
                    #[allow(clippy::unwrap_used)]
+
                    // SAFETY: we announced only our refs
+
                    let refs = refs.first().unwrap();
+
                    resp.send(*refs).ok();
                }
-
            }
+
                Err(err) => {
+
                    error!("Error announcing refs: {err}");
+
                }
+
            },
            Command::AnnounceInventory => {
                if let Err(err) = self
                    .storage
@@ -1134,10 +1140,11 @@ where
                // This event is used for showing sync progress to users.
                match message.is_synced(&self.node_id(), &self.storage) {
                    Ok(synced) => {
-
                        if synced {
+
                        if let Some(at) = synced {
                            self.emitter.emit(Event::RefsSynced {
                                rid: message.rid,
                                remote: *announcer,
+
                                at,
                            });
                        }
                    }
@@ -1482,7 +1489,7 @@ where
        &mut self,
        rid: Id,
        remotes: impl IntoIterator<Item = NodeId>,
-
    ) -> Result<(), Error> {
+
    ) -> Result<Vec<RefsAt>, Error> {
        let repo = self.storage.repository(rid)?;
        let doc = repo.identity_doc()?;
        let peers = self.sessions.connected().map(|(_, p)| p);
@@ -1502,7 +1509,7 @@ where

        let msg = AnnouncementMessage::from(RefsAnnouncement {
            rid,
-
            refs,
+
            refs: refs.clone(),
            timestamp,
        });
        let ann = msg.signed(&self.signer);
@@ -1515,7 +1522,7 @@ where
            }),
        );

-
        Ok(())
+
        Ok(refs.into())
    }

    fn sync_and_announce(&mut self) {
modified radicle-node/src/service/message.rs
@@ -1,5 +1,6 @@
use std::{fmt, io, mem};

+
use radicle::git::Oid;
use radicle::storage::refs::RefsAt;

use crate::crypto;
@@ -191,19 +192,19 @@ impl RefsAnnouncement {
        &self,
        remote: &NodeId,
        storage: S,
-
    ) -> Result<bool, storage::Error> {
+
    ) -> Result<Option<Oid>, storage::Error> {
        let repo = match storage.repository(self.rid) {
            // If the repo doesn't exist, we're not in sync.
-
            Err(e) if e.is_not_found() => return Ok(false),
+
            Err(e) if e.is_not_found() => return Ok(None),
            Err(e) => return Err(e),
            Ok(r) => r,
        };

        if let Some(theirs) = self.refs.iter().find(|refs_at| refs_at.remote == *remote) {
            let ours = RefsAt::new(&repo, *remote)?;
-
            return Ok(ours.at == theirs.at);
+
            return Ok((ours.at == theirs.at).then_some(theirs.at));
        }
-
        Ok(false)
+
        Ok(None)
    }
}

modified radicle-node/src/test/handle.rs
@@ -3,6 +3,9 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::{io, time};

+
use radicle::git;
+
use radicle::storage::refs::RefsAt;
+

use crate::identity::Id;
use crate::node::{Alias, Config, ConnectOptions, ConnectResult, Event, FetchResult, Seeds};
use crate::runtime::HandleError;
@@ -80,10 +83,13 @@ impl radicle::node::Handle for Handle {
        Ok(self.tracking_nodes.lock().unwrap().remove(&id))
    }

-
    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error> {
+
    fn announce_refs(&mut self, id: Id) -> Result<RefsAt, Self::Error> {
        self.updates.lock().unwrap().push(id);

-
        Ok(())
+
        Ok(RefsAt {
+
            remote: self.nid()?,
+
            at: git::raw::Oid::zero().into(),
+
        })
    }

    fn announce_inventory(&mut self) -> Result<(), Self::Error> {
modified radicle-node/src/tests.rs
@@ -1353,7 +1353,12 @@ fn test_refs_synced_event() {
    events
        .wait(
            |e| {
-
                if let Event::RefsSynced { remote, rid } = e {
+
                if let Event::RefsSynced {
+
                    remote,
+
                    rid,
+
                    at: _at,
+
                } = e
+
                {
                    assert_eq!(remote, &bob.id);
                    assert_eq!(rid, &acme);

modified radicle/src/node.rs
@@ -24,6 +24,7 @@ use serde_json as json;
use crate::crypto::PublicKey;
use crate::identity::Id;
use crate::profile;
+
use crate::storage::refs::RefsAt;
use crate::storage::RefUpdate;

pub use address::{KnownAddress, SyncedAt};
@@ -742,7 +743,7 @@ pub trait Handle: Clone + Sync + Send {
    /// Untrack the given node.
    fn untrack_node(&mut self, id: NodeId) -> Result<bool, Self::Error>;
    /// Notify the service that a project has been updated, and announce local refs.
-
    fn announce_refs(&mut self, id: Id) -> Result<(), Self::Error>;
+
    fn announce_refs(&mut self, id: Id) -> Result<RefsAt, Self::Error>;
    /// Announce local inventory.
    fn announce_inventory(&mut self) -> Result<(), Self::Error>;
    /// Notify the service that our inventory was updated.
@@ -823,7 +824,7 @@ impl Node {
        let events = self.subscribe(timeout)?;
        let mut seeds = seeds.into_iter().collect::<BTreeSet<_>>();

-
        self.announce_refs(rid)?;
+
        let refs = self.announce_refs(rid)?;

        callback(AnnounceEvent::Announced);

@@ -832,9 +833,14 @@ impl Node {

        for e in events {
            match e {
-
                Ok(Event::RefsSynced { remote, rid: rid_ }) if rid == rid_ => {
-
                    seeds.remove(&remote);
-
                    synced.push(remote);
+
                Ok(Event::RefsSynced {
+
                    remote,
+
                    rid: rid_,
+
                    at,
+
                }) if rid == rid_ => {
+
                    if seeds.remove(&remote) && refs.at == at {
+
                        synced.push(remote);
+
                    }

                    callback(AnnounceEvent::RefsSynced { remote });
                }
@@ -963,11 +969,13 @@ impl Handle for Node {
        Ok(response.updated)
    }

-
    fn announce_refs(&mut self, rid: Id) -> Result<(), Error> {
-
        for line in self.call::<Success>(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)? {
-
            line?;
-
        }
-
        Ok(())
+
    fn announce_refs(&mut self, rid: Id) -> Result<RefsAt, Error> {
+
        let refs: RefsAt = self
+
            .call(Command::AnnounceRefs { rid }, DEFAULT_TIMEOUT)?
+
            .next()
+
            .ok_or(Error::EmptyResponse)??;
+

+
        Ok(refs)
    }

    fn announce_inventory(&mut self) -> Result<(), Error> {
modified radicle/src/node/events.rs
@@ -3,6 +3,7 @@ use std::time;

use crossbeam_channel as chan;

+
use crate::git::Oid;
use crate::node;
use crate::prelude::*;
use crate::storage::{refs, RefUpdate};
@@ -19,6 +20,7 @@ pub enum Event {
    RefsSynced {
        remote: NodeId,
        rid: Id,
+
        at: Oid,
    },
    SeedDiscovered {
        rid: Id,
modified radicle/src/storage/refs.rs
@@ -369,7 +369,7 @@ impl<V> Deref for SignedRefs<V> {
///
/// It can also be used for communicating announcements of updates
/// references to other nodes.
-
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RefsAt {
    /// The remote namespace of the `rad/sigrefs`.