Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
Refactor service
Alexis Sellier committed 3 years ago
commit fcb1007f9d2f0873ab6f89c0445836c500f412b0
parent 71698818496edab9bef8854553b5e97ca7a7e72a
19 files changed +586 -542
modified radicle-node/src/clock.rs
@@ -3,6 +3,9 @@ use std::rc::Rc;

use crate::{LocalDuration, LocalTime};

+
/// Seconds since epoch.
+
pub type Timestamp = u64;
+

/// Clock with interior mutability.
#[derive(Debug, Clone)]
pub struct RefClock(Rc<RefCell<LocalTime>>);
@@ -28,6 +31,10 @@ impl RefClock {
    pub fn set(&mut self, time: LocalTime) {
        *self.borrow_mut() = time;
    }
+

+
    pub fn timestamp(&self) -> Timestamp {
+
        self.local_time().as_secs()
+
    }
}

impl From<LocalTime> for RefClock {
modified radicle-node/src/lib.rs
@@ -1,6 +1,4 @@
#![allow(dead_code)]
-
pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
-

pub mod address_book;
pub mod address_manager;
pub mod client;
@@ -14,15 +12,18 @@ pub mod test;
pub mod transport;
pub mod wire;

+
pub use nakamoto_net::{Io, Link, LocalDuration, LocalTime};
pub use radicle::{collections, crypto, git, hash, identity, rad, storage};

pub mod prelude {
+
    pub use crate::clock::Timestamp;
    pub use crate::crypto::{PublicKey, Signature, Signer};
    pub use crate::decoder::Decoder;
    pub use crate::hash::Digest;
    pub use crate::identity::{Did, Id};
    pub use crate::service::filter::Filter;
-
    pub use crate::service::{NodeId, Timestamp};
+
    pub use crate::service::{DisconnectReason, Envelope, Event, Message, Network, NodeId};
    pub use crate::storage::refs::Refs;
    pub use crate::storage::WriteStorage;
+
    pub use crate::{LocalDuration, LocalTime};
}
modified radicle-node/src/service.rs
@@ -1,10 +1,10 @@
-
#![allow(dead_code)]
pub mod config;
pub mod filter;
pub mod message;
pub mod peer;
+
pub mod reactor;

-
use std::collections::{BTreeMap, VecDeque};
+
use std::collections::BTreeMap;
use std::ops::{Deref, DerefMut};
use std::{fmt, net, net::IpAddr};

@@ -15,14 +15,15 @@ use nakamoto::{LocalDuration, LocalTime};
use nakamoto_net as nakamoto;
use nakamoto_net::Link;
use nonempty::NonEmpty;
+
use radicle::storage::ReadStorage;

use crate::address_book;
use crate::address_book::AddressBook;
use crate::address_manager::AddressManager;
-
use crate::clock::RefClock;
+
use crate::clock::{RefClock, Timestamp};
use crate::collections::{HashMap, HashSet};
use crate::crypto;
-
use crate::crypto::Verified;
+
use crate::crypto::{Signer, Verified};
use crate::git;
use crate::git::Url;
use crate::identity::{Doc, Id};
@@ -35,8 +36,8 @@ use crate::storage::{Inventory, ReadRepository, RefUpdate, WriteRepository, Writ
pub use crate::service::config::{Config, Network};
pub use crate::service::message::{Envelope, Message};

-
use self::filter::Filter;
use self::message::{InventoryAnnouncement, NodeFeatures};
+
use self::reactor::Reactor;

pub const DEFAULT_PORT: u16 = 8776;
pub const PROTOCOL_VERSION: u32 = 1;
@@ -52,23 +53,6 @@ pub const MAX_TIME_DELTA: LocalDuration = LocalDuration::from_mins(60);
pub type NodeId = crypto::PublicKey;
/// Network routing table. Keeps track of where projects are hosted.
pub type Routing = HashMap<Id, HashSet<NodeId>>;
-
/// Seconds since epoch.
-
pub type Timestamp = u64;
-

-
/// Output of a state transition.
-
#[derive(Debug)]
-
pub enum Io {
-
    /// There are some messages ready to be sent to a peer.
-
    Write(net::SocketAddr, Vec<Envelope>),
-
    /// Connect to a peer.
-
    Connect(net::SocketAddr),
-
    /// Disconnect from a peer.
-
    Disconnect(net::SocketAddr, DisconnectReason),
-
    /// Ask for a wakeup in a specified amount of time.
-
    Wakeup(LocalDuration),
-
    /// Emit an event.
-
    Event(Event),
-
}

/// A service event.
#[derive(Debug, Clone)]
@@ -138,11 +122,27 @@ pub enum Command {
pub enum CommandError {}

#[derive(Debug)]
-
pub struct Service<S, T, G> {
+
pub struct Service<A, S, G> {
+
    /// Service configuration.
+
    config: Config,
+
    /// Our cryptographic signer and key.
+
    signer: G,
+
    /// Project storage.
+
    storage: S,
+
    /// Tracks the location of projects.
+
    routing: Routing,
    /// Peer sessions, currently or recently connected.
    sessions: Sessions,
-
    /// Service state that isn't peer-specific.
-
    context: Context<S, T, G>,
+
    /// Keeps track of peer states.
+
    peers: BTreeMap<NodeId, Peer>,
+
    /// Clock. Tells the time.
+
    clock: RefClock,
+
    /// Interface to the I/O reactor.
+
    reactor: Reactor,
+
    /// Peer address manager.
+
    addrmgr: AddressManager<A>,
+
    /// Source of entropy.
+
    rng: Rng,
    /// Whether our local inventory no long represents what we have announced to the network.
    out_of_sync: bool,
    /// Last time the service was idle.
@@ -157,20 +157,36 @@ pub struct Service<S, T, G> {
    start_time: LocalTime,
}

-
impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service<S, T, G> {
+
impl<A, S, G> Service<A, S, G>
+
where
+
    A: address_book::Store,
+
    S: WriteStorage + 'static,
+
    G: crypto::Signer,
+
{
    pub fn new(
        config: Config,
        clock: RefClock,
-
        storage: T,
-
        addresses: S,
+
        storage: S,
+
        addresses: A,
        signer: G,
        rng: Rng,
    ) -> Self {
        let addrmgr = AddressManager::new(addresses);
+
        let routing = HashMap::with_hasher(rng.clone().into());
+
        let sessions = Sessions::new(rng.clone());
+
        let network = config.network;

        Self {
-
            context: Context::new(config, clock, storage, addrmgr, signer, rng.clone()),
-
            sessions: Sessions::new(rng),
+
            config,
+
            storage,
+
            addrmgr,
+
            signer,
+
            rng,
+
            clock,
+
            routing,
+
            peers: BTreeMap::new(),
+
            reactor: Reactor::new(network),
+
            sessions,
            out_of_sync: false,
            last_idle: LocalTime::default(),
            last_sync: LocalTime::default(),
@@ -180,10 +196,8 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
        }
    }

-
    pub fn disconnect(&mut self, remote: &IpAddr, reason: DisconnectReason) {
-
        if let Some(addr) = self.sessions.get(remote).map(|p| p.addr) {
-
            self.context.disconnect(addr, reason);
-
        }
+
    pub fn node_id(&self) -> NodeId {
+
        *self.signer.public_key()
    }

    pub fn seeds(&self, id: &Id) -> Box<dyn Iterator<Item = (&NodeId, &Session)> + '_> {
@@ -243,17 +257,17 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

    /// Get the current inventory.
    pub fn inventory(&self) -> Result<Inventory, storage::Error> {
-
        self.context.storage.inventory()
+
        self.storage.inventory()
    }

    /// Get the storage instance.
-
    pub fn storage(&self) -> &T {
-
        &self.context.storage
+
    pub fn storage(&self) -> &S {
+
        &self.storage
    }

    /// Get the mutable storage instance.
-
    pub fn storage_mut(&mut self) -> &mut T {
-
        &mut self.context.storage
+
    pub fn storage_mut(&mut self) -> &mut S {
+
        &mut self.storage
    }

    /// Get a project from storage, using the local node's key.
@@ -263,34 +277,38 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

    /// Get the local signer.
    pub fn signer(&self) -> &G {
-
        &self.context.signer
+
        &self.signer
+
    }
+

+
    /// Get the clock.
+
    pub fn clock(&self) -> &RefClock {
+
        &self.clock
    }

    /// Get the local service time.
    pub fn local_time(&self) -> LocalTime {
-
        self.context.clock.local_time()
+
        self.clock.local_time()
    }

    /// Get service configuration.
    pub fn config(&self) -> &Config {
-
        &self.context.config
+
        &self.config
    }

    /// Get reference to routing table.
    pub fn routing(&self) -> &Routing {
-
        &self.context.routing
+
        &self.routing
    }

-
    /// Get I/O outbox.
-
    pub fn outbox(&mut self) -> &mut VecDeque<Io> {
-
        &mut self.context.io
+
    /// Get I/O reactor.
+
    pub fn reactor(&mut self) -> &mut Reactor {
+
        &mut self.reactor
    }

    pub fn lookup(&self, id: &Id) -> Lookup {
        Lookup {
-
            local: self.context.storage.get(&self.node_id(), id).unwrap(),
+
            local: self.storage.get(&self.node_id(), id).unwrap(),
            remote: self
-
                .context
                .routing
                .get(id)
                .map_or(vec![], |r| r.iter().cloned().collect()),
@@ -303,20 +321,20 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
        self.start_time = time;

        // Connect to configured peers.
-
        let addrs = self.context.config.connect.clone();
+
        let addrs = self.config.connect.clone();
        for addr in addrs {
-
            self.context.connect(addr);
+
            self.reactor.connect(addr);
        }
    }

    pub fn tick(&mut self, now: nakamoto::LocalTime) {
        trace!("Tick +{}", now - self.start_time);

-
        self.context.clock.set(now);
+
        self.clock.set(now);
    }

    pub fn wake(&mut self) {
-
        let now = self.context.clock.local_time();
+
        let now = self.clock.local_time();

        trace!("Wake +{}", now - self.start_time);

@@ -324,28 +342,28 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
            debug!("Running 'idle' task...");

            self.maintain_connections();
-
            self.context.io.push_back(Io::Wakeup(IDLE_INTERVAL));
+
            self.reactor.wakeup(IDLE_INTERVAL);
            self.last_idle = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
            debug!("Running 'sync' task...");

            // TODO: What do we do here?
-
            self.context.io.push_back(Io::Wakeup(SYNC_INTERVAL));
+
            self.reactor.wakeup(SYNC_INTERVAL);
            self.last_sync = now;
        }
        if now - self.last_announce >= ANNOUNCE_INTERVAL {
            if self.out_of_sync {
                self.announce_inventory().unwrap();
            }
-
            self.context.io.push_back(Io::Wakeup(ANNOUNCE_INTERVAL));
+
            self.reactor.wakeup(ANNOUNCE_INTERVAL);
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
            debug!("Running 'prune' task...");

            self.prune_routing_entries();
-
            self.context.io.push_back(Io::Wakeup(PRUNE_INTERVAL));
+
            self.reactor.wakeup(PRUNE_INTERVAL);
            self.last_prune = now;
        }
    }
@@ -354,7 +372,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
        debug!("Command {:?}", cmd);

        match cmd {
-
            Command::Connect(addr) => self.context.connect(addr),
+
            Command::Connect(addr) => self.reactor.connect(addr),
            Command::Fetch(id, resp) => {
                if !self.config.is_tracking(&id) {
                    resp.send(FetchLookup::NotTracking).ok();
@@ -433,7 +451,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
                let message = RefsAnnouncement { id, refs };
                let signature = message.sign(&self.signer);

-
                self.context.broadcast(
+
                self.reactor.broadcast(
                    Message::RefsAnnouncement {
                        node,
                        message,
@@ -447,7 +465,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

    pub fn attempted(&mut self, addr: &std::net::SocketAddr) {
        let ip = addr.ip();
-
        let persistent = self.context.config.is_persistent(addr);
+
        let persistent = self.config.is_persistent(addr);
        let peer = self
            .sessions
            .entry(ip)
@@ -470,21 +488,24 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
        // For inbound connections, we wait for the remote to say "Hello" first.
        // TODO: How should we deal with multiple peers connecting from the same IP address?
        if link.is_outbound() {
-
            // TODO: Refactor this so that we don't create messages if the peer isn't found.
-
            let messages = self.handshake_messages();
-

            if let Some(peer) = self.sessions.get_mut(&ip) {
-
                self.context.write_all(peer.addr, messages);
-
                peer.connected();
+
                if link.is_outbound() {
+
                    self.reactor.write_all(
+
                        addr,
+
                        gossip::handshake(
+
                            self.clock.timestamp(),
+
                            &self.storage,
+
                            &self.signer,
+
                            &self.config,
+
                        ),
+
                    );
+
                }
+
                peer.connected(link);
            }
        } else {
            self.sessions.insert(
                ip,
-
                Session::new(
-
                    addr,
-
                    Link::Inbound,
-
                    self.context.config.is_persistent(&addr),
-
                ),
+
                Session::new(addr, Link::Inbound, self.config.is_persistent(&addr)),
            );
        }
    }
@@ -503,8 +524,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
            peer.state = SessionState::Disconnected { since };

            // Attempt to re-connect to persistent peers.
-
            if self.context.config.is_persistent(addr) && peer.attempts() < MAX_CONNECTION_ATTEMPTS
-
            {
+
            if self.config.is_persistent(addr) && peer.attempts() < MAX_CONNECTION_ATTEMPTS {
                if reason.is_dial_err() {
                    return;
                }
@@ -520,7 +540,7 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
                // TODO: Try to reconnect only if the peer was attempted. A disconnect without
                // even a successful attempt means that we're unlikely to be able to reconnect.

-
                self.context.connect(*addr);
+
                self.reactor.connect(*addr);
            } else {
                // TODO: Non-persistent peers should be removed from the
                // map here or at some later point.
@@ -528,36 +548,208 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
        }
    }

-
    pub fn received_message(&mut self, addr: &std::net::SocketAddr, msg: Envelope) {
-
        let peer_ip = addr.ip();
-
        let peer = if let Some(peer) = self.sessions.get_mut(&peer_ip) {
-
            peer
-
        } else {
-
            return;
-
        };
+
    pub fn received_message(&mut self, addr: &net::SocketAddr, envelope: Envelope) {
+
        match self.handle_message(addr, envelope) {
+
            Ok(relay) => {
+
                if let Some(msg) = relay {
+
                    let negotiated = self
+
                        .sessions
+
                        .negotiated()
+
                        .filter(|(ip, _)| **ip != addr.ip())
+
                        .map(|(_, p)| p);

-
        let relay = match peer.received(msg, &mut self.context) {
-
            Ok(msg) => msg,
+
                    self.reactor.relay(msg, negotiated.clone());
+
                }
+
            }
+
            Err(SessionError::NotFound(ip)) => {
+
                error!("Session not found for {}", ip);
+
            }
            Err(err) => {
-
                self.context
-
                    .disconnect(peer.addr, DisconnectReason::Error(err));
                // If there's an error, stop processing messages from this peer.
                // However, we still relay messages returned up to this point.
-
                //
+
                self.reactor.disconnect(*addr, DisconnectReason::Error(err));
+

                // FIXME: The peer should be set in a state such that we don'that
                // process further messages.
-
                return;
            }
+
        }
+
    }
+

+
    pub fn handle_message(
+
        &mut self,
+
        remote: &net::SocketAddr,
+
        envelope: Envelope,
+
    ) -> Result<Option<Message>, peer::SessionError> {
+
        let peer_ip = remote.ip();
+
        let peer = if let Some(peer) = self.sessions.get_mut(&peer_ip) {
+
            peer
+
        } else {
+
            return Err(SessionError::NotFound(remote.ip()));
        };

-
        if let Some(msg) = relay {
-
            let negotiated = self
-
                .sessions
-
                .negotiated()
-
                .filter(|(ip, _)| **ip != peer_ip)
-
                .map(|(_, p)| p);
+
        if envelope.magic != self.config.network.magic() {
+
            return Err(SessionError::WrongMagic(envelope.magic));
+
        }
+
        debug!("Received {:?} from {}", &envelope.msg, peer.ip());
+

+
        match (&peer.state, envelope.msg) {
+
            (
+
                SessionState::Initial,
+
                Message::Initialize {
+
                    id,
+
                    version,
+
                    addrs,
+
                    git,
+
                },
+
            ) => {
+
                if version != PROTOCOL_VERSION {
+
                    return Err(SessionError::WrongVersion(version));
+
                }
+
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
+
                // extra "acknowledgment" message sent when the `Initialize` is well received.
+
                if peer.link.is_inbound() {
+
                    self.reactor.write_all(
+
                        peer.addr,
+
                        gossip::handshake(
+
                            self.clock.timestamp(),
+
                            &self.storage,
+
                            &self.signer,
+
                            &self.config,
+
                        ),
+
                    );
+
                }
+
                // Nb. we don't set the peer timestamp here, since it is going to be
+
                // set after the first message is received only. Setting it here would
+
                // mean that messages received right after the handshake could be ignored.
+
                peer.state = SessionState::Negotiated {
+
                    id,
+
                    since: self.clock.local_time(),
+
                    addrs,
+
                    git,
+
                };
+
            }
+
            (SessionState::Initial, _) => {
+
                debug!(
+
                    "Disconnecting peer {} for sending us a message before handshake",
+
                    peer.ip()
+
                );
+
                return Err(SessionError::Misbehavior);
+
            }
+
            (
+
                SessionState::Negotiated { git, .. },
+
                Message::InventoryAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                },
+
            ) => {
+
                let now = self.clock.local_time();
+
                let peer = self.peers.entry(node).or_insert_with(Peer::default);
+
                let relay = self.config.relay;
+
                let git = git.clone();
+

+
                // Don't allow messages from too far in the future.
+
                if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
+
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
+
                }
+
                // Discard inventory messages we've already seen, otherwise update
+
                // out last seen time.
+
                if message.timestamp > peer.last_message {
+
                    peer.last_message = message.timestamp;
+
                } else {
+
                    return Ok(None);
+
                }
+
                self.process_inventory(&message.inventory, node, &git);
+

+
                if relay {
+
                    return Ok(Some(Message::InventoryAnnouncement {
+
                        node,
+
                        message,
+
                        signature,
+
                    }));
+
                }
+
            }
+
            // Process a peer inventory update announcement by (maybe) fetching.
+
            (
+
                SessionState::Negotiated { git, .. },
+
                Message::RefsAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                },
+
            ) => {
+
                // FIXME: Check message timestamp.
+

+
                if message.verify(&node, &signature) {
+
                    // TODO: Buffer/throttle fetches.
+
                    // TODO: Check that we're tracking this user as well.
+
                    if self.config.is_tracking(&message.id) {
+
                        // TODO: Check refs to see if we should try to fetch or not.
+
                        let updated = self.storage.fetch(&message.id, git).unwrap();
+
                        let is_updated = !updated.is_empty();
+

+
                        self.reactor.event(Event::RefsFetched {
+
                            from: git.clone(),
+
                            project: message.id.clone(),
+
                            updated,
+
                        });
+

+
                        if is_updated {
+
                            return Ok(Some(Message::RefsAnnouncement {
+
                                node,
+
                                message,
+
                                signature,
+
                            }));
+
                        }
+
                    }
+
                } else {
+
                    return Err(SessionError::Misbehavior);
+
                }
+
            }
+
            (
+
                SessionState::Negotiated { .. },
+
                Message::NodeAnnouncement {
+
                    node,
+
                    message,
+
                    signature,
+
                },
+
            ) => {
+
                // FIXME: Check message timestamp.
+

+
                if !message.verify(&node, &signature) {
+
                    return Err(SessionError::Misbehavior);
+
                }
+
                log::warn!("Node announcement handling is not implemented");
+
            }
+
            (SessionState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
+
                peer.subscribe = Some(subscribe);
+
            }
+
            (SessionState::Negotiated { .. }, Message::Initialize { .. }) => {
+
                debug!(
+
                    "Disconnecting peer {} for sending us a redundant handshake message",
+
                    peer.ip()
+
                );
+
                return Err(SessionError::Misbehavior);
+
            }
+
            (SessionState::Disconnected { .. }, msg) => {
+
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.ip());
+
            }
+
        }
+
        Ok(None)
+
    }
+

+
    /// Process a peer inventory announcement by updating our routing table.
+
    fn process_inventory(&mut self, inventory: &Inventory, from: NodeId, remote: &Url) {
+
        for proj_id in inventory {
+
            let inventory = self
+
                .routing
+
                .entry(proj_id.clone())
+
                .or_insert_with(|| HashSet::with_hasher(self.rng.clone().into()));

-
            self.context.relay(msg, negotiated.clone());
+
            // TODO: Fire an event on routing update.
+
            if inventory.insert(from) && self.config.is_tracking(proj_id) {
+
                self.storage.fetch(proj_id, remote).unwrap();
+
            }
        }
    }

@@ -567,10 +759,14 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service

    /// Announce our inventory to all connected peers.
    fn announce_inventory(&mut self) -> Result<(), storage::Error> {
-
        let inv = Message::inventory(self.context.inventory_announcement()?, &self.context.signer);
+
        let inventory = self.storage().inventory()?;
+
        let inv = Message::inventory(
+
            gossip::inventory(self.clock.timestamp(), inventory),
+
            &self.signer,
+
        );

        for addr in self.sessions.negotiated().map(|(_, p)| p.addr) {
-
            self.context.write(addr, inv.clone());
+
            self.reactor.write(addr, inv.clone());
        }
        Ok(())
    }
@@ -591,20 +787,6 @@ impl<'r, T: WriteStorage<'r>, S: address_book::Store, G: crypto::Signer> Service
    }
}

-
impl<S, T, G> Deref for Service<S, T, G> {
-
    type Target = Context<S, T, G>;
-

-
    fn deref(&self) -> &Self::Target {
-
        &self.context
-
    }
-
}
-

-
impl<S, T, G> DerefMut for Service<S, T, G> {
-
    fn deref_mut(&mut self) -> &mut Self::Target {
-
        &mut self.context
-
    }
-
}
-

#[derive(Debug, Clone)]
pub enum DisconnectReason {
    User,
@@ -635,11 +817,11 @@ impl fmt::Display for DisconnectReason {
    }
}

-
impl<S, T, G> Iterator for Service<S, T, G> {
-
    type Item = Io;
+
impl<A, S, G> Iterator for Service<A, S, G> {
+
    type Item = reactor::Io;

    fn next(&mut self) -> Option<Self::Item> {
-
        self.context.io.pop_front()
+
        self.reactor.next()
    }
}

@@ -659,202 +841,6 @@ pub struct Peer {
    pub last_message: Timestamp,
}

-
/// Global service state used across peers.
-
#[derive(Debug)]
-
pub struct Context<S, T, G> {
-
    /// Service configuration.
-
    config: Config,
-
    /// Our cryptographic signer and key.
-
    signer: G,
-
    /// Tracks the location of projects.
-
    routing: Routing,
-
    /// Keeps track of peer states.
-
    peers: BTreeMap<NodeId, Peer>,
-
    /// Outgoing I/O queue.
-
    io: VecDeque<Io>,
-
    /// Clock. Tells the time.
-
    clock: RefClock,
-
    /// Project storage.
-
    storage: T,
-
    /// Peer address manager.
-
    addrmgr: AddressManager<S>,
-
    /// Source of entropy.
-
    rng: Rng,
-
}
-

-
impl<S, T, G> Context<S, T, G>
-
where
-
    T: storage::ReadStorage,
-
    G: crypto::Signer,
-
{
-
    pub(crate) fn node_id(&self) -> NodeId {
-
        *self.signer.public_key()
-
    }
-
}
-

-
impl<'r, S, T, G> Context<S, T, G>
-
where
-
    T: storage::WriteStorage<'r>,
-
    G: crypto::Signer,
-
{
-
    pub(crate) fn new(
-
        config: Config,
-
        clock: RefClock,
-
        storage: T,
-
        addrmgr: AddressManager<S>,
-
        signer: G,
-
        rng: Rng,
-
    ) -> Self {
-
        Self {
-
            config,
-
            signer,
-
            clock,
-
            routing: HashMap::with_hasher(rng.clone().into()),
-
            peers: BTreeMap::new(),
-
            io: VecDeque::new(),
-
            storage,
-
            addrmgr,
-
            rng,
-
        }
-
    }
-

-
    fn node_announcement(&self) -> NodeAnnouncement {
-
        let timestamp = self.timestamp();
-
        let features = NodeFeatures::default();
-
        let alias = self.alias();
-
        let addresses = vec![]; // TODO
-

-
        NodeAnnouncement {
-
            features,
-
            timestamp,
-
            alias,
-
            addresses,
-
        }
-
    }
-

-
    fn inventory_announcement(&self) -> Result<InventoryAnnouncement, storage::Error> {
-
        let timestamp = self.timestamp();
-
        let inventory = self.storage.inventory()?;
-

-
        Ok(InventoryAnnouncement {
-
            inventory,
-
            timestamp,
-
        })
-
    }
-

-
    fn filter(&self) -> Filter {
-
        match &self.config.project_tracking {
-
            ProjectTracking::All { .. } => Filter::default(),
-
            ProjectTracking::Allowed(ids) => Filter::new(ids.iter()),
-
        }
-
    }
-

-
    fn handshake_messages(&self) -> [Message; 4] {
-
        let git = self.config.git_url.clone();
-
        [
-
            Message::init(self.node_id(), self.config.listen.clone(), git),
-
            Message::node(self.node_announcement(), &self.signer),
-
            Message::inventory(self.inventory_announcement().unwrap(), &self.signer),
-
            Message::subscribe(self.filter(), self.timestamp(), Timestamp::MAX),
-
        ]
-
    }
-

-
    fn alias(&self) -> [u8; 32] {
-
        let mut alias = [0u8; 32];
-

-
        alias[..9].copy_from_slice("anonymous".as_bytes());
-
        alias
-
    }
-

-
    /// Process a peer inventory announcement by updating our routing table.
-
    fn process_inventory(&mut self, inventory: &Inventory, from: NodeId, remote: &Url) {
-
        for proj_id in inventory {
-
            let inventory = self
-
                .routing
-
                .entry(proj_id.clone())
-
                .or_insert_with(|| HashSet::with_hasher(self.rng.clone().into()));
-

-
            // TODO: Fire an event on routing update.
-
            if inventory.insert(from) && self.config.is_tracking(proj_id) {
-
                self.fetch(proj_id, remote);
-
            }
-
        }
-
    }
-

-
    fn fetch(&mut self, proj_id: &Id, remote: &Url) -> Vec<RefUpdate> {
-
        let mut repo = self.storage.repository(proj_id).unwrap();
-
        let mut path = remote.path.clone();
-

-
        path.push(b'/');
-
        path.extend(proj_id.to_string().into_bytes());
-

-
        repo.fetch(&Url {
-
            path,
-
            ..remote.clone()
-
        })
-
        .unwrap()
-
    }
-

-
    /// Disconnect a peer.
-
    fn disconnect(&mut self, addr: net::SocketAddr, reason: DisconnectReason) {
-
        self.io.push_back(Io::Disconnect(addr, reason));
-
    }
-
}
-

-
impl<S, T, G> Context<S, T, G> {
-
    /// Get current local timestamp.
-
    pub(crate) fn timestamp(&self) -> Timestamp {
-
        self.clock.local_time().as_secs()
-
    }
-

-
    /// Connect to a peer.
-
    fn connect(&mut self, addr: net::SocketAddr) {
-
        // TODO: Make sure we don't try to connect more than once to the same address.
-
        self.io.push_back(Io::Connect(addr));
-
    }
-

-
    fn write_all(&mut self, remote: net::SocketAddr, msgs: impl IntoIterator<Item = Message>) {
-
        let envelopes = msgs
-
            .into_iter()
-
            .map(|msg| self.config.network.envelope(msg))
-
            .collect();
-
        self.io.push_back(Io::Write(remote, envelopes));
-
    }
-

-
    fn write(&mut self, remote: net::SocketAddr, msg: Message) {
-
        debug!("Write {:?} to {}", &msg, remote.ip());
-

-
        let envelope = self.config.network.envelope(msg);
-
        self.io.push_back(Io::Write(remote, vec![envelope]));
-
    }
-

-
    /// Broadcast a message to a list of peers.
-
    fn broadcast<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
-
        for peer in peers {
-
            self.write(peer.addr, msg.clone());
-
        }
-
    }
-

-
    /// Relay a message to interested peers.
-
    fn relay<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
-
        if let Message::RefsAnnouncement { message, .. } = &msg {
-
            let id = message.id.clone();
-
            let peers = peers.into_iter().filter(|p| {
-
                if let Some(subscribe) = &p.subscribe {
-
                    subscribe.filter.contains(&id)
-
                } else {
-
                    // If the peer did not send us a `subscribe` message, we don'the
-
                    // relay any messages to them.
-
                    false
-
                }
-
            });
-
            self.broadcast(msg, peers);
-
        } else {
-
            self.broadcast(msg, peers);
-
        }
-
    }
-
}
-

#[derive(Debug)]
/// Holds currently (or recently) connected peers.
pub struct Sessions(AddressBook<IpAddr, Session>);
@@ -893,3 +879,44 @@ impl DerefMut for Sessions {
        &mut self.0
    }
}
+

+
mod gossip {
+
    use super::*;
+

+
    pub fn handshake<G: Signer, S: ReadStorage>(
+
        timestamp: Timestamp,
+
        storage: &S,
+
        signer: &G,
+
        config: &Config,
+
    ) -> [Message; 4] {
+
        let git = config.git_url.clone();
+
        let inventory = storage.inventory().unwrap();
+

+
        [
+
            Message::init(*signer.public_key(), config.listen.clone(), git),
+
            Message::node(gossip::node(timestamp, config), signer),
+
            Message::inventory(gossip::inventory(timestamp, inventory), signer),
+
            Message::subscribe(config.filter(), timestamp, Timestamp::MAX),
+
        ]
+
    }
+

+
    pub fn node(timestamp: Timestamp, config: &Config) -> NodeAnnouncement {
+
        let features = NodeFeatures::default();
+
        let alias = config.alias();
+
        let addresses = vec![]; // TODO
+

+
        NodeAnnouncement {
+
            features,
+
            timestamp,
+
            alias,
+
            addresses,
+
        }
+
    }
+

+
    pub fn inventory(timestamp: Timestamp, inventory: Vec<Id>) -> InventoryAnnouncement {
+
        InventoryAnnouncement {
+
            inventory,
+
            timestamp,
+
        }
+
    }
+
}
modified radicle-node/src/service/config.rs
@@ -4,6 +4,7 @@ use crate::collections::HashSet;
use crate::git;
use crate::git::Url;
use crate::identity::{Id, PublicKey};
+
use crate::service::filter::Filter;
use crate::service::message::{Address, Envelope, Message};

/// Peer-to-peer network.
@@ -124,4 +125,18 @@ impl Config {
            ProjectTracking::Allowed(ids) => ids.remove(&id),
        }
    }
+

+
    pub fn filter(&self) -> Filter {
+
        match &self.project_tracking {
+
            ProjectTracking::All { .. } => Filter::default(),
+
            ProjectTracking::Allowed(ids) => Filter::new(ids.iter()),
+
        }
+
    }
+

+
    pub fn alias(&self) -> [u8; 32] {
+
        let mut alias = [0u8; 32];
+

+
        alias[..9].copy_from_slice("anonymous".as_bytes());
+
        alias
+
    }
}
modified radicle-node/src/service/peer.rs
@@ -31,6 +31,8 @@ pub enum SessionError {
    WrongVersion(u32),
    #[error("invalid announcement timestamp: {0}")]
    InvalidTimestamp(u64),
+
    #[error("session not found for address `{0}`")]
+
    NotFound(net::IpAddr),
    #[error("peer misbehaved")]
    Misbehavior,
}
@@ -84,157 +86,7 @@ impl Session {
        self.attempts += 1;
    }

-
    pub fn connected(&mut self) {
+
    pub fn connected(&mut self, _link: Link) {
        self.attempts = 0;
    }
-

-
    pub fn received<'r, S, T, G>(
-
        &mut self,
-
        envelope: Envelope,
-
        ctx: &mut Context<S, T, G>,
-
    ) -> Result<Option<Message>, SessionError>
-
    where
-
        T: storage::WriteStorage<'r>,
-
        G: crypto::Signer,
-
    {
-
        if envelope.magic != ctx.config.network.magic() {
-
            return Err(SessionError::WrongMagic(envelope.magic));
-
        }
-
        debug!("Received {:?} from {}", &envelope.msg, self.ip());
-

-
        match (&self.state, envelope.msg) {
-
            (
-
                SessionState::Initial,
-
                Message::Initialize {
-
                    id,
-
                    version,
-
                    addrs,
-
                    git,
-
                },
-
            ) => {
-
                if version != PROTOCOL_VERSION {
-
                    return Err(SessionError::WrongVersion(version));
-
                }
-
                // Nb. This is a very primitive handshake. Eventually we should have anyhow
-
                // extra "acknowledgment" message sent when the `Initialize` is well received.
-
                if self.link.is_inbound() {
-
                    ctx.write_all(self.addr, ctx.handshake_messages());
-
                }
-
                // Nb. we don't set the peer timestamp here, since it is going to be
-
                // set after the first message is received only. Setting it here would
-
                // mean that messages received right after the handshake could be ignored.
-
                self.state = SessionState::Negotiated {
-
                    id,
-
                    since: ctx.clock.local_time(),
-
                    addrs,
-
                    git,
-
                };
-
            }
-
            (SessionState::Initial, _) => {
-
                debug!(
-
                    "Disconnecting peer {} for sending us a message before handshake",
-
                    self.ip()
-
                );
-
                return Err(SessionError::Misbehavior);
-
            }
-
            (
-
                SessionState::Negotiated { git, .. },
-
                Message::InventoryAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                },
-
            ) => {
-
                let now = ctx.clock.local_time();
-
                let peer = ctx.peers.entry(node).or_insert_with(Peer::default);
-

-
                // Don't allow messages from too far in the future.
-
                if message.timestamp.saturating_sub(now.as_secs()) > MAX_TIME_DELTA.as_secs() {
-
                    return Err(SessionError::InvalidTimestamp(message.timestamp));
-
                }
-
                // Discard inventory messages we've already seen, otherwise update
-
                // out last seen time.
-
                if message.timestamp > peer.last_message {
-
                    peer.last_message = message.timestamp;
-
                } else {
-
                    return Ok(None);
-
                }
-
                ctx.process_inventory(&message.inventory, node, git);
-

-
                if ctx.config.relay {
-
                    return Ok(Some(Message::InventoryAnnouncement {
-
                        node,
-
                        message,
-
                        signature,
-
                    }));
-
                }
-
            }
-
            // Process a peer inventory update announcement by (maybe) fetching.
-
            (
-
                SessionState::Negotiated { git, .. },
-
                Message::RefsAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                },
-
            ) => {
-
                // FIXME: Check message timestamp.
-

-
                if message.verify(&node, &signature) {
-
                    // TODO: Buffer/throttle fetches.
-
                    // TODO: Check that we're tracking this user as well.
-
                    if ctx.config.is_tracking(&message.id) {
-
                        // TODO: Check refs to see if we should try to fetch or not.
-
                        let updated_refs = ctx.fetch(&message.id, git);
-
                        let is_updated = !updated_refs.is_empty();
-

-
                        ctx.io.push_back(Io::Event(Event::RefsFetched {
-
                            from: git.clone(),
-
                            project: message.id.clone(),
-
                            updated: updated_refs,
-
                        }));
-

-
                        if is_updated {
-
                            return Ok(Some(Message::RefsAnnouncement {
-
                                node,
-
                                message,
-
                                signature,
-
                            }));
-
                        }
-
                    }
-
                } else {
-
                    return Err(SessionError::Misbehavior);
-
                }
-
            }
-
            (
-
                SessionState::Negotiated { .. },
-
                Message::NodeAnnouncement {
-
                    node,
-
                    message,
-
                    signature,
-
                },
-
            ) => {
-
                // FIXME: Check message timestamp.
-

-
                if !message.verify(&node, &signature) {
-
                    return Err(SessionError::Misbehavior);
-
                }
-
                log::warn!("Node announcement handling is not implemented");
-
            }
-
            (SessionState::Negotiated { .. }, Message::Subscribe(subscribe)) => {
-
                self.subscribe = Some(subscribe);
-
            }
-
            (SessionState::Negotiated { .. }, Message::Initialize { .. }) => {
-
                debug!(
-
                    "Disconnecting peer {} for sending us a redundant handshake message",
-
                    self.ip()
-
                );
-
                return Err(SessionError::Misbehavior);
-
            }
-
            (SessionState::Disconnected { .. }, msg) => {
-
                debug!("Ignoring {:?} from disconnected peer {}", msg, self.ip());
-
            }
-
        }
-
        Ok(None)
-
    }
}
added radicle-node/src/service/reactor.rs
@@ -0,0 +1,114 @@
+
use std::collections::VecDeque;
+
use std::net;
+

+
use log::*;
+

+
use crate::prelude::*;
+
use crate::service::peer::Session;
+

+
/// Output of a state transition.
+
#[derive(Debug)]
+
pub enum Io {
+
    /// There are some messages ready to be sent to a peer.
+
    Write(net::SocketAddr, Vec<Envelope>),
+
    /// Connect to a peer.
+
    Connect(net::SocketAddr),
+
    /// Disconnect from a peer.
+
    Disconnect(net::SocketAddr, DisconnectReason),
+
    /// Ask for a wakeup in a specified amount of time.
+
    Wakeup(LocalDuration),
+
    /// Emit an event.
+
    Event(Event),
+
}
+

+
/// Interface to the network reactor.
+
#[derive(Debug)]
+
pub struct Reactor {
+
    /// The network we're on.
+
    network: Network,
+
    /// Outgoing I/O queue.
+
    io: VecDeque<Io>,
+
}
+

+
impl Reactor {
+
    pub fn new(network: Network) -> Self {
+
        Self {
+
            network,
+
            io: VecDeque::new(),
+
        }
+
    }
+

+
    /// Emit an event.
+
    pub fn event(&mut self, event: Event) {
+
        self.io.push_back(Io::Event(event));
+
    }
+

+
    /// Connect to a peer.
+
    pub fn connect(&mut self, addr: net::SocketAddr) {
+
        // TODO: Make sure we don't try to connect more than once to the same address.
+
        self.io.push_back(Io::Connect(addr));
+
    }
+

+
    /// Disconnect a peer.
+
    pub fn disconnect(&mut self, addr: net::SocketAddr, reason: DisconnectReason) {
+
        self.io.push_back(Io::Disconnect(addr, reason));
+
    }
+

+
    pub fn write(&mut self, remote: net::SocketAddr, msg: Message) {
+
        debug!("Write {:?} to {}", &msg, remote.ip());
+

+
        let envelope = self.network.envelope(msg);
+
        self.io.push_back(Io::Write(remote, vec![envelope]));
+
    }
+

+
    pub fn write_all(&mut self, remote: net::SocketAddr, msgs: impl IntoIterator<Item = Message>) {
+
        let envelopes = msgs
+
            .into_iter()
+
            .map(|msg| self.network.envelope(msg))
+
            .collect();
+
        self.io.push_back(Io::Write(remote, envelopes));
+
    }
+

+
    pub fn wakeup(&mut self, after: LocalDuration) {
+
        self.io.push_back(Io::Wakeup(after));
+
    }
+

+
    /// Broadcast a message to a list of peers.
+
    pub fn broadcast<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
+
        for peer in peers {
+
            self.write(peer.addr, msg.clone());
+
        }
+
    }
+

+
    /// Relay a message to interested peers.
+
    pub fn relay<'a>(&mut self, msg: Message, peers: impl IntoIterator<Item = &'a Session>) {
+
        if let Message::RefsAnnouncement { message, .. } = &msg {
+
            let id = message.id.clone();
+
            let peers = peers.into_iter().filter(|p| {
+
                if let Some(subscribe) = &p.subscribe {
+
                    subscribe.filter.contains(&id)
+
                } else {
+
                    // If the peer did not send us a `subscribe` message, we don'the
+
                    // relay any messages to them.
+
                    false
+
                }
+
            });
+
            self.broadcast(msg, peers);
+
        } else {
+
            self.broadcast(msg, peers);
+
        }
+
    }
+

+
    #[cfg(test)]
+
    pub(crate) fn outbox(&mut self) -> &mut VecDeque<Io> {
+
        &mut self.io
+
    }
+
}
+

+
impl Iterator for Reactor {
+
    type Item = Io;
+

+
    fn next(&mut self) -> Option<Self::Item> {
+
        self.io.pop_front()
+
    }
+
}
modified radicle-node/src/test/arbitrary.rs
@@ -4,14 +4,12 @@ use bloomy::BloomFilter;
use quickcheck::Arbitrary;

use crate::crypto;
-
use crate::identity::Id;
+
use crate::prelude::{Id, NodeId, Refs, Timestamp};
use crate::service::filter::{Filter, FILTER_SIZE};
use crate::service::message::{
    Address, Envelope, InventoryAnnouncement, Message, NodeAnnouncement, RefsAnnouncement,
    Subscribe,
};
-
use crate::service::{NodeId, Timestamp};
-
use crate::storage::refs::Refs;
use crate::wire::message::MessageType;

pub use radicle::test::arbitrary::*;
modified radicle-node/src/test/peer.rs
@@ -1,16 +1,18 @@
+
use std::iter;
use std::net;
use std::ops::{Deref, DerefMut};

use log::*;

use crate::address_book::{KnownAddress, Source};
-
use crate::clock::RefClock;
+
use crate::clock::{RefClock, Timestamp};
use crate::collections::HashMap;
use crate::git;
use crate::git::Url;
use crate::service;
use crate::service::config::*;
use crate::service::message::*;
+
use crate::service::reactor::Io;
use crate::service::*;
use crate::storage::WriteStorage;
use crate::test::signer::MockSigner;
@@ -32,9 +34,9 @@ pub struct Peer<S> {
    initialized: bool,
}

-
impl<'r, S> simulator::Peer<S> for Peer<S>
+
impl<S> simulator::Peer<S> for Peer<S>
where
-
    S: WriteStorage<'r> + 'static,
+
    S: WriteStorage + 'static,
{
    fn init(&mut self) {
        self.initialize()
@@ -59,9 +61,9 @@ impl<S> DerefMut for Peer<S> {
    }
}

-
impl<'r, S> Peer<S>
+
impl<S> Peer<S>
where
-
    S: WriteStorage<'r> + 'static,
+
    S: WriteStorage + 'static,
{
    pub fn new(name: &'static str, ip: impl Into<net::IpAddr>, storage: S) -> Self {
        let git_url = Url {
@@ -124,7 +126,7 @@ where
    }

    pub fn timestamp(&self) -> Timestamp {
-
        self.service.timestamp()
+
        self.service.clock().timestamp()
    }

    pub fn git_url(&self) -> Url {
@@ -185,8 +187,8 @@ where
    pub fn messages(&mut self, remote: &net::SocketAddr) -> impl Iterator<Item = Message> {
        let mut msgs = Vec::new();

-
        self.service.outbox().retain(|o| match o {
-
            service::Io::Write(a, envelopes) if a == remote => {
+
        self.service.reactor().outbox().retain(|o| match o {
+
            Io::Write(a, envelopes) if a == remote => {
                msgs.extend(envelopes.iter().map(|e| e.msg.clone()));
                false
            }
@@ -204,6 +206,6 @@ where

    /// Get a draining iterator over the peer's I/O outbox.
    pub fn outbox(&mut self) -> impl Iterator<Item = Io> + '_ {
-
        self.service.outbox().drain(..)
+
        iter::from_fn(|| self.service.reactor().next())
    }
}
modified radicle-node/src/test/simulator.rs
@@ -13,7 +13,8 @@ use log::*;
use nakamoto_net as nakamoto;
use nakamoto_net::{Link, LocalDuration, LocalTime};

-
use crate::service::{DisconnectReason, Envelope, Event, Io};
+
use crate::service::reactor::Io;
+
use crate::service::{DisconnectReason, Envelope, Event};
use crate::storage::WriteStorage;
use crate::test::peer::Service;

@@ -191,7 +192,7 @@ pub struct Simulation<S> {
    storage: PhantomData<S>,
}

-
impl<'r, S: WriteStorage<'r>> Simulation<S> {
+
impl<S: WriteStorage + 'static> Simulation<S> {
    /// Create a new simulation.
    pub fn new(time: LocalTime, rng: fastrand::Rng, opts: Options) -> Self {
        Self {
modified radicle-node/src/test/tests.rs
@@ -8,6 +8,7 @@ use crate::collections::{HashMap, HashSet};
use crate::service::config::*;
use crate::service::message::*;
use crate::service::peer::*;
+
use crate::service::reactor::Io;
use crate::service::*;
use crate::storage::git::Storage;
use crate::storage::ReadStorage;
modified radicle-node/src/transport.rs
@@ -32,9 +32,9 @@ impl<S, T, G> Transport<S, T, G> {
    }
}

-
impl<'r, S, T, G> nakamoto::Protocol for Transport<S, T, G>
+
impl<S, T, G> nakamoto::Protocol for Transport<S, T, G>
where
-
    T: WriteStorage<'r> + 'static,
+
    T: WriteStorage + 'static,
    S: address_book::Store,
    G: crypto::Signer,
{
modified radicle-node/src/wire.rs
@@ -20,6 +20,7 @@ use crate::hash::Digest;
use crate::identity::Id;
use crate::service;
use crate::service::filter;
+
use crate::service::reactor::Io;
use crate::storage::refs::Refs;
use crate::storage::refs::SignedRefs;
use crate::storage::WriteStorage;
@@ -450,10 +451,10 @@ impl<S, T, G> Wire<S, T, G> {
    }
}

-
impl<'r, S, T, G> Wire<S, T, G>
+
impl<S, T, G> Wire<S, T, G>
where
    S: address_book::Store,
-
    T: WriteStorage<'r> + 'static,
+
    T: WriteStorage + 'static,
    G: Signer,
{
    pub fn connected(
@@ -505,7 +506,7 @@ impl<S, T, G> Iterator for Wire<S, T, G> {

    fn next(&mut self) -> Option<Self::Item> {
        match self.inner.next() {
-
            Some(service::Io::Write(addr, msgs)) => {
+
            Some(Io::Write(addr, msgs)) => {
                let mut buf = Vec::new();
                for msg in msgs {
                    log::debug!("Write {:?} to {}", &msg, addr.ip());
@@ -515,10 +516,10 @@ impl<S, T, G> Iterator for Wire<S, T, G> {
                }
                Some(nakamoto::Io::Write(addr, buf))
            }
-
            Some(service::Io::Event(e)) => Some(nakamoto::Io::Event(e)),
-
            Some(service::Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
-
            Some(service::Io::Disconnect(a, r)) => Some(nakamoto::Io::Disconnect(a, r)),
-
            Some(service::Io::Wakeup(d)) => Some(nakamoto::Io::Wakeup(d)),
+
            Some(Io::Event(e)) => Some(nakamoto::Io::Event(e)),
+
            Some(Io::Connect(a)) => Some(nakamoto::Io::Connect(a)),
+
            Some(Io::Disconnect(a, r)) => Some(nakamoto::Io::Disconnect(a, r)),
+
            Some(Io::Wakeup(d)) => Some(nakamoto::Io::Wakeup(d)),

            None => None,
        }
modified radicle/src/identity/project.rs
@@ -125,11 +125,11 @@ impl Doc<Verified> {
        Ok((oid, sig))
    }

-
    pub fn create<'r, S: WriteStorage<'r>>(
+
    pub fn create<S: WriteStorage>(
        &self,
        remote: &RemoteId,
        msg: &str,
-
        storage: &'r S,
+
        storage: &S,
    ) -> Result<(Id, git::Oid, S::Repository), Error> {
        // You can checkout this branch in your working copy with:
        //
@@ -147,7 +147,7 @@ impl Doc<Verified> {
        Ok((id, oid, repo))
    }

-
    pub fn update<'r, R: WriteRepository<'r>>(
+
    pub fn update<R: WriteRepository>(
        &self,
        remote: &RemoteId,
        msg: &str,
@@ -320,7 +320,7 @@ impl Doc<Unverified> {
        })
    }

-
    pub fn blob_at<'r, R: ReadRepository<'r>>(
+
    pub fn blob_at<R: ReadRepository>(
        commit: Oid,
        repo: &R,
    ) -> Result<Option<git2::Blob>, git::Error> {
@@ -331,7 +331,7 @@ impl Doc<Unverified> {
        }
    }

-
    pub fn load_at<'r, R: ReadRepository<'r>>(
+
    pub fn load_at<R: ReadRepository>(
        commit: Oid,
        repo: &R,
    ) -> Result<Option<(Self, Oid)>, git::Error> {
@@ -342,7 +342,7 @@ impl Doc<Unverified> {
        Ok(None)
    }

-
    pub fn load<'r, R: ReadRepository<'r>>(
+
    pub fn load<R: ReadRepository>(
        remote: &RemoteId,
        repo: &R,
    ) -> Result<Option<(Self, Oid)>, git::Error> {
@@ -355,10 +355,7 @@ impl Doc<Unverified> {
}

impl<V> Doc<V> {
-
    pub fn head<'r, R: ReadRepository<'r>>(
-
        remote: &RemoteId,
-
        repo: &R,
-
    ) -> Result<Option<Oid>, git::Error> {
+
    pub fn head<R: ReadRepository>(remote: &RemoteId, repo: &R) -> Result<Option<Oid>, git::Error> {
        let head = &git::refname!("heads").join(&*git::refs::IDENTITY_BRANCH);
        if let Some(oid) = repo.reference_oid(remote, head)? {
            Ok(Some(oid))
@@ -421,7 +418,7 @@ impl Identity<Oid> {
}

impl Identity<Untrusted> {
-
    pub fn load<'r, R: ReadRepository<'r>>(
+
    pub fn load<R: ReadRepository>(
        remote: &RemoteId,
        repo: &R,
    ) -> Result<Option<Identity<Oid>>, IdentityError> {
modified radicle/src/rad.rs
@@ -35,13 +35,13 @@ pub enum InitError {
}

/// Initialize a new radicle project from a git repository.
-
pub fn init<'r, G: Signer, S: storage::WriteStorage<'r>>(
+
pub fn init<G: Signer, S: storage::WriteStorage>(
    repo: &git2::Repository,
    name: &str,
    description: &str,
    default_branch: BranchName,
    signer: G,
-
    storage: &'r S,
+
    storage: &S,
) -> Result<(Id, SignedRefs<Verified>), InitError> {
    let pk = signer.public_key();
    let delegate = identity::Delegate {
@@ -98,7 +98,7 @@ pub enum ForkError {
}

/// Create a local tree for an existing project, from an existing remote.
-
pub fn fork_remote<'r, G: Signer, S: storage::WriteStorage<'r>>(
+
pub fn fork_remote<G: Signer, S: storage::WriteStorage>(
    proj: &Id,
    remote: &RemoteId,
    signer: G,
@@ -147,7 +147,7 @@ pub fn fork_remote<'r, G: Signer, S: storage::WriteStorage<'r>>(
    Ok(())
}

-
pub fn fork<'r, G: Signer, S: storage::WriteStorage<'r>>(
+
pub fn fork<G: Signer, S: storage::WriteStorage>(
    proj: &Id,
    signer: &G,
    storage: &S,
@@ -206,7 +206,7 @@ pub enum CloneError {
    Checkout(#[from] CheckoutError),
}

-
pub fn clone<'r, P: AsRef<Path>, G: Signer, S: storage::WriteStorage<'r>, H: node::Handle>(
+
pub fn clone<P: AsRef<Path>, G: Signer, S: storage::WriteStorage, H: node::Handle>(
    proj: &Id,
    path: P,
    signer: &G,
@@ -232,7 +232,7 @@ pub enum CloneUrlError {
    Checkout(#[from] CheckoutError),
}

-
pub fn clone_url<'r, P: AsRef<Path>, G: Signer, S: storage::WriteStorage<'r>>(
+
pub fn clone_url<P: AsRef<Path>, G: Signer, S: storage::WriteStorage>(
    proj: &Id,
    url: &git::Url,
    path: P,
modified radicle/src/storage.rs
@@ -222,8 +222,8 @@ pub trait ReadStorage {
    fn inventory(&self) -> Result<Inventory, Error>;
}

-
pub trait WriteStorage<'r>: ReadStorage {
-
    type Repository: WriteRepository<'r>;
+
pub trait WriteStorage: ReadStorage {
+
    type Repository: WriteRepository;

    fn repository(&self, proj: &Id) -> Result<Self::Repository, Error>;
    fn sign_refs<G: Signer>(
@@ -231,11 +231,10 @@ pub trait WriteStorage<'r>: ReadStorage {
        repository: &Self::Repository,
        signer: G,
    ) -> Result<SignedRefs<Verified>, Error>;
+
    fn fetch(&self, proj_id: &Id, remote: &Url) -> Result<Vec<RefUpdate>, FetchError>;
}

-
pub trait ReadRepository<'r> {
-
    type Remotes: Iterator<Item = Result<(RemoteId, Remote<Verified>), refs::Error>> + 'r;
-

+
pub trait ReadRepository {
    fn is_empty(&self) -> Result<bool, git2::Error>;
    fn path(&self) -> &Path;
    fn blob_at<'a>(&'a self, oid: Oid, path: &'a Path) -> Result<git2::Blob<'a>, git_ext::Error>;
@@ -253,13 +252,13 @@ pub trait ReadRepository<'r> {
    ) -> Result<Option<Oid>, git2::Error>;
    fn references(&self, remote: &RemoteId) -> Result<Refs, Error>;
    fn remote(&self, remote: &RemoteId) -> Result<Remote<Verified>, refs::Error>;
-
    fn remotes(&'r self) -> Result<Self::Remotes, git2::Error>;
+
    fn remotes(&self) -> Result<Remotes<Verified>, refs::Error>;
    /// Return the project associated with this repository.
    fn project(&self) -> Result<identity::Doc<Verified>, Error>;
    fn project_identity(&self) -> Result<(Oid, identity::Doc<Unverified>), ProjectError>;
}

-
pub trait WriteRepository<'r>: ReadRepository<'r> {
+
pub trait WriteRepository: ReadRepository {
    fn fetch(&mut self, url: &Url) -> Result<Vec<RefUpdate>, FetchError>;
    fn raw(&self) -> &git2::Repository;
}
@@ -286,10 +285,10 @@ where
    }
}

-
impl<'r, T, S> WriteStorage<'r> for T
+
impl<T, S> WriteStorage for T
where
    T: Deref<Target = S>,
-
    S: WriteStorage<'r> + 'static,
+
    S: WriteStorage + 'static,
{
    type Repository = S::Repository;

@@ -304,6 +303,10 @@ where
    ) -> Result<SignedRefs<Verified>, Error> {
        self.deref().sign_refs(repository, signer)
    }
+

+
    fn fetch(&self, proj_id: &Id, remote: &Url) -> Result<Vec<RefUpdate>, FetchError> {
+
        self.deref().fetch(proj_id, remote)
+
    }
}

#[cfg(test)]
modified radicle/src/storage/git.rs
@@ -13,7 +13,7 @@ use crate::identity::{Doc, Id};
use crate::storage::refs;
use crate::storage::refs::{Refs, SignedRefs};
use crate::storage::{
-
    Error, FetchError, Inventory, ReadRepository, ReadStorage, Remote, WriteRepository,
+
    Error, FetchError, Inventory, ReadRepository, ReadStorage, Remote, Remotes, WriteRepository,
    WriteStorage,
};

@@ -79,7 +79,7 @@ impl ReadStorage for Storage {
    }
}

-
impl<'r> WriteStorage<'r> for Storage {
+
impl WriteStorage for Storage {
    type Repository = Repository;

    fn repository(&self, proj: &Id) -> Result<Self::Repository, Error> {
@@ -99,6 +99,19 @@ impl<'r> WriteStorage<'r> for Storage {

        Ok(signed)
    }
+

+
    fn fetch(&self, proj_id: &Id, remote: &Url) -> Result<Vec<RefUpdate>, FetchError> {
+
        let mut repo = self.repository(proj_id).unwrap();
+
        let mut path = remote.path.clone();
+

+
        path.push(b'/');
+
        path.extend(proj_id.to_string().into_bytes());
+

+
        repo.fetch(&Url {
+
            path,
+
            ..remote.clone()
+
        })
+
    }
}

impl Storage {
@@ -338,11 +351,28 @@ impl Repository {
        );
        Ok(iter)
    }
-
}

-
impl<'r> ReadRepository<'r> for Repository {
-
    type Remotes = Box<dyn Iterator<Item = Result<(RemoteId, Remote<Verified>), refs::Error>> + 'r>;
+
    pub fn remotes(
+
        &self,
+
    ) -> Result<
+
        impl Iterator<Item = Result<(RemoteId, Remote<Verified>), refs::Error>> + '_,
+
        git2::Error,
+
    > {
+
        let remotes = self.backend.references_glob(SIGNATURES_GLOB.as_str())?.map(
+
            |reference| -> Result<_, _> {
+
                let r = reference?;
+
                let name = r.name().ok_or(refs::Error::InvalidRef)?;
+
                let (id, _) = git::parse_ref::<RemoteId>(name)?;
+
                let remote = self.remote(&id)?;

+
                Ok((id, remote))
+
            },
+
        );
+
        Ok(remotes)
+
    }
+
}
+

+
impl ReadRepository for Repository {
    fn is_empty(&self) -> Result<bool, git2::Error> {
        let some = self.remotes()?.next().is_some();
        Ok(!some)
@@ -425,19 +455,12 @@ impl<'r> ReadRepository<'r> for Repository {
        Ok(refs.into())
    }

-
    fn remotes(&'r self) -> Result<Self::Remotes, git2::Error> {
-
        let iter = self.backend.references_glob(SIGNATURES_GLOB.as_str())?.map(
-
            |reference| -> Result<(RemoteId, Remote<Verified>), refs::Error> {
-
                let r = reference?;
-
                let name = r.name().ok_or(refs::Error::InvalidRef)?;
-
                let (id, _) = git::parse_ref::<RemoteId>(name)?;
-
                let remote = self.remote(&id)?;
-

-
                Ok((id, remote))
-
            },
-
        );
-

-
        Ok(Box::new(iter))
+
    fn remotes(&self) -> Result<Remotes<Verified>, refs::Error> {
+
        let mut remotes = Vec::new();
+
        for remote in Repository::remotes(self)? {
+
            remotes.push(remote?);
+
        }
+
        Ok(Remotes::from_iter(remotes))
    }

    fn project(&self) -> Result<Doc<Verified>, Error> {
@@ -449,7 +472,7 @@ impl<'r> ReadRepository<'r> for Repository {
    }
}

-
impl<'r> WriteRepository<'r> for Repository {
+
impl WriteRepository for Repository {
    /// Fetch all remotes of a project from the given URL.
    fn fetch(&mut self, url: &git::Url) -> Result<Vec<RefUpdate>, FetchError> {
        // TODO: Have function to fetch specific remotes.
modified radicle/src/storage/refs.rs
@@ -213,9 +213,9 @@ impl SignedRefs<Unverified> {
}

impl SignedRefs<Verified> {
-
    pub fn load<'r, S>(remote: &RemoteId, repo: &S) -> Result<Self, Error>
+
    pub fn load<S>(remote: &RemoteId, repo: &S) -> Result<Self, Error>
    where
-
        S: ReadRepository<'r>,
+
        S: ReadRepository,
    {
        if let Some(oid) = repo.reference_oid(remote, &SIGNATURE_REF)? {
            Self::load_at(oid, remote, repo)
@@ -224,9 +224,9 @@ impl SignedRefs<Verified> {
        }
    }

-
    pub fn load_at<'r, S>(oid: Oid, remote: &RemoteId, repo: &S) -> Result<Self, Error>
+
    pub fn load_at<S>(oid: Oid, remote: &RemoteId, repo: &S) -> Result<Self, Error>
    where
-
        S: storage::ReadRepository<'r>,
+
        S: storage::ReadRepository,
    {
        let refs = repo.blob_at(oid, Path::new(REFS_BLOB_PATH))?;
        let signature = repo.blob_at(oid, Path::new(SIGNATURE_BLOB_PATH))?;
@@ -248,7 +248,7 @@ impl SignedRefs<Verified> {

    /// Save the signed refs to disk.
    /// This creates a new commit on the signed refs branch, and updates the branch pointer.
-
    pub fn save<'r, S: WriteRepository<'r>>(
+
    pub fn save<S: WriteRepository>(
        &self,
        // TODO: This should be part of the signed refs.
        remote: &RemoteId,
modified radicle/src/test/fixtures.rs
@@ -33,9 +33,9 @@ pub fn storage<P: AsRef<Path>, G: Signer>(path: P, signer: G) -> Result<Storage,
}

/// Create a new repository at the given path, and initialize it into a project.
-
pub fn project<'r, P: AsRef<Path>, S: WriteStorage<'r>, G: Signer>(
+
pub fn project<P: AsRef<Path>, S: WriteStorage, G: Signer>(
    path: P,
-
    storage: &'r S,
+
    storage: &S,
    signer: G,
) -> Result<(Id, SignedRefs<Verified>, git2::Repository, git2::Oid), rad::InitError> {
    let (repo, head) = repository(path);
modified radicle/src/test/storage.rs
@@ -53,7 +53,7 @@ impl ReadStorage for MockStorage {
    }
}

-
impl WriteStorage<'_> for MockStorage {
+
impl WriteStorage for MockStorage {
    type Repository = MockRepository;

    fn repository(&self, _proj: &Id) -> Result<Self::Repository, Error> {
@@ -67,13 +67,15 @@ impl WriteStorage<'_> for MockStorage {
    ) -> Result<crate::storage::refs::SignedRefs<Verified>, Error> {
        todo!()
    }
+

+
    fn fetch(&self, _proj_id: &Id, _remote: &Url) -> Result<Vec<RefUpdate>, FetchError> {
+
        Ok(vec![])
+
    }
}

pub struct MockRepository {}

-
impl ReadRepository<'_> for MockRepository {
-
    type Remotes = std::iter::Empty<Result<(RemoteId, Remote<Verified>), refs::Error>>;
-

+
impl ReadRepository for MockRepository {
    fn is_empty(&self) -> Result<bool, git2::Error> {
        Ok(true)
    }
@@ -86,7 +88,7 @@ impl ReadRepository<'_> for MockRepository {
        todo!()
    }

-
    fn remotes(&self) -> Result<Self::Remotes, git2::Error> {
+
    fn remotes(&self) -> Result<Remotes<Verified>, refs::Error> {
        todo!()
    }

@@ -137,7 +139,7 @@ impl ReadRepository<'_> for MockRepository {
    }
}

-
impl WriteRepository<'_> for MockRepository {
+
impl WriteRepository for MockRepository {
    fn fetch(&mut self, _url: &Url) -> Result<Vec<RefUpdate>, FetchError> {
        Ok(vec![])
    }