Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Re-think gossip messages and signing
Alexis Sellier committed 3 years ago
commit 651513c757f34898abd5d0dbedc1ddd4a1ac66a8
parent b265f1b4560d3682bcd9d17c1032ac6b18381f49
16 files changed +533 -382
modified node/src/client.rs
@@ -43,9 +43,10 @@ impl Default for Config {
    }
}

-
pub struct Client<R: Reactor> {
+
pub struct Client<R: Reactor, G: Signer> {
    reactor: R,
    storage: Storage,
+
    signer: G,

    handle: chan::Sender<protocol::Command>,
    commands: chan::Receiver<protocol::Command>,
@@ -54,20 +55,18 @@ pub struct Client<R: Reactor> {
    events: Events,
}

-
impl<R: Reactor> Client<R> {
-
    pub fn new<P: AsRef<Path>, S: Signer + 'static>(
-
        path: P,
-
        signer: S,
-
    ) -> Result<Self, nakamoto_net::error::Error> {
+
impl<R: Reactor, G: Signer> Client<R, G> {
+
    pub fn new<P: AsRef<Path>>(path: P, signer: G) -> Result<Self, nakamoto_net::error::Error> {
        let (handle, commands) = chan::unbounded::<protocol::Command>();
        let (shutdown, shutdown_recv) = chan::bounded(1);
        let (listening_send, listening) = chan::bounded(1);
        let reactor = R::new(shutdown_recv, listening_send)?;
-
        let storage = Storage::open(path, signer)?;
+
        let storage = Storage::open(path)?;
        let events = Events {};

        Ok(Self {
            storage,
+
            signer,
            reactor,
            handle,
            commands,
@@ -82,7 +81,7 @@ impl<R: Reactor> Client<R> {
        let rng = fastrand::Rng::new();
        let time = LocalTime::now();
        let storage = self.storage;
-
        let signer = storage.signer();
+
        let signer = self.signer;
        let addresses = HashMap::with_hasher(rng.clone().into());

        log::info!("Initializing client ({:?})..", network);
modified node/src/client/handle.rs
@@ -77,7 +77,7 @@ impl<W: Waker> traits::Handle for Handle<W> {

    /// Notify the client that a project has been updated.
    fn updated(&self, id: Id) -> Result<(), Error> {
-
        self.command(protocol::Command::AnnounceRefsUpdate(id))
+
        self.command(protocol::Command::AnnounceRefs(id))
    }

    /// Send a command to the command channel, and wake up the event loop.
modified node/src/main.rs
@@ -21,7 +21,7 @@ impl Signer for FailingSigner {

fn main() -> anyhow::Result<()> {
    let signer = FailingSigner {};
-
    let client = client::Client::<Reactor>::new(Path::new("."), signer)?;
+
    let client = client::Client::<Reactor, _>::new(Path::new("."), signer)?;
    let handle = client.handle();
    let config = client::Config::default();
    let socket = env::var("RAD_SOCKET").unwrap_or_else(|_| control::DEFAULT_SOCKET_NAME.to_owned());
modified node/src/protocol.rs
@@ -24,7 +24,7 @@ use crate::collections::{HashMap, HashSet};
use crate::crypto;
use crate::identity::{Id, Project};
use crate::protocol::config::ProjectTracking;
-
use crate::protocol::message::Message;
+
use crate::protocol::message::{Message, NodeAnnouncement, RefsAnnouncement};
use crate::protocol::peer::{Peer, PeerError, PeerState};
use crate::protocol::wire::Encode;
use crate::storage;
@@ -32,6 +32,8 @@ use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, Writ

pub use crate::protocol::config::{Config, Network};

+
use self::message::{InventoryAnnouncement, NodeFeatures};
+

pub const DEFAULT_PORT: u16 = 8776;
pub const PROTOCOL_VERSION: u32 = 1;
pub const TARGET_OUTBOUND_PEERS: usize = 8;
@@ -102,7 +104,7 @@ pub enum FetchResult {
/// Commands sent to the protocol by the operator.
#[derive(Debug)]
pub enum Command {
-
    AnnounceRefsUpdate(Id),
+
    AnnounceRefs(Id),
    Connect(net::SocketAddr),
    Fetch(Id, chan::Sender<FetchLookup>),
    Track(Id, chan::Sender<bool>),
@@ -232,6 +234,16 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
        &mut self.context.storage
    }

+
    /// Get a project from storage, using the local node's key.
+
    pub fn get(&self, proj: &Id) -> Result<Option<Project>, storage::Error> {
+
        self.storage.get(&self.node_id(), proj)
+
    }
+

+
    /// Get the local signer.
+
    pub fn signer(&self) -> &G {
+
        &self.context.signer
+
    }
+

    /// Get the local protocol time.
    pub fn local_time(&self) -> LocalTime {
        self.context.clock.local_time()
@@ -254,7 +266,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco

    pub fn lookup(&self, id: &Id) -> Lookup {
        Lookup {
-
            local: self.context.storage.get(id).unwrap(),
+
            local: self.context.storage.get(&self.node_id(), id).unwrap(),
            remote: self
                .context
                .routing
@@ -269,7 +281,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
-
        let inv = Message::inventory(&self.context)?;
+
        let inv = Message::inventory(self.context.inventory_announcement()?, &self.context.signer);

        for addr in self.peers.negotiated().map(|(_, p)| p.addr) {
            self.context.write(addr, inv.clone());
@@ -277,27 +289,6 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Protoco
        Ok(())
    }

-
    fn get_inventories(&mut self) -> Result<(), storage::Error> {
-
        let mut msgs = Vec::new();
-
        for id in self.tracked()? {
-
            for (_, peer) in self.seeds(&id) {
-
                if peer.is_negotiated() {
-
                    msgs.push((
-
                        peer.addr,
-
                        Message::GetInventory {
-
                            ids: vec![id.clone()],
-
                        },
-
                    ));
-
                }
-
            }
-
        }
-
        for (remote, msg) in msgs {
-
            self.write(remote, msg);
-
        }
-

-
        Ok(())
-
    }
-

    fn prune_routing_entries(&mut self) {
        // TODO
    }
@@ -357,7 +348,7 @@ where
        if now - self.last_sync >= SYNC_INTERVAL {
            debug!("Running 'sync' task...");

-
            self.get_inventories().unwrap();
+
            // TODO: What do we do here?
            self.context.io.push_back(Io::Wakeup(SYNC_INTERVAL));
            self.last_sync = now;
        }
@@ -451,15 +442,23 @@ where
            Command::Untrack(id, resp) => {
                resp.send(self.untrack(id)).ok();
            }
-
            Command::AnnounceRefsUpdate(id) => {
-
                let signer = *self.storage.public_key();
+
            Command::AnnounceRefs(id) => {
+
                let node = self.node_id();
                let repo = self.storage.repository(&id).unwrap();
-
                let remote = repo.remote(&signer).unwrap();
+
                let remote = repo.remote(&node).unwrap();
                let peers = self.peers.negotiated().map(|(_, p)| p.addr);
-
                let refs = remote.refs.unverified();
-

-
                self.context
-
                    .broadcast(Message::RefsUpdate { id, signer, refs }, peers);
+
                let refs = remote.refs.into();
+
                let message = RefsAnnouncement { id, refs };
+
                let signature = message.sign(&self.signer);
+

+
                self.context.broadcast(
+
                    Message::RefsAnnouncement {
+
                        node,
+
                        message,
+
                        signature,
+
                    },
+
                    peers,
+
                );
            }
        }
    }
@@ -489,21 +488,11 @@ where
        // For inbound connections, we wait for the remote to say "Hello" first.
        // TODO: How should we deal with multiple peers connecting from the same IP address?
        if link.is_outbound() {
-
            let git = self.config.git_url.clone();
+
            // TODO: Refactor this so that we don't create messages if the peer isn't found.
+
            let messages = self.handshake_messages();

            if let Some(peer) = self.peers.get_mut(&ip) {
-
                self.context.write_all(
-
                    peer.addr,
-
                    [
-
                        Message::hello(
-
                            self.context.id(),
-
                            self.context.timestamp(),
-
                            self.context.config.listen.clone(),
-
                            git,
-
                        ),
-
                        Message::get_inventory([]),
-
                    ],
-
                );
+
                self.context.write_all(peer.addr, messages);
                peer.connected();
            }
        } else {
@@ -695,7 +684,7 @@ where
    T: storage::ReadStorage,
    G: crypto::Signer,
{
-
    pub(crate) fn id(&self) -> NodeId {
+
    pub(crate) fn node_id(&self) -> NodeId {
        *self.signer.public_key()
    }
}
@@ -725,6 +714,51 @@ where
        }
    }

+
    fn node_announcement(&self) -> NodeAnnouncement {
+
        let timestamp = self.timestamp();
+
        let features = NodeFeatures::default();
+
        let alias = self.alias();
+
        let addresses = vec![]; // TODO
+

+
        NodeAnnouncement {
+
            features,
+
            timestamp,
+
            alias,
+
            addresses,
+
        }
+
    }
+

+
    fn inventory_announcement(&self) -> Result<InventoryAnnouncement, storage::Error> {
+
        let timestamp = self.timestamp();
+
        let inventory = self.storage.inventory()?;
+

+
        Ok(InventoryAnnouncement {
+
            inventory,
+
            timestamp,
+
        })
+
    }
+

+
    fn handshake_messages(&self) -> [Message; 3] {
+
        let git = self.config.git_url.clone();
+
        [
+
            Message::hello(
+
                self.node_id(),
+
                self.timestamp(),
+
                self.config.listen.clone(),
+
                git,
+
            ),
+
            Message::node(self.node_announcement(), &self.signer),
+
            Message::inventory(self.inventory_announcement().unwrap(), &self.signer),
+
        ]
+
    }
+

+
    fn alias(&self) -> [u8; 32] {
+
        let mut alias = [0u8; 32];
+

+
        alias[..9].copy_from_slice("anonymous".as_bytes());
+
        alias
+
    }
+

    /// Process a peer inventory announcement by updating our routing table.
    fn process_inventory(&mut self, inventory: &Inventory, from: NodeId, remote: &Url) {
        for proj_id in inventory {
modified node/src/protocol/message.rs
@@ -1,4 +1,4 @@
-
use std::{io, net};
+
use std::{fmt, io, net};

use byteorder::{NetworkEndian, ReadBytesExt};

@@ -6,9 +6,8 @@ use crate::crypto;
use crate::git;
use crate::identity::Id;
use crate::protocol::wire;
-
use crate::protocol::{Context, NodeId, Timestamp, PROTOCOL_VERSION};
-
use crate::storage;
-
use crate::storage::refs::SignedRefs;
+
use crate::protocol::{NodeId, Timestamp, PROTOCOL_VERSION};
+
use crate::storage::refs::Refs;

/// Message envelope. All messages sent over the network are wrapped in this type.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -31,10 +30,9 @@ pub struct Hostname(String);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageType {
    Hello = 0,
-
    Node = 2,
-
    GetInventory = 4,
-
    Inventory = 6,
-
    RefsUpdate = 8,
+
    NodeAnnouncement = 2,
+
    InventoryAnnouncement = 4,
+
    RefsAnnouncement = 6,
}

impl From<MessageType> for u16 {
@@ -49,10 +47,9 @@ impl TryFrom<u16> for MessageType {
    fn try_from(other: u16) -> Result<Self, Self::Error> {
        match other {
            0 => Ok(MessageType::Hello),
-
            2 => Ok(MessageType::Node),
-
            4 => Ok(MessageType::GetInventory),
-
            6 => Ok(MessageType::Inventory),
-
            8 => Ok(MessageType::RefsUpdate),
+
            2 => Ok(MessageType::NodeAnnouncement),
+
            4 => Ok(MessageType::InventoryAnnouncement),
+
            6 => Ok(MessageType::RefsAnnouncement),
            _ => Err(other),
        }
    }
@@ -197,8 +194,6 @@ impl wire::Decode for Address {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeAnnouncement {
-
    /// Node identifier.
-
    pub id: NodeId,
    /// Advertized features.
    pub features: NodeFeatures,
    /// Monotonic timestamp.
@@ -211,9 +206,9 @@ pub struct NodeAnnouncement {

impl NodeAnnouncement {
    /// Verify a signature on this message.
-
    pub fn verify(&self, signature: &crypto::Signature) -> bool {
+
    pub fn verify(&self, signer: &NodeId, signature: &crypto::Signature) -> bool {
        let msg = wire::serialize(self);
-
        self.id.verify(signature, &msg).is_ok()
+
        signer.verify(signature, &msg).is_ok()
    }
}

@@ -221,7 +216,6 @@ impl wire::Encode for NodeAnnouncement {
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
        let mut n = 0;

-
        n += self.id.encode(writer)?;
        n += self.features.encode(writer)?;
        n += self.timestamp.encode(writer)?;
        n += self.alias.encode(writer)?;
@@ -233,14 +227,12 @@ impl wire::Encode for NodeAnnouncement {

impl wire::Decode for NodeAnnouncement {
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
-
        let id = NodeId::decode(reader)?;
        let features = NodeFeatures::decode(reader)?;
        let timestamp = Timestamp::decode(reader)?;
        let alias = wire::Decode::decode(reader)?;
        let addresses = Vec::<Address>::decode(reader)?;

        Ok(Self {
-
            id,
            features,
            timestamp,
            alias,
@@ -249,9 +241,88 @@ impl wire::Decode for NodeAnnouncement {
    }
}

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct RefsAnnouncement {
+
    /// Repository identifier.
+
    pub id: Id,
+
    /// Updated refs.
+
    pub refs: Refs,
+
}
+

+
impl RefsAnnouncement {
+
    /// Verify a signature on this message.
+
    pub fn verify(&self, signer: &NodeId, signature: &crypto::Signature) -> bool {
+
        let msg = wire::serialize(self);
+
        signer.verify(signature, &msg).is_ok()
+
    }
+

+
    /// Sign this announcement.
+
    pub fn sign<S: crypto::Signer>(&self, signer: S) -> crypto::Signature {
+
        let msg = wire::serialize(self);
+
        signer.sign(&msg)
+
    }
+
}
+

+
impl wire::Encode for RefsAnnouncement {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.id.encode(writer)?;
+
        n += self.refs.encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for RefsAnnouncement {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let id = Id::decode(reader)?;
+
        let refs = Refs::decode(reader)?;
+

+
        Ok(Self { id, refs })
+
    }
+
}
+

+
#[derive(Debug, Clone, PartialEq, Eq)]
+
pub struct InventoryAnnouncement {
+
    pub inventory: Vec<Id>,
+
    pub timestamp: Timestamp,
+
}
+

+
impl InventoryAnnouncement {
+
    /// Verify a signature on this message.
+
    pub fn verify(&self, signer: NodeId, signature: &crypto::Signature) -> bool {
+
        let msg = wire::serialize(self);
+
        signer.verify(signature, &msg).is_ok()
+
    }
+
}
+

+
impl wire::Encode for InventoryAnnouncement {
+
    fn encode<W: io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, io::Error> {
+
        let mut n = 0;
+

+
        n += self.inventory.as_slice().encode(writer)?;
+
        n += self.timestamp.encode(writer)?;
+

+
        Ok(n)
+
    }
+
}
+

+
impl wire::Decode for InventoryAnnouncement {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, wire::Error> {
+
        let inventory = Vec::<Id>::decode(reader)?;
+
        let timestamp = Timestamp::decode(reader)?;
+

+
        Ok(Self {
+
            inventory,
+
            timestamp,
+
        })
+
    }
+
}
+

/// Message payload.
/// These are the messages peers send to each other.
-
#[derive(Debug, Clone, PartialEq, Eq)]
+
#[derive(Clone, PartialEq, Eq)]
pub enum Message {
    /// Say hello to a peer. This is the first message sent to a peer after connection.
    Hello {
@@ -262,30 +333,36 @@ pub enum Message {
        addrs: Vec<Address>,
        git: git::Url,
    },
-
    Node {
+

+
    /// Node announcing its inventory to the network.
+
    /// This should be the whole inventory every time.
+
    InventoryAnnouncement {
+
        /// Node identifier.
+
        node: NodeId,
+
        /// Unsigned node inventory.
+
        message: InventoryAnnouncement,
+
        /// Signature over the announcement.
+
        signature: crypto::Signature,
+
    },
+

+
    /// Node announcing itself to the network.
+
    NodeAnnouncement {
+
        /// Node identifier.
+
        node: NodeId,
        /// Unsigned node announcement.
-
        announcement: NodeAnnouncement,
+
        message: NodeAnnouncement,
        /// Signature over the announcement, by the node being announced.
        signature: crypto::Signature,
    },
-
    /// Get a peer's inventory.
-
    GetInventory { ids: Vec<Id> },
-
    /// Send our inventory to a peer. Sent in response to [`Message::GetInventory`].
-
    /// Nb. This should be the whole inventory, not a partial update.
-
    Inventory {
+

+
    /// Node announcing project refs being created or updated.
+
    RefsAnnouncement {
+
        /// Node identifier.
        node: NodeId,
-
        inv: Vec<Id>,
-
        timestamp: Timestamp,
-
    },
-
    /// Project refs were updated. Includes the signature of the user who updated
-
    /// their view of the project.
-
    RefsUpdate {
-
        /// Project under which the refs were updated.
-
        id: Id,
-
        /// Signing key.
-
        signer: crypto::PublicKey,
-
        /// Updated refs.
-
        refs: SignedRefs<crypto::Unverified>,
+
        /// Unsigned refs announcement.
+
        message: RefsAnnouncement,
+
        /// Signature over the announcement, by the node that updated the refs.
+
        signature: crypto::Signature,
    },
}

@@ -300,47 +377,71 @@ impl Message {
        }
    }

-
    pub fn node<S: crypto::Signer>(announcement: NodeAnnouncement, signer: S) -> Self {
-
        let msg = wire::serialize(&announcement);
+
    pub fn node<S: crypto::Signer>(message: NodeAnnouncement, signer: S) -> Self {
+
        let msg = wire::serialize(&message);
        let signature = signer.sign(&msg);
+
        let node = *signer.public_key();

-
        Self::Node {
+
        Self::NodeAnnouncement {
+
            node,
            signature,
-
            announcement,
+
            message,
        }
    }

-
    pub fn inventory<S, T, G>(ctx: &Context<S, T, G>) -> Result<Self, storage::Error>
-
    where
-
        T: storage::ReadStorage,
-
        G: crypto::Signer,
-
    {
-
        let timestamp = ctx.timestamp();
-
        let inv = ctx.storage.inventory()?;
-

-
        Ok(Self::Inventory {
-
            node: ctx.id(),
-
            inv,
-
            timestamp,
-
        })
-
    }
+
    pub fn inventory<S: crypto::Signer>(message: InventoryAnnouncement, signer: S) -> Self {
+
        let msg = wire::serialize(&message);
+
        let signature = signer.sign(&msg);
+
        let node = *signer.public_key();

-
    pub fn get_inventory(ids: impl Into<Vec<Id>>) -> Self {
-
        Self::GetInventory { ids: ids.into() }
+
        Self::InventoryAnnouncement {
+
            node,
+
            signature,
+
            message,
+
        }
    }

    pub fn type_id(&self) -> u16 {
        match self {
            Self::Hello { .. } => MessageType::Hello,
-
            Self::Node { .. } => MessageType::Node,
-
            Self::GetInventory { .. } => MessageType::GetInventory,
-
            Self::Inventory { .. } => MessageType::Inventory,
-
            Self::RefsUpdate { .. } => MessageType::RefsUpdate,
+
            Self::NodeAnnouncement { .. } => MessageType::NodeAnnouncement,
+
            Self::InventoryAnnouncement { .. } => MessageType::InventoryAnnouncement,
+
            Self::RefsAnnouncement { .. } => MessageType::RefsAnnouncement,
        }
        .into()
    }
}

+
impl fmt::Debug for Message {
+
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+
        match self {
+
            Self::Hello { id, .. } => write!(f, "Hello({})", id),
+
            Self::NodeAnnouncement { node, .. } => write!(f, "NodeAnnouncement({})", node),
+
            Self::InventoryAnnouncement { node, message, .. } => {
+
                write!(
+
                    f,
+
                    "InventoryAnnouncement({}, [{}], {})",
+
                    node,
+
                    message
+
                        .inventory
+
                        .iter()
+
                        .map(|i| i.to_string())
+
                        .collect::<Vec<String>>()
+
                        .join(", "),
+
                    message.timestamp
+
                )
+
            }
+
            Self::RefsAnnouncement { node, message, .. } => {
+
                write!(
+
                    f,
+
                    "RefsAnnouncement({}, {}, {:?})",
+
                    node, message.id, message.refs
+
                )
+
            }
+
        }
+
    }
+
}
+

impl wire::Encode for Message {
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
        let mut n = self.type_id().encode(writer)?;
@@ -359,28 +460,31 @@ impl wire::Encode for Message {
                n += addrs.as_slice().encode(writer)?;
                n += git.encode(writer)?;
            }
-
            Self::RefsUpdate { id, signer, refs } => {
-
                n += id.encode(writer)?;
-
                n += signer.encode(writer)?;
-
                n += refs.encode(writer)?;
-
            }
-
            Self::GetInventory { ids } => {
-
                n += ids.as_slice().encode(writer)?;
+
            Self::RefsAnnouncement {
+
                node,
+
                message,
+
                signature,
+
            } => {
+
                n += node.encode(writer)?;
+
                n += message.encode(writer)?;
+
                n += signature.encode(writer)?;
            }
-
            Self::Inventory {
+
            Self::InventoryAnnouncement {
                node,
-
                inv,
-
                timestamp,
+
                message,
+
                signature,
            } => {
                n += node.encode(writer)?;
-
                n += inv.as_slice().encode(writer)?;
-
                n += timestamp.encode(writer)?;
+
                n += message.encode(writer)?;
+
                n += signature.encode(writer)?;
            }
-
            Self::Node {
-
                announcement,
+
            Self::NodeAnnouncement {
+
                node,
+
                message,
                signature,
            } => {
-
                n += announcement.encode(writer)?;
+
                n += node.encode(writer)?;
+
                n += message.encode(writer)?;
                n += signature.encode(writer)?;
            }
        }
@@ -408,37 +512,38 @@ impl wire::Decode for Message {
                    git,
                })
            }
-
            Ok(MessageType::Node) => {
-
                let announcement = NodeAnnouncement::decode(reader)?;
+
            Ok(MessageType::NodeAnnouncement) => {
+
                let node = NodeId::decode(reader)?;
+
                let message = NodeAnnouncement::decode(reader)?;
                let signature = crypto::Signature::decode(reader)?;

-
                Ok(Self::Node {
-
                    announcement,
+
                Ok(Self::NodeAnnouncement {
+
                    node,
+
                    message,
                    signature,
                })
            }
-
            Ok(MessageType::GetInventory) => {
-
                let ids = Vec::<Id>::decode(reader)?;
-

-
                Ok(Self::GetInventory { ids })
-
            }
-
            Ok(MessageType::Inventory) => {
+
            Ok(MessageType::InventoryAnnouncement) => {
                let node = NodeId::decode(reader)?;
-
                let inv = Vec::<Id>::decode(reader)?;
-
                let timestamp = Timestamp::decode(reader)?;
+
                let message = InventoryAnnouncement::decode(reader)?;
+
                let signature = crypto::Signature::decode(reader)?;

-
                Ok(Self::Inventory {
+
                Ok(Self::InventoryAnnouncement {
                    node,
-
                    inv,
-
                    timestamp,
+
                    message,
+
                    signature,
                })
            }
-
            Ok(MessageType::RefsUpdate) => {
-
                let id = Id::decode(reader)?;
-
                let signer = crypto::PublicKey::decode(reader)?;
-
                let refs = SignedRefs::decode(reader)?;
+
            Ok(MessageType::RefsAnnouncement) => {
+
                let node = NodeId::decode(reader)?;
+
                let message = RefsAnnouncement::decode(reader)?;
+
                let signature = crypto::Signature::decode(reader)?;

-
                Ok(Self::RefsUpdate { id, signer, refs })
+
                Ok(Self::RefsAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                })
            }
            Err(other) => Err(wire::Error::UnknownMessageType(other)),
        }
@@ -450,8 +555,10 @@ mod tests {
    use super::*;
    use quickcheck_macros::quickcheck;

+
    use crate::crypto::Signer;
    use crate::decoder::Decoder;
    use crate::protocol::wire::{self, Encode};
+
    use crate::test::crypto::MockSigner;

    #[quickcheck]
    fn prop_message_encode_decode(message: Message) {
@@ -494,4 +601,13 @@ mod tests {
            addr
        );
    }
+

+
    #[quickcheck]
+
    fn prop_refs_announcement_signing(id: Id, refs: Refs) {
+
        let signer = MockSigner::new(&mut fastrand::Rng::new());
+
        let message = RefsAnnouncement { id, refs };
+
        let signature = message.sign(&signer);
+

+
        assert!(message.verify(signer.public_key(), &signature));
+
    }
}
modified node/src/protocol/peer.rs
@@ -131,14 +131,7 @@ impl Peer {
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
                // extra "acknowledgment" message sent when the `Hello` is well received.
                if self.link.is_inbound() {
-
                    let git = ctx.config.git_url.clone();
-
                    ctx.write_all(
-
                        self.addr,
-
                        [
-
                            Message::hello(ctx.id(), now, ctx.config.listen.clone(), git),
-
                            Message::get_inventory([]),
-
                        ],
-
                    );
+
                    ctx.write_all(self.addr, ctx.handshake_messages());
                }
                // Nb. we don't set the peer timestamp here, since it is going to be
                // set after the first message is received only. Setting it here would
@@ -157,64 +150,66 @@ impl Peer {
                );
                return Err(PeerError::Misbehavior);
            }
-
            (PeerState::Negotiated { .. }, Message::GetInventory { .. }) => {
-
                // TODO: Handle partial inventory requests.
-
                let inventory = Message::inventory(ctx).unwrap();
-
                ctx.write(self.addr, inventory);
-
            }
            (
                PeerState::Negotiated { git, .. },
-
                Message::Inventory {
+
                Message::InventoryAnnouncement {
                    node,
-
                    timestamp,
-
                    inv,
+
                    message,
+
                    signature,
                },
            ) => {
                let now = ctx.clock.local_time();
                let last = self.timestamp;

                // Don't allow messages from too far in the past or future.
-
                if timestamp.abs_diff(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(PeerError::InvalidTimestamp(timestamp));
+
                if message.timestamp.abs_diff(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
+
                    return Err(PeerError::InvalidTimestamp(message.timestamp));
                }
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
-
                if timestamp > last {
-
                    self.timestamp = timestamp;
+
                if message.timestamp > last {
+
                    self.timestamp = message.timestamp;
                } else {
                    return Ok(None);
                }
-
                ctx.process_inventory(&inv, node, git);
+
                ctx.process_inventory(&message.inventory, node, git);

                if ctx.config.relay {
-
                    return Ok(Some(Message::Inventory {
+
                    return Ok(Some(Message::InventoryAnnouncement {
                        node,
-
                        inv,
-
                        timestamp,
+
                        message,
+
                        signature,
                    }));
                }
            }
            // Process a peer inventory update announcement by (maybe) fetching.
-
            (PeerState::Negotiated { git, .. }, Message::RefsUpdate { id, signer, refs }) => {
-
                if let Ok(refs) = refs.verified(&signer) {
+
            (
+
                PeerState::Negotiated { git, .. },
+
                Message::RefsAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                },
+
            ) => {
+
                if message.verify(&node, &signature) {
                    // TODO: Buffer/throttle fetches.
                    // TODO: Check that we're tracking this user as well.
-
                    if ctx.config.is_tracking(&id) {
+
                    if ctx.config.is_tracking(&message.id) {
                        // TODO: Check refs to see if we should try to fetch or not.
-
                        let updated_refs = ctx.fetch(&id, git);
+
                        let updated_refs = ctx.fetch(&message.id, git);
                        let is_updated = !updated_refs.is_empty();

                        ctx.io.push_back(Io::Event(Event::RefsFetched {
                            from: git.clone(),
-
                            project: id.clone(),
+
                            project: message.id.clone(),
                            updated: updated_refs,
                        }));

                        if is_updated {
-
                            return Ok(Some(Message::RefsUpdate {
-
                                id,
-
                                signer,
-
                                refs: refs.unverified(),
+
                            return Ok(Some(Message::RefsAnnouncement {
+
                                node,
+
                                message,
+
                                signature,
                            }));
                        }
                    }
@@ -224,15 +219,16 @@ impl Peer {
            }
            (
                PeerState::Negotiated { .. },
-
                Message::Node {
-
                    announcement,
+
                Message::NodeAnnouncement {
+
                    node,
+
                    message,
                    signature,
                },
            ) => {
-
                if !announcement.verify(&signature) {
+
                if !message.verify(&node, &signature) {
                    return Err(PeerError::Misbehavior);
                }
-
                todo!();
+
                log::warn!("Node announcement handling is not implemented");
            }
            (PeerState::Negotiated { .. }, Message::Hello { .. }) => {
                debug!(
modified node/src/rad.rs
@@ -4,12 +4,12 @@ use std::path::Path;
use nonempty::NonEmpty;
use thiserror::Error;

-
use crate::crypto::Verified;
+
use crate::crypto::{Signer, Verified};
use crate::git;
use crate::identity::Id;
use crate::storage::git::RADICLE_ID_REF;
use crate::storage::refs::SignedRefs;
-
use crate::storage::{BranchName, ReadRepository as _, WriteRepository as _};
+
use crate::storage::{BranchName, ReadRepository as _, RemoteId, WriteRepository as _};
use crate::{identity, storage};

pub const REMOTE_NAME: &str = "rad";
@@ -33,14 +33,15 @@ pub enum InitError {
}

/// Initialize a new radicle project from a git repository.
-
pub fn init<'r, S: storage::WriteStorage<'r>>(
+
pub fn init<'r, G: Signer, S: storage::WriteStorage<'r>>(
    repo: &git2::Repository,
    name: &str,
    description: &str,
    default_branch: BranchName,
+
    signer: G,
    storage: S,
) -> Result<(Id, SignedRefs<Verified>), InitError> {
-
    let pk = storage.public_key();
+
    let pk = signer.public_key();
    let delegate = identity::Delegate {
        // TODO: Use actual user name.
        name: String::from("anonymous"),
@@ -102,7 +103,7 @@ pub fn init<'r, S: storage::WriteStorage<'r>>(
        )],
        None,
    )?;
-
    let signed = storage.sign_refs(&project)?;
+
    let signed = storage.sign_refs(&project, signer)?;

    Ok((id, signed))
}
@@ -121,24 +122,24 @@ pub enum CheckoutError {
/// This effectively does a `git-clone` from storage.
pub fn checkout<P: AsRef<Path>, S: storage::ReadStorage>(
    proj: &Id,
+
    remote: &RemoteId,
    path: P,
    storage: S,
) -> Result<git2::Repository, CheckoutError> {
    // TODO: Decide on whether we can use `clone_local`
    // TODO: Look into sharing object databases.
    let project = storage
-
        .get(proj)?
+
        .get(remote, proj)?
        .ok_or_else(|| CheckoutError::NotFound(proj.clone()))?;

    let mut opts = git2::RepositoryInitOptions::new();
    opts.no_reinit(true).description(&project.doc.description);

    let repo = git2::Repository::init_opts(path, &opts)?;
-
    let remote_id = storage.public_key();
    let default_branch = project.doc.default_branch.as_str();

    // Configure and fetch all refs from remote.
-
    git::configure_remote(&repo, REMOTE_NAME, remote_id, &project.path)?.fetch::<&str>(
+
    git::configure_remote(&repo, REMOTE_NAME, remote, &project.path)?.fetch::<&str>(
        &[],
        None,
        None,
@@ -155,7 +156,7 @@ pub fn checkout<P: AsRef<Path>, S: storage::ReadStorage>(
            &repo,
            REMOTE_NAME,
            default_branch,
-
            &format!("refs/remotes/{remote_id}/heads/{default_branch}"),
+
            &format!("refs/remotes/{remote}/heads/{default_branch}"),
        )?;
    }

@@ -174,22 +175,24 @@ mod tests {
    fn test_init() {
        let tempdir = tempfile::tempdir().unwrap();
        let signer = crypto::MockSigner::default();
-
        let mut storage = Storage::open(tempdir.path().join("storage"), signer).unwrap();
+
        let public_key = *signer.public_key();
+
        let mut storage = Storage::open(tempdir.path().join("storage")).unwrap();
        let repo = fixtures::repository(tempdir.path().join("working"));

-
        let (id, refs) = init(
+
        let (proj, refs) = init(
            &repo,
            "acme",
            "Acme's repo",
            BranchName::from("master"),
+
            &signer,
            &mut storage,
        )
        .unwrap();

-
        let project = storage.get(&id).unwrap().unwrap();
+
        let project = storage.get(&public_key, &proj).unwrap().unwrap();

-
        assert_eq!(project.remotes[storage.public_key()].refs, refs);
-
        assert_eq!(project.id, id);
+
        assert_eq!(project.remotes[&public_key].refs, refs);
+
        assert_eq!(project.id, proj);
        assert_eq!(project.doc.name, "acme");
        assert_eq!(project.doc.description, "Acme's repo");
        assert_eq!(project.doc.default_branch, BranchName::from("master"));
@@ -197,7 +200,7 @@ mod tests {
            project.doc.delegates.first(),
            &Delegate {
                name: String::from("anonymous"),
-
                id: Did::from(*storage.public_key()),
+
                id: Did::from(public_key),
            }
        );
    }
@@ -206,7 +209,8 @@ mod tests {
    fn test_checkout() {
        let tempdir = tempfile::tempdir().unwrap();
        let signer = crypto::MockSigner::default();
-
        let mut storage = Storage::open(tempdir.path().join("storage"), signer).unwrap();
+
        let remote_id = signer.public_key();
+
        let mut storage = Storage::open(tempdir.path().join("storage")).unwrap();
        let original = fixtures::repository(tempdir.path().join("original"));

        let (id, _) = init(
@@ -214,11 +218,12 @@ mod tests {
            "acme",
            "Acme's repo",
            BranchName::from("master"),
+
            &signer,
            &mut storage,
        )
        .unwrap();

-
        let copy = checkout(&id, tempdir.path().join("copy"), &mut storage).unwrap();
+
        let copy = checkout(&id, remote_id, tempdir.path().join("copy"), &mut storage).unwrap();

        assert_eq!(
            copy.head().unwrap().target(),
modified node/src/storage.rs
@@ -3,7 +3,7 @@ pub mod refs;

use std::collections::hash_map;
use std::marker::PhantomData;
-
use std::ops::{Deref, DerefMut};
+
use std::ops::Deref;
use std::path::Path;
use std::{fmt, io};

@@ -13,7 +13,8 @@ use thiserror::Error;
pub use radicle_git_ext::Oid;

use crate::collections::HashMap;
-
use crate::crypto::{self, PublicKey, Unverified, Verified};
+
use crate::crypto;
+
use crate::crypto::{PublicKey, Signer, Unverified, Verified};
use crate::git::Url;
use crate::git::{RefError, RefStr, RefString};
use crate::identity;
@@ -202,9 +203,8 @@ impl Remote<Verified> {
}

pub trait ReadStorage {
-
    fn public_key(&self) -> &PublicKey;
    fn url(&self) -> Url;
-
    fn get(&self, proj: &Id) -> Result<Option<Project>, Error>;
+
    fn get(&self, remote: &RemoteId, proj: &Id) -> Result<Option<Project>, Error>;
    fn inventory(&self) -> Result<Inventory, Error>;
}

@@ -212,7 +212,11 @@ pub trait WriteStorage<'r>: ReadStorage {
    type Repository: WriteRepository<'r>;

    fn repository(&self, proj: &Id) -> Result<Self::Repository, Error>;
-
    fn sign_refs(&self, repository: &Self::Repository) -> Result<SignedRefs<Verified>, Error>;
+
    fn sign_refs<G: Signer>(
+
        &self,
+
        repository: &Self::Repository,
+
        signer: G,
+
    ) -> Result<SignedRefs<Verified>, Error>;
}

pub trait ReadRepository<'r> {
@@ -246,10 +250,6 @@ where
    T: Deref<Target = S>,
    S: ReadStorage + 'static,
{
-
    fn public_key(&self) -> &PublicKey {
-
        self.deref().public_key()
-
    }
-

    fn url(&self) -> Url {
        self.deref().url()
    }
@@ -258,14 +258,14 @@ where
        self.deref().inventory()
    }

-
    fn get(&self, proj: &Id) -> Result<Option<Project>, Error> {
-
        self.deref().get(proj)
+
    fn get(&self, remote: &RemoteId, proj: &Id) -> Result<Option<Project>, Error> {
+
        self.deref().get(remote, proj)
    }
}

impl<'r, T, S> WriteStorage<'r> for T
where
-
    T: DerefMut<Target = S>,
+
    T: Deref<Target = S>,
    S: WriteStorage<'r> + 'static,
{
    type Repository = S::Repository;
@@ -274,8 +274,12 @@ where
        self.deref().repository(proj)
    }

-
    fn sign_refs(&self, repository: &S::Repository) -> Result<SignedRefs<Verified>, Error> {
-
        self.deref().sign_refs(repository)
+
    fn sign_refs<G: Signer>(
+
        &self,
+
        repository: &S::Repository,
+
        signer: G,
+
    ) -> Result<SignedRefs<Verified>, Error> {
+
        self.deref().sign_refs(repository, signer)
    }
}

modified node/src/storage/git.rs
@@ -1,6 +1,5 @@
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
-
use std::sync::Arc;
use std::{fmt, fs, io};

use git_ref_format::refspec;
@@ -11,7 +10,7 @@ pub use radicle_git_ext::Oid;
use crate::crypto::{Signer, Verified};
use crate::git;
use crate::identity::{self, IDENTITY_PATH};
-
use crate::identity::{Id, Project, PublicKey};
+
use crate::identity::{Id, Project};
use crate::storage::refs;
use crate::storage::refs::{Refs, SignedRefs};
use crate::storage::{
@@ -28,7 +27,6 @@ pub static SIGNATURES_GLOB: Lazy<refspec::PatternString> =

pub struct Storage {
    path: PathBuf,
-
    signer: Arc<dyn Signer>,
}

impl fmt::Debug for Storage {
@@ -38,10 +36,6 @@ impl fmt::Debug for Storage {
}

impl ReadStorage for Storage {
-
    fn public_key(&self) -> &PublicKey {
-
        self.signer.public_key()
-
    }
-

    fn url(&self) -> git::Url {
        git::Url {
            scheme: git_url::Scheme::File,
@@ -51,13 +45,12 @@ impl ReadStorage for Storage {
        }
    }

-
    fn get(&self, id: &Id) -> Result<Option<Project>, Error> {
+
    fn get(&self, remote: &RemoteId, proj: &Id) -> Result<Option<Project>, Error> {
        // TODO: Don't create a repo here if it doesn't exist?
        // Perhaps for checking we could have a `contains` method?
-
        let local = self.public_key();
-
        let repo = self.repository(id)?;
+
        let repo = self.repository(proj)?;

-
        if let Some(doc) = repo.identity(local)? {
+
        if let Some(doc) = repo.identity(remote)? {
            let remotes = repo.remotes()?.collect::<Result<_, _>>()?;
            let path = repo.path().to_path_buf();

@@ -66,7 +59,7 @@ impl ReadStorage for Storage {
            // an corrupted state.

            Ok(Some(Project {
-
                id: id.clone(),
+
                id: proj.clone(),
                doc,
                remotes,
                path,
@@ -88,10 +81,14 @@ impl<'r> WriteStorage<'r> for Storage {
        Repository::open(self.path.join(proj.to_string()))
    }

-
    fn sign_refs(&self, repository: &Repository) -> Result<SignedRefs<Verified>, Error> {
-
        let remote = self.signer.public_key();
+
    fn sign_refs<G: Signer>(
+
        &self,
+
        repository: &Repository,
+
        signer: G,
+
    ) -> Result<SignedRefs<Verified>, Error> {
+
        let remote = signer.public_key();
        let refs = repository.references(remote)?;
-
        let signed = refs.signed(self.signer.clone())?;
+
        let signed = refs.signed(&signer)?;

        signed.save(remote, repository)?;

@@ -100,10 +97,7 @@ impl<'r> WriteStorage<'r> for Storage {
}

impl Storage {
-
    pub fn open<P: AsRef<Path>, S: Signer + 'static>(
-
        path: P,
-
        signer: S,
-
    ) -> Result<Self, io::Error> {
+
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, io::Error> {
        let path = path.as_ref().to_path_buf();

        match fs::create_dir_all(&path) {
@@ -112,27 +106,13 @@ impl Storage {
            Ok(()) => {}
        }

-
        Ok(Self {
-
            path,
-
            signer: Arc::new(signer),
-
        })
+
        Ok(Self { path })
    }

    pub fn path(&self) -> &Path {
        self.path.as_path()
    }

-
    pub fn signer(&self) -> Arc<dyn Signer> {
-
        self.signer.clone()
-
    }
-

-
    pub fn with_signer(&self, signer: impl Signer + 'static) -> Self {
-
        Self {
-
            path: self.path.clone(),
-
            signer: Arc::new(signer),
-
        }
-
    }
-

    pub fn projects(&self) -> Result<Vec<Id>, Error> {
        let mut projects = Vec::new();

@@ -404,7 +384,7 @@ mod tests {
    fn test_fetch() {
        let tmp = tempfile::tempdir().unwrap();
        let alice = fixtures::storage(tmp.path().join("alice"));
-
        let bob = Storage::open(tmp.path().join("bob"), MockSigner::default()).unwrap();
+
        let bob = Storage::open(tmp.path().join("bob")).unwrap();
        let inventory = alice.inventory().unwrap();
        let proj = inventory.first().unwrap();
        let repo = alice.repository(proj).unwrap();
@@ -447,9 +427,9 @@ mod tests {
        let tmp = tempfile::tempdir().unwrap();
        let mut rng = fastrand::Rng::new();
        let signer = MockSigner::new(&mut rng);
-
        let storage = Storage::open(tmp.path(), signer).unwrap();
+
        let storage = Storage::open(tmp.path()).unwrap();
        let proj_id = arbitrary::gen::<Id>(1);
-
        let alice = *storage.public_key();
+
        let alice = *signer.public_key();
        let project = storage.repository(&proj_id).unwrap();
        let backend = &project.backend;
        let sig = git2::Signature::now(&alice.to_string(), "anonymous@radicle.xyz").unwrap();
@@ -465,7 +445,7 @@ mod tests {
            )
            .unwrap();

-
        let signed = storage.sign_refs(&project).unwrap();
+
        let signed = storage.sign_refs(&project, &signer).unwrap();
        let remote = project.remote(&alice).unwrap();
        let mut unsigned = project.references(&alice).unwrap();

modified node/src/storage/refs.rs
@@ -157,7 +157,7 @@ impl DerefMut for Refs {

/// Combination of [`Refs`] and a [`Signature`]. The signature is a cryptographic
/// signature over the refs. This allows us to easily verify if a set of refs
-
/// came from a particular user.
+
/// came from a particular key.
///
/// The type parameter keeps track of whether the signature was [`Verified`] or
/// [`Unverified`].
modified node/src/test/arbitrary.rs
@@ -13,7 +13,10 @@ use crate::crypto::{PublicKey, SecretKey};
use crate::git;
use crate::hash;
use crate::identity::{Delegate, Did, Doc, Id, Project};
-
use crate::protocol::message::{Address, Envelope, Message, MessageType, NodeAnnouncement};
+
use crate::protocol::message::{
+
    Address, Envelope, InventoryAnnouncement, Message, MessageType, NodeAnnouncement,
+
    RefsAnnouncement,
+
};
use crate::protocol::{NodeId, Timestamp};
use crate::storage;
use crate::storage::refs::{Refs, SignedRefs};
@@ -70,30 +73,31 @@ impl Arbitrary for Message {
    fn arbitrary(g: &mut quickcheck::Gen) -> Self {
        let type_id = g
            .choose(&[
-
                MessageType::GetInventory,
-
                MessageType::Inventory,
-
                MessageType::Node,
-
                MessageType::RefsUpdate,
+
                MessageType::InventoryAnnouncement,
+
                MessageType::NodeAnnouncement,
+
                MessageType::RefsAnnouncement,
            ])
            .unwrap();

        match type_id {
-
            MessageType::GetInventory => Self::GetInventory {
-
                ids: Vec::<Id>::arbitrary(g),
-
            },
-
            MessageType::Inventory => Self::Inventory {
+
            MessageType::InventoryAnnouncement => Self::InventoryAnnouncement {
                node: NodeId::arbitrary(g),
-
                inv: Vec::<Id>::arbitrary(g),
-
                timestamp: Timestamp::arbitrary(g),
+
                message: InventoryAnnouncement {
+
                    inventory: Vec::<Id>::arbitrary(g),
+
                    timestamp: Timestamp::arbitrary(g),
+
                },
+
                signature: crypto::Signature::from(ByteArray::<64>::arbitrary(g).into_inner()),
            },
-
            MessageType::RefsUpdate => Self::RefsUpdate {
-
                id: Id::arbitrary(g),
-
                signer: PublicKey::arbitrary(g),
-
                refs: SignedRefs::<Unverified>::arbitrary(g),
+
            MessageType::RefsAnnouncement => Self::RefsAnnouncement {
+
                node: NodeId::arbitrary(g),
+
                message: RefsAnnouncement {
+
                    id: Id::arbitrary(g),
+
                    refs: Refs::arbitrary(g),
+
                },
+
                signature: crypto::Signature::from(ByteArray::<64>::arbitrary(g).into_inner()),
            },
-
            MessageType::Node => {
-
                let announcement = NodeAnnouncement {
-
                    id: NodeId::arbitrary(g),
+
            MessageType::NodeAnnouncement => {
+
                let message = NodeAnnouncement {
                    features: ByteArray::<32>::arbitrary(g).into_inner(),
                    timestamp: Timestamp::arbitrary(g),
                    alias: ByteArray::<32>::arbitrary(g).into_inner(),
@@ -102,9 +106,10 @@ impl Arbitrary for Message {
                let bytes: ByteArray<64> = Arbitrary::arbitrary(g);
                let signature = crypto::Signature::from(bytes.into_inner());

-
                Self::Node {
+
                Self::NodeAnnouncement {
+
                    node: NodeId::arbitrary(g),
                    signature,
-
                    announcement,
+
                    message,
                }
            }
            _ => unreachable!(),
modified node/src/test/fixtures.rs
@@ -1,9 +1,10 @@
use std::path::Path;

+
use crate::crypto::Signer as _;
use crate::git;
use crate::identity::Id;
use crate::storage::git::Storage;
-
use crate::storage::{ReadStorage, WriteStorage};
+
use crate::storage::WriteStorage;
use crate::test::arbitrary;
use crate::test::crypto::MockSigner;

@@ -11,15 +12,12 @@ pub fn storage<P: AsRef<Path>>(path: P) -> Storage {
    let path = path.as_ref();
    let proj_ids = arbitrary::set::<Id>(3..=3);
    let signers = arbitrary::set::<MockSigner>(3..=3);
-
    let mut storages = signers
-
        .into_iter()
-
        .map(|s| Storage::open(path, s).unwrap())
-
        .collect::<Vec<_>>();
+
    let storage = Storage::open(path).unwrap();

    crate::test::logger::init(log::Level::Debug);

-
    for storage in &storages {
-
        let remote = storage.public_key();
+
    for signer in signers {
+
        let remote = signer.public_key();

        log::debug!("signer {}...", remote);

@@ -57,10 +55,10 @@ pub fn storage<P: AsRef<Path>>(path: P) -> Storage {
            )
            .unwrap();

-
            storage.sign_refs(&repo).unwrap();
+
            storage.sign_refs(&repo, &signer).unwrap();
        }
    }
-
    storages.pop().unwrap()
+
    storage
}

/// Creates a regular repository at the given path with a couple of commits.
modified node/src/test/logger.rs
@@ -14,11 +14,7 @@ impl Log for Logger {

        match record.target() {
            "test" => {
-
                println!(
-
                    "{} {}",
-
                    "test:".yellow(),
-
                    record.args().to_string().yellow()
-
                )
+
                println!("{} {}", "test:".cyan(), record.args().to_string().yellow())
            }
            "sim" => {
                println!("{}  {}", "sim:".bold(), record.args().to_string().bold())
@@ -26,7 +22,17 @@ impl Log for Logger {
            target => {
                if self.enabled(record.metadata()) {
                    let s = format!("{:<8} {}", format!("{}:", target), record.args());
-
                    println!("{}", s.dimmed());
+
                    match record.level() {
+
                        log::Level::Warn => {
+
                            println!("{}", s.yellow());
+
                        }
+
                        log::Level::Error => {
+
                            println!("{}", s.red());
+
                        }
+
                        _ => {
+
                            println!("{}", s.dimmed());
+
                        }
+
                    }
                }
            }
        }
modified node/src/test/peer.rs
@@ -124,8 +124,8 @@ where
        self.config().git_url.clone()
    }

-
    pub fn id(&self) -> NodeId {
-
        self.protocol.id()
+
    pub fn node_id(&self) -> NodeId {
+
        self.protocol.node_id()
    }

    pub fn receive(&mut self, peer: &net::SocketAddr, msg: Message) {
@@ -145,7 +145,7 @@ where
        self.receive(
            &remote,
            Message::hello(
-
                peer.id(),
+
                peer.node_id(),
                self.local_time().as_secs(),
                vec![Address::from(remote)],
                git,
@@ -155,8 +155,8 @@ where
        let mut msgs = self.messages(&remote);
        msgs.find(|m| matches!(m, Message::Hello { .. }))
            .expect("`hello` is sent");
-
        msgs.find(|m| matches!(m, Message::GetInventory { .. }))
-
            .expect("`get-inventory` is sent");
+
        msgs.find(|m| matches!(m, Message::InventoryAnnouncement { .. }))
+
            .expect("`inventory-announcement` is sent");
    }

    pub fn connect_to(&mut self, peer: &Self) {
@@ -170,14 +170,14 @@ where
        let mut msgs = self.messages(&remote);
        msgs.find(|m| matches!(m, Message::Hello { .. }))
            .expect("`hello` is sent");
-
        msgs.find(|m| matches!(m, Message::GetInventory { .. }))
-
            .expect("`get-inventory` is sent");
+
        msgs.find(|m| matches!(m, Message::InventoryAnnouncement { .. }))
+
            .expect("`inventory-announcement` is sent");

        let git = peer.config().git_url.clone();
        self.receive(
            &remote,
            Message::hello(
-
                peer.id(),
+
                peer.node_id(),
                self.local_time().as_secs(),
                peer.config().listen.clone(),
                git,
modified node/src/test/storage.rs
@@ -1,6 +1,6 @@
use git_url::Url;

-
use crate::crypto::{PublicKey, Verified};
+
use crate::crypto::{Signer, Verified};
use crate::git;
use crate::identity::{Id, Project};
use crate::storage::{refs, RefUpdate};
@@ -26,10 +26,6 @@ impl MockStorage {
}

impl ReadStorage for MockStorage {
-
    fn public_key(&self) -> &PublicKey {
-
        todo!()
-
    }
-

    fn url(&self) -> Url {
        Url {
            scheme: git_url::Scheme::Radicle,
@@ -38,7 +34,7 @@ impl ReadStorage for MockStorage {
        }
    }

-
    fn get(&self, proj: &Id) -> Result<Option<Project>, Error> {
+
    fn get(&self, _remote: &RemoteId, proj: &Id) -> Result<Option<Project>, Error> {
        if let Some(proj) = self.inventory.iter().find(|p| p.id == *proj) {
            return Ok(Some(proj.clone()));
        }
@@ -63,9 +59,10 @@ impl WriteStorage<'_> for MockStorage {
        Ok(MockRepository {})
    }

-
    fn sign_refs(
+
    fn sign_refs<G: Signer>(
        &self,
        _repository: &Self::Repository,
+
        _signer: G,
    ) -> Result<crate::storage::refs::SignedRefs<Verified>, Error> {
        todo!()
    }
modified node/src/test/tests.rs
@@ -14,7 +14,6 @@ use crate::protocol::peer::*;
use crate::protocol::*;
use crate::storage::git::Storage;
use crate::storage::ReadStorage;
-
use crate::test::crypto::MockSigner;
use crate::test::fixtures;
#[allow(unused)]
use crate::test::logger;
@@ -114,7 +113,7 @@ fn test_handshake_invalid_timestamp() {
    alice.receive(
        &bob.addr(),
        Message::hello(
-
            bob.id(),
+
            bob.node_id(),
            alice.timestamp() - time_delta,
            vec![],
            bob.git_url(),
@@ -135,7 +134,7 @@ fn test_inventory_sync() {
    let mut alice = Peer::new(
        "alice",
        [7, 7, 7, 7],
-
        Storage::open(tmp.path().join("alice"), MockSigner::default()).unwrap(),
+
        Storage::open(tmp.path().join("alice")).unwrap(),
    );
    let bob_storage = fixtures::storage(tmp.path().join("bob"));
    let bob = Peer::new("bob", [8, 8, 8, 8], bob_storage);
@@ -145,16 +144,18 @@ fn test_inventory_sync() {
    alice.connect_to(&bob);
    alice.receive(
        &bob.addr(),
-
        Message::Inventory {
-
            node: bob.id(),
-
            timestamp: now,
-
            inv: projs.clone(),
-
        },
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: projs.clone(),
+
                timestamp: now,
+
            },
+
            bob.signer(),
+
        ),
    );

    for proj in &projs {
        let seeds = alice.routing().get(proj).unwrap();
-
        assert!(seeds.contains(&bob.id()));
+
        assert!(seeds.contains(&bob.node_id()));
    }

    let a = alice
@@ -212,11 +213,13 @@ fn test_inventory_relay_bad_timestamp() {
    alice.connect_to(&bob);
    alice.receive(
        &bob.addr(),
-
        Message::Inventory {
-
            node: bob.id(),
-
            timestamp,
-
            inv: vec![],
-
        },
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: vec![],
+
                timestamp,
+
            },
+
            bob.signer(),
+
        ),
    );
    assert_matches!(
        alice.outbox().next(),
@@ -239,16 +242,18 @@ fn test_inventory_relay() {
    alice.connect_from(&eve);
    alice.receive(
        &bob.addr(),
-
        Message::Inventory {
-
            node: bob.id(),
-
            timestamp: now,
-
            inv: inv.clone(),
-
        },
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: inv.clone(),
+
                timestamp: now,
+
            },
+
            bob.signer(),
+
        ),
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
-
        Some(Message::Inventory { node, timestamp, .. })
-
        if node == bob.id() && timestamp == now
+
        Some(Message::InventoryAnnouncement { node, message: InventoryAnnouncement { timestamp, .. }, .. })
+
        if node == bob.node_id() && timestamp == now
    );
    assert_matches!(
        alice.messages(&bob.addr()).next(),
@@ -258,11 +263,13 @@ fn test_inventory_relay() {

    alice.receive(
        &bob.addr(),
-
        Message::Inventory {
-
            node: bob.id(),
-
            timestamp: now,
-
            inv: inv.clone(),
-
        },
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: inv.clone(),
+
                timestamp: now,
+
            },
+
            bob.signer(),
+
        ),
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
@@ -272,32 +279,36 @@ fn test_inventory_relay() {

    alice.receive(
        &bob.addr(),
-
        Message::Inventory {
-
            node: bob.id(),
-
            timestamp: now + 1,
-
            inv: inv.clone(),
-
        },
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: inv.clone(),
+
                timestamp: now + 1,
+
            },
+
            bob.signer(),
+
        ),
    );
    assert_matches!(
        alice.messages(&eve.addr()).next(),
-
        Some(Message::Inventory { node, timestamp, .. })
-
        if node == bob.id() && timestamp == now + 1,
+
        Some(Message::InventoryAnnouncement { node, message: InventoryAnnouncement{ timestamp, .. }, .. })
+
        if node == bob.node_id() && timestamp == now + 1,
        "Sending a new inventory does trigger the relay"
    );

    // Inventory from Eve relayed to Bob.
    alice.receive(
        &eve.addr(),
-
        Message::Inventory {
-
            node: eve.id(),
-
            timestamp: now,
-
            inv,
-
        },
+
        Message::inventory(
+
            InventoryAnnouncement {
+
                inventory: inv,
+
                timestamp: now,
+
            },
+
            eve.signer(),
+
        ),
    );
    assert_matches!(
        alice.messages(&bob.addr()).next(),
-
        Some(Message::Inventory { node, timestamp, .. })
-
        if node == eve.id() && timestamp == now
+
        Some(Message::InventoryAnnouncement { node, message: InventoryAnnouncement { timestamp, .. }, .. })
+
        if node == eve.node_id() && timestamp == now
    );
}

@@ -374,26 +385,14 @@ fn test_push_and_pull() {

    let tempdir = tempfile::tempdir().unwrap();

-
    let storage_alice = Storage::open(
-
        tempdir.path().join("alice").join("storage"),
-
        MockSigner::default(),
-
    )
-
    .unwrap();
+
    let storage_alice = Storage::open(tempdir.path().join("alice").join("storage")).unwrap();
    let repo = fixtures::repository(tempdir.path().join("working"));
    let mut alice = Peer::new("alice", [7, 7, 7, 7], storage_alice);

-
    let storage_bob = Storage::open(
-
        tempdir.path().join("bob").join("storage"),
-
        MockSigner::default(),
-
    )
-
    .unwrap();
+
    let storage_bob = Storage::open(tempdir.path().join("bob").join("storage")).unwrap();
    let mut bob = Peer::new("bob", [8, 8, 8, 8], storage_bob);

-
    let storage_eve = Storage::open(
-
        tempdir.path().join("eve").join("storage"),
-
        MockSigner::default(),
-
    )
-
    .unwrap();
+
    let storage_eve = Storage::open(tempdir.path().join("eve").join("storage")).unwrap();
    let mut eve = Peer::new("eve", [9, 9, 9, 9], storage_eve);

    // Alice and Bob connect to Eve.
@@ -416,7 +415,8 @@ fn test_push_and_pull() {
        "alice",
        "alice's repo",
        storage::BranchName::from("master"),
-
        alice.storage_mut(),
+
        alice.signer(),
+
        alice.storage(),
    )
    .unwrap();

@@ -429,16 +429,25 @@ fn test_push_and_pull() {
    eve.command(protocol::Command::Track(proj_id.clone(), sender));

    // Neither of them have it in the beginning.
-
    assert!(eve.storage().get(&proj_id).unwrap().is_none());
-
    assert!(bob.storage().get(&proj_id).unwrap().is_none());
+
    assert!(eve.get(&proj_id).unwrap().is_none());
+
    assert!(bob.get(&proj_id).unwrap().is_none());

    // Alice announces her refs.
    // We now expect Eve to fetch Alice's project from Alice.
    // Then we expect Bob to fetch Alice's project from Eve.
-
    alice.command(protocol::Command::AnnounceRefsUpdate(proj_id.clone()));
+
    alice.command(protocol::Command::AnnounceRefs(proj_id.clone()));
    sim.run_while([&mut alice, &mut bob, &mut eve], |s| !s.is_settled());
-
    assert!(eve.storage().get(&proj_id).unwrap().is_some());
-
    assert!(bob.storage().get(&proj_id).unwrap().is_some());
+

+
    assert!(eve
+
        .storage()
+
        .get(&alice.node_id(), &proj_id)
+
        .unwrap()
+
        .is_some());
+
    assert!(bob
+
        .storage()
+
        .get(&alice.node_id(), &proj_id)
+
        .unwrap()
+
        .is_some());
    assert_matches!(
        sim.events(&bob.ip).next(),
        Some(protocol::Event::RefsFetched { from, .. })
@@ -457,9 +466,9 @@ fn prop_inventory_exchange_dense() {
        let mut routing = Routing::with_hasher(rng.clone().into());

        for (inv, peer) in &[
-
            (alice_inv.inventory, alice.id()),
-
            (bob_inv.inventory, bob.id()),
-
            (eve_inv.inventory, eve.id()),
+
            (alice_inv.inventory, alice.node_id()),
+
            (bob_inv.inventory, bob.node_id()),
+
            (eve_inv.inventory, eve.node_id()),
        ] {
            for proj in inv {
                routing
@@ -475,9 +484,13 @@ fn prop_inventory_exchange_dense() {
        eve.command(Command::Connect(alice.addr()));
        eve.command(Command::Connect(bob.addr()));

-
        let mut peers: HashMap<_, _> = [(alice.id(), alice), (bob.id(), bob), (eve.id(), eve)]
-
            .into_iter()
-
            .collect();
+
        let mut peers: HashMap<_, _> = [
+
            (alice.node_id(), alice),
+
            (bob.node_id(), bob),
+
            (eve.node_id(), eve),
+
        ]
+
        .into_iter()
+
        .collect();
        let mut simulator = Simulation::new(LocalTime::now(), rng, simulator::Options::default())
            .initialize(peers.values_mut());

@@ -488,14 +501,12 @@ fn prop_inventory_exchange_dense() {
                let lookup = peer.lookup(proj_id);

                if lookup.local.is_some() {
-
                    peer.storage()
-
                        .get(proj_id)
+
                    peer.get(proj_id)
                        .expect("There are no errors querying storage")
                        .expect("The project is available locally");
                } else {
                    for remote in &lookup.remote {
                        peers[remote]
-
                            .storage()
                            .get(proj_id)
                            .expect("There are no errors querying storage")
                            .expect("The project is available remotely");