Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Rely on inventory announcement in `rad init`
Alexis Sellier committed 3 years ago
commit 3e7761740fa2a39c53e1593ed9a37f5984870920
parent b461950d59f55fccb8cf4bfed1d7c99fb9444018
9 files changed +94 -50
deleted radicle-cli/examples/rad-init-announce-refs.md
@@ -1,24 +0,0 @@
-

-
To create your first radicle project, navigate to a git repository, and run
-
the `init` command:
-

-
```
-
$ rad init --name heartwood --description "Radicle Heartwood Protocol & Stack" --no-confirm
-

-
Initializing local 🌱 project in .
-

-
ok Project heartwood created
-
{
-
  "name": "heartwood",
-
  "description": "Radicle Heartwood Protocol & Stack",
-
  "defaultBranch": "master"
-
}
-
ok Announcing refs..
-

-
Your project id is rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji. You can show it any time by running:
-
    rad .
-

-
To publish your project to the network, run:
-
    rad push
-

-
```
added radicle-cli/examples/rad-init-sync.md
@@ -0,0 +1,24 @@
+

+
To create your first radicle project, navigate to a git repository, and run
+
the `init` command:
+

+
```
+
$ rad init --name heartwood --description "Radicle Heartwood Protocol & Stack" --no-confirm
+

+
Initializing local 🌱 project in .
+

+
ok Project heartwood created
+
{
+
  "name": "heartwood",
+
  "description": "Radicle Heartwood Protocol & Stack",
+
  "defaultBranch": "master"
+
}
+
ok Syncing inventory..
+

+
Your project id is rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji. You can show it any time by running:
+
    rad .
+

+
To publish your project to the network, run:
+
    rad push
+

+
```
modified radicle-cli/src/commands/init.rs
@@ -242,8 +242,8 @@ pub fn init(options: Options, profile: &profile::Profile) -> anyhow::Result<()>
                self::setup_signing(profile.id(), &repo, interactive)?;
            }
            if options.sync {
-
                let spinner = term::spinner("Announcing refs..");
-
                if let Err(e) = node.announce_refs(id) {
+
                let spinner = term::spinner("Syncing inventory..");
+
                if let Err(e) = node.sync_inventory() {
                    spinner.error(e);
                } else {
                    spinner.finish();
modified radicle-cli/tests/commands.rs
@@ -1,5 +1,6 @@
use std::env;
use std::path::Path;
+
use std::{thread, time};

use radicle::git;
use radicle::profile::Home;
@@ -226,7 +227,7 @@ fn rad_clone_unknown() {
}

#[test]
-
fn rad_init_announce_refs_and_clone() {
+
fn rad_init_sync_and_clone() {
    logger::init(log::Level::Debug);

    let mut environment = Environment::new();
@@ -241,9 +242,14 @@ fn rad_init_announce_refs_and_clone() {

    fixtures::repository(working.join("alice"));

+
    // Necessary for now, if we don't want the new inventry announcement to be considered stale
+
    // for Bob.
+
    // TODO: Find a way to advance internal clocks instead.
+
    thread::sleep(time::Duration::from_secs(1));
+

    // Alice initializes a repo after her node has started, and after bob has connected to it.
    test(
-
        "examples/rad-init-announce-refs.md",
+
        "examples/rad-init-sync.md",
        &working.join("alice"),
        Some(&alice.home),
        [],
modified radicle-node/src/control.rs
@@ -42,6 +42,8 @@ pub fn listen<H: Handle<Error = runtime::HandleError, FetchResult = FetchResult>
                        handle.shutdown().ok();
                        break;
                    }
+
                    log::error!(target: "control", "Command returned error: {e}");
+

                    CommandResult::error(e).to_writer(&mut stream).ok();

                    stream.flush().ok();
modified radicle-node/src/service.rs
@@ -579,6 +579,15 @@ where
            } else {
                log::debug!(target: "service", "No fetch requests found for {rid}..");
            }
+

+
            // Announce the newly fetched project to the network, if necessary.
+
            // Since this fetch could be either a full clone or simply a ref update, we need to
+
            // either announce new inventory, or new refs.
+
            //
+
            // TODO: Announce new refs? Would require the ability to announce other peer refs.
+
            if let Err(e) = self.sync_and_announce_inventory() {
+
                error!(target: "service", "Failed to announce new inventory: {e}");
+
            }
        }

        if let Some(session) = self.sessions.get_mut(&remote) {
@@ -760,6 +769,35 @@ where
                        return Ok(false);
                    }
                }
+

+
                for id in message.inventory.as_slice() {
+
                    if let Some(sess) = self.sessions.get_mut(announcer) {
+
                        // If we are connected to the announcer of this inventory, update the peer's
+
                        // subscription filter to include all inventory items. This way, we'll
+
                        // relay messages relating to the peer's inventory.
+
                        if let Some(sub) = &mut sess.subscribe {
+
                            sub.filter.insert(id);
+
                        }
+

+
                        // If we're tracking and connected to the announcer, and we don't have
+
                        // the inventory, fetch it from the announcer.
+
                        if self.tracking.is_repo_tracked(id).expect(
+
                            "Service::handle_announcement: error accessing tracking configuration",
+
                        ) {
+
                            // Only if we do not have the repository locally do we fetch here.
+
                            // If we do have it, only fetch after receiving a ref announcement.
+
                            if let Ok(true) = self.storage.contains(id) {
+
                                // Do nothing.
+
                            } else {
+
                                // We may hit this branch due to an error returned by storage.
+
                                // We attempt to fetch in case of error because it's likely
+
                                // the repository was corrupted, and fetching will fix it.
+
                                self.fetch(*id, announcer);
+
                            }
+
                        }
+
                    }
+
                }
+

                return Ok(relay);
            }
            // Process a peer inventory update announcement by (maybe) fetching.
@@ -1068,11 +1106,6 @@ where
        let peers = self.sessions.negotiated().map(|(_, p)| p);
        let timestamp = self.clock.as_secs();

-
        // Make sure our local routing table is up to date with new repositories.
-
        if let Err(e) = self.routing.insert(id, self.node_id(), timestamp) {
-
            log::error!(target: "service", "Failed to update routing table with repository {id}: {e}");
-
        }
-

        if remote.refs.len() > Refs::max() {
            error!(
                target: "service",
modified radicle-node/src/test/peer.rs
@@ -202,7 +202,7 @@ where
    pub fn inventory_announcement(&self) -> Message {
        Message::inventory(
            InventoryAnnouncement {
-
                inventory: arbitrary::gen(3),
+
                inventory: arbitrary::vec(3).try_into().unwrap(),
                timestamp: self.timestamp(),
            },
            self.signer(),
modified radicle-node/src/test/simulator.rs
@@ -108,8 +108,8 @@ impl fmt::Display for Scheduled {
            Input::Fetched(fetch, _) => {
                write!(
                    f,
-
                    "{} <~ {} ({}): Fetched",
-
                    self.node, fetch.remote, fetch.rid
+
                    "{} <<~ {} ({}): Fetched (initiated={})",
+
                    self.node, fetch.remote, fetch.rid, fetch.initiated
                )
            }
        }
@@ -410,10 +410,10 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                    }
                    Input::Fetched(f, result) => {
                        let result = Rc::try_unwrap(result).unwrap();
-
                        let mut repo = p.storage().repository(f.rid).unwrap();
-

-
                        fetch(&mut repo, &f.remote, f.namespaces.clone()).unwrap();
-

+
                        if f.initiated {
+
                            let mut repo = p.storage().repository(f.rid).unwrap();
+
                            fetch(&mut repo, &f.remote, f.namespaces.clone()).unwrap();
+
                        }
                        p.fetched(f, result);
                    }
                }
@@ -608,17 +608,19 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                }
            }
            Io::Fetch(fetch) => {
+
                let remote = fetch.remote;
+

                if fetch.initiated {
                    log::info!(
                        target: "sim",
-
                        "{:05} {} ~> {} ({})",
-
                        self.elapsed().as_millis(), node, fetch.remote, fetch.rid
+
                        "{:05} {} ~> {} ({}): Fetch outgoing",
+
                        self.elapsed().as_millis(), node, remote, fetch.rid
                    );
                } else {
                    log::info!(
                        target: "sim",
-
                        "{:05} {} <~ {} ({})",
-
                        self.elapsed().as_millis(), node, fetch.remote, fetch.rid
+
                        "{:05} {} <~ {} ({}): Fetch incoming",
+
                        self.elapsed().as_millis(), node, remote, fetch.rid
                    );
                }

@@ -627,7 +629,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        self.time + LocalDuration::from_secs(3),
                        Scheduled {
                            node,
-
                            remote: fetch.remote,
+
                            remote,
                            input: Input::Fetched(
                                fetch,
                                Rc::new(Err(FetchError::Io(io::ErrorKind::Other.into()))),
@@ -639,7 +641,7 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                        self.time + LocalDuration::from_secs(3),
                        Scheduled {
                            node,
-
                            remote: fetch.remote,
+
                            remote,
                            input: Input::Fetched(fetch, Rc::new(Ok(vec![]))),
                        },
                    );
@@ -694,7 +696,6 @@ fn fetch<W: WriteRepository>(
    });
    opts.remote_callbacks(callbacks);

-
    // Fetch from the remote into the staging copy.
    let mut remote = repo.raw().remote_anonymous(
        radicle::storage::git::transport::remote::Url {
            node: *node,
modified radicle-node/src/tests.rs
@@ -548,8 +548,8 @@ fn test_refs_announcement_relay() {
    alice.connect_to(&bob);
    alice.connect_to(&eve);
    alice.receive(eve.id(), Message::Subscribe(Subscribe::all()));
-

    alice.receive(bob.id(), bob.refs_announcement(bob_inv[0]));
+

    assert_matches!(
        alice.messages(eve.id()).next(),
        Some(Message::Announcement(_)),
@@ -896,11 +896,13 @@ fn test_push_and_pull() {
    assert!(eve.get(proj_id).unwrap().is_none());
    assert!(bob.get(proj_id).unwrap().is_none());

-
    // Alice announces her refs.
+
    let (send, _) = chan::bounded(1);
+
    // Alice announces her inventory.
    // We now expect Eve to fetch Alice's project from Alice.
    // Then we expect Bob to fetch Alice's project from Eve.
    alice.elapse(LocalDuration::from_secs(1)); // Make sure our announcement is fresh.
-
    alice.command(service::Command::AnnounceRefs(proj_id));
+
    alice.command(service::Command::SyncInventory(send));
+

    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());

    // TODO: Refs should be compared between the two peers.