Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Implement auto-fetching of projects
Alexis Sellier committed 3 years ago
commit 92f17d71c3e851174913a47f8c4addfaafa80c8d
parent a5a7af57b50c2c9ddaa5b278417f82e61deddc99
6 files changed +146 -62
modified node/src/protocol.rs
@@ -180,6 +180,13 @@ impl Config {
        self.connect.contains(addr)
    }

+
    pub fn is_tracking(&self, proj: &ProjId) -> bool {
+
        match &self.project_tracking {
+
            ProjectTracking::All { blocked } => !blocked.contains(proj),
+
            ProjectTracking::Allowed(projs) => projs.contains(proj),
+
        }
+
    }
+

    /// Track a project. Returns whether the policy was updated.
    pub fn track(&mut self, proj: ProjId) -> bool {
        match &mut self.project_tracking {
@@ -686,7 +693,7 @@ pub struct Context<S, T> {

impl<S, T> Context<S, T>
where
-
    T: storage::ReadStorage,
+
    T: storage::ReadStorage + storage::WriteStorage,
{
    pub(crate) fn new(
        config: Config,
@@ -706,22 +713,25 @@ where
        }
    }

-
    /// Get current local timestamp.
-
    pub(crate) fn timestamp(&self) -> Timestamp {
-
        self.clock.local_time().as_secs()
-
    }
-

    /// Process a peer inventory announcement by updating our routing table.
-
    fn process_inventory(&mut self, inventory: &Inventory, from: PeerId) {
+
    fn process_inventory(&mut self, inventory: &Inventory, from: PeerId, remote: &Url) {
        for proj_id in inventory {
            let inventory = self
                .routing
                .entry(proj_id.clone())
                .or_insert_with(|| HashSet::with_hasher(self.rng.clone().into()));

-
            // TODO: If we're tracking this project, check the refs to see if we need to
-
            // fetch updates from this peer.
+
            if self.config.is_tracking(proj_id) {
+
                // TODO: Verify refs before adding them to storage.
+
                let mut repo = self.storage.repository(proj_id).unwrap();
+
                repo.fetch(&Url {
+
                    path: format!("/{}", proj_id).into(),
+
                    ..remote.clone()
+
                })
+
                .unwrap();
+
            }

+
            // TODO: Fire an event on routing update.
            inventory.insert(from);
        }
    }
@@ -733,6 +743,11 @@ where
}

impl<S, T> Context<S, T> {
+
    /// Get current local timestamp.
+
    pub(crate) fn timestamp(&self) -> Timestamp {
+
        self.clock.local_time().as_secs()
+
    }
+

    /// Connect to a peer.
    fn connect(&mut self, addr: net::SocketAddr) {
        // TODO: Make sure we don't try to connect more than once to the same address.
@@ -816,6 +831,8 @@ pub enum PeerError {
    WrongVersion(u32),
    #[error("invalid inventory timestamp: {0}")]
    InvalidTimestamp(u64),
+
    #[error("peer misbehaved")]
+
    Misbehavior,
}

#[derive(Debug)]
@@ -866,19 +883,22 @@ impl Peer {
        ctx: &mut Context<S, T>,
    ) -> Result<Option<Message>, PeerError>
    where
-
        T: storage::ReadStorage,
+
        T: storage::ReadStorage + storage::WriteStorage,
    {
        if envelope.magic != NETWORK_MAGIC {
            return Err(PeerError::WrongMagic(envelope.magic));
        }
        debug!("Received {:?} from {}", &envelope.msg, self.id());

-
        match envelope.msg {
-
            Message::Hello {
-
                timestamp,
-
                version,
-
                git,
-
            } => {
+
        match (&self.state, envelope.msg) {
+
            (
+
                PeerState::Initial,
+
                Message::Hello {
+
                    timestamp,
+
                    version,
+
                    git,
+
                },
+
            ) => {
                let now = ctx.timestamp();

                if timestamp.abs_diff(now) > MAX_TIME_DELTA.as_secs() {
@@ -887,37 +907,43 @@ impl Peer {
                if version != PROTOCOL_VERSION {
                    return Err(PeerError::WrongVersion(version));
                }
-
                if let PeerState::Initial = self.state {
-
                    // Nb. This is a very primitive handshake. Eventually we should have anyhow
-
                    // extra "acknowledgment" message sent when the `Hello` is well received.
-
                    if self.link.is_inbound() {
-
                        let git = ctx.config.git_url.clone();
-
                        ctx.write_all(
-
                            self.addr,
-
                            [Message::hello(now, git), Message::get_inventory([])],
-
                        );
-
                    }
-
                    // Nb. we don't set the peer timestamp here, since it is going to be
-
                    // set after the first message is received only. Setting it here would
-
                    // mean that messages received right after the handshake could be ignored.
-
                    self.state = PeerState::Negotiated {
-
                        since: ctx.clock.local_time(),
-
                        git,
-
                    };
-
                } else {
-
                    // TODO: Handle misbehavior.
+
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
+
                // extra "acknowledgment" message sent when the `Hello` is well received.
+
                if self.link.is_inbound() {
+
                    let git = ctx.config.git_url.clone();
+
                    ctx.write_all(
+
                        self.addr,
+
                        [Message::hello(now, git), Message::get_inventory([])],
+
                    );
                }
+
                // Nb. we don't set the peer timestamp here, since it is going to be
+
                // set after the first message is received only. Setting it here would
+
                // mean that messages received right after the handshake could be ignored.
+
                self.state = PeerState::Negotiated {
+
                    since: ctx.clock.local_time(),
+
                    git,
+
                };
            }
-
            Message::GetInventory { .. } => {
+
            (PeerState::Initial, _) => {
+
                debug!(
+
                    "Disconnecting peer {} for sending us a message before handshake",
+
                    self.id()
+
                );
+
                return Err(PeerError::Misbehavior);
+
            }
+
            (PeerState::Negotiated { .. }, Message::GetInventory { .. }) => {
                // TODO: Handle partial inventory requests.
                let inventory = Message::inventory(ctx).unwrap();
                ctx.write(self.addr, inventory);
            }
-
            Message::Inventory {
-
                timestamp,
-
                inv,
-
                origin,
-
            } => {
+
            (
+
                PeerState::Negotiated { git, .. },
+
                Message::Inventory {
+
                    timestamp,
+
                    inv,
+
                    origin,
+
                },
+
            ) => {
                let now = ctx.clock.local_time();
                let last = self.timestamp;

@@ -932,7 +958,7 @@ impl Peer {
                } else {
                    return Ok(None);
                }
-
                ctx.process_inventory(&inv, origin.unwrap_or_else(|| self.id()));
+
                ctx.process_inventory(&inv, origin.unwrap_or_else(|| self.id()), git);

                if ctx.config.relay {
                    return Ok(Some(Message::Inventory {
@@ -942,14 +968,24 @@ impl Peer {
                    }));
                }
            }
-
            Message::GetAddrs => {
+
            (PeerState::Negotiated { .. }, Message::GetAddrs) => {
                // TODO: Send peer addresses.
                todo!();
            }
-
            Message::Addrs { .. } => {
+
            (PeerState::Negotiated { .. }, Message::Addrs { .. }) => {
                // TODO: Update address book.
                todo!();
            }
+
            (PeerState::Negotiated { .. }, Message::Hello { .. }) => {
+
                debug!(
+
                    "Disconnecting peer {} for sending us a redundant handshake message",
+
                    self.id()
+
                );
+
                return Err(PeerError::Misbehavior);
+
            }
+
            (PeerState::Disconnected { .. }, msg) => {
+
                debug!("Ignoring {:?} from disconnected peer {}", msg, self.id());
+
            }
        }
        Ok(None)
    }
modified node/src/storage.rs
@@ -116,6 +116,7 @@ impl Remote<Unverified> {
}

pub trait ReadStorage {
+
    fn url(&self) -> Url;
    fn get(&self, proj: &ProjId) -> Result<Option<Remotes<Unverified>>, Error>;
    fn inventory(&self) -> Result<Inventory, Error>;
}
@@ -140,6 +141,10 @@ where
    T: Deref<Target = S>,
    S: ReadStorage,
{
+
    fn url(&self) -> Url {
+
        self.deref().url()
+
    }
+

    fn inventory(&self) -> Result<Inventory, Error> {
        self.deref().inventory()
    }
modified node/src/storage/git.rs
@@ -41,6 +41,14 @@ impl fmt::Debug for Storage {
}

impl ReadStorage for Storage {
+
    fn url(&self) -> Url {
+
        Url {
+
            scheme: git_url::Scheme::File,
+
            host: Some(self.path.to_string_lossy().to_string()),
+
            ..Url::default()
+
        }
+
    }
+

    fn get(&self, _id: &ProjId) -> Result<Option<Remotes<Unverified>>, Error> {
        todo!()
    }
modified node/src/test/peer.rs
@@ -63,7 +63,10 @@ where
    pub fn new(name: &'static str, ip: impl Into<net::IpAddr>, storage: S) -> Self {
        Self::config(
            name,
-
            Config::default(),
+
            Config {
+
                git_url: storage.url(),
+
                ..Config::default()
+
            },
            ip,
            vec![],
            storage,
@@ -143,20 +146,22 @@ where
            .expect("`get-inventory` is sent");
    }

-
    pub fn connect_to(&mut self, remote: &net::SocketAddr) {
+
    pub fn connect_to(&mut self, peer: &Self) {
+
        let remote = simulator::Peer::<Protocol<S>>::addr(peer);
+

        self.initialize();
-
        self.protocol.attempted(remote);
+
        self.protocol.attempted(&remote);
        self.protocol
-
            .connected(*remote, &self.local_addr, Link::Outbound);
+
            .connected(remote, &self.local_addr, Link::Outbound);

-
        let mut msgs = self.messages(remote);
+
        let mut msgs = self.messages(&remote);
        msgs.find(|m| matches!(m, Message::Hello { .. }))
            .expect("`hello` is sent");
        msgs.find(|m| matches!(m, Message::GetInventory { .. }))
            .expect("`get-inventory` is sent");

-
        let git = self.config().git_url.clone();
-
        self.receive(remote, Message::hello(self.local_time().as_secs(), git));
+
        let git = peer.config().git_url.clone();
+
        self.receive(&remote, Message::hello(self.local_time().as_secs(), git));
    }

    /// Drain outgoing messages sent from this peer to the remote address.
modified node/src/test/storage.rs
@@ -23,6 +23,14 @@ impl MockStorage {
}

impl ReadStorage for MockStorage {
+
    fn url(&self) -> Url {
+
        Url {
+
            scheme: git_url::Scheme::Radicle,
+
            host: Some("mock".to_string()),
+
            ..Url::default()
+
        }
+
    }
+

    fn get(&self, proj: &ProjId) -> Result<Option<Remotes<Unverified>>, Error> {
        if let Some((_, refs)) = self.inventory.iter().find(|(id, _)| id == proj) {
            return Ok(Some(refs.clone()));
@@ -45,7 +53,7 @@ impl WriteStorage for MockStorage {
    type Repository = MockRepository;

    fn repository(&self, _proj: &ProjId) -> Result<Self::Repository, Error> {
-
        todo!()
+
        Ok(MockRepository {})
    }
}

@@ -53,7 +61,7 @@ pub struct MockRepository {}

impl WriteRepository for MockRepository {
    fn fetch(&mut self, _url: &Url) -> Result<(), git2::Error> {
-
        todo!()
+
        Ok(())
    }

    fn namespace(
modified node/src/test/tests.rs
@@ -31,8 +31,8 @@ fn test_outbound_connection() {
    let bob = Peer::new("bob", [9, 9, 9, 9], MockStorage::empty());
    let eve = Peer::new("eve", [7, 7, 7, 7], MockStorage::empty());

-
    alice.connect_to(&bob.addr());
-
    alice.connect_to(&eve.addr());
+
    alice.connect_to(&bob);
+
    alice.connect_to(&eve);

    let peers = alice
        .protocol
@@ -120,20 +120,42 @@ fn test_wrong_peer_magic() {
}

#[test]
-
fn test_inventory_fetch() {
-
    let mut alice = Peer::new("alice", [7, 7, 7, 7], MockStorage::empty());
-
    let bob = Peer::new("bob", [8, 8, 8, 8], MockStorage::empty());
+
fn test_inventory_sync() {
+
    let tmp = tempfile::tempdir().unwrap();
+
    let mut alice = Peer::new(
+
        "alice",
+
        [7, 7, 7, 7],
+
        Storage::open(tmp.path().join("alice")).unwrap(),
+
    );
+
    let bob_storage = fixtures::storage(tmp.path().join("bob"));
+
    let bob = Peer::new("bob", [8, 8, 8, 8], bob_storage);
    let now = LocalTime::now().as_secs();
+
    let projs = bob.storage().inventory().unwrap();

-
    alice.connect_to(&bob.addr());
+
    alice.connect_to(&bob);
    alice.receive(
        &bob.addr(),
        Message::Inventory {
            timestamp: now,
-
            inv: vec![],
+
            inv: projs.clone(),
            origin: None,
        },
    );
+

+
    for proj in &projs {
+
        let providers = alice.routing().get(proj).unwrap();
+
        assert!(providers.contains(&bob.ip));
+
    }
+

+
    let a = alice
+
        .storage()
+
        .inventory()
+
        .unwrap()
+
        .into_iter()
+
        .collect::<HashSet<_>>();
+
    let b = projs.into_iter().collect::<HashSet<_>>();
+

+
    assert_eq!(a, b);
}

#[test]
@@ -143,7 +165,7 @@ fn test_inventory_relay_bad_timestamp() {
    let two_hours = 3600 * 2;
    let timestamp = alice.local_time.as_secs() - two_hours;

-
    alice.connect_to(&bob.addr());
+
    alice.connect_to(&bob);
    alice.receive(
        &bob.addr(),
        Message::Inventory {
@@ -169,7 +191,7 @@ fn test_inventory_relay() {
    let now = LocalTime::now().as_secs();

    // Inventory from Bob relayed to Eve.
-
    alice.connect_to(&bob.addr());
+
    alice.connect_to(&bob);
    alice.connect_from(&eve.addr());
    alice.receive(
        &bob.addr(),