Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Get replication test to pass
Alexis Sellier committed 3 years ago
commit 067b53215401a90e4554287e196c438c5715e7a6
parent 723cdcf2336b12a47b570853fdfa4cf03ef01d3e
6 files changed +60 -30
modified radicle-node/src/client.rs
@@ -1,5 +1,6 @@
use std::{io, net, thread, time};

+
use crossbeam_channel as chan;
use cyphernet::{Cert, EcSign};
use netservices::resource::NetAccept;
use radicle::profile::Home;
@@ -13,6 +14,7 @@ use crate::control;
use crate::crypto::Signature;
use crate::node::NodeId;
use crate::service::{routing, tracking};
+
use crate::wire;
use crate::wire::Wire;
use crate::worker::{WorkerPool, WorkerReq};
use crate::{crypto, service, LocalTime};
@@ -57,7 +59,7 @@ pub struct Runtime {
    pub id: NodeId,
    pub handle: Handle,
    pub control: thread::JoinHandle<Result<(), control::Error>>,
-
    pub reactor: Reactor<service::Command>,
+
    pub reactor: Reactor<wire::Control>,
    pub pool: WorkerPool,
    pub local_addrs: Vec<net::SocketAddr>,
}
@@ -113,14 +115,7 @@ impl Runtime {
            sig: EcSign::sign(&signer, id.as_slice()),
        };

-
        let (worker_send, worker_recv) = crossbeam_channel::unbounded::<WorkerReq<G>>();
-
        let pool = WorkerPool::with(
-
            8,
-
            time::Duration::from_secs(9),
-
            storage,
-
            worker_recv,
-
            id.to_human(),
-
        );
+
        let (worker_send, worker_recv) = chan::unbounded::<WorkerReq<G>>();
        let mut wire = Wire::new(service, worker_send, cert, signer, proxy, clock);
        let mut local_addrs = Vec::new();

@@ -140,6 +135,15 @@ impl Runtime {
            move || control::listen(node_sock, handle)
        });

+
        let pool = WorkerPool::with(
+
            8,
+
            time::Duration::from_secs(9),
+
            storage,
+
            worker_recv,
+
            handle.clone(),
+
            id.to_human(),
+
        );
+

        Ok(Runtime {
            id,
            control,
modified radicle-node/src/client/handle.rs
@@ -10,6 +10,7 @@ use crate::profile::Home;
use crate::service;
use crate::service::{CommandError, FetchLookup, QueryState};
use crate::service::{NodeId, Sessions};
+
use crate::wire;

/// An error resulting from a handle method.
#[derive(Error, Debug)]
@@ -51,7 +52,7 @@ impl<T> From<chan::SendError<T>> for Error {

pub struct Handle {
    pub(crate) home: Home,
-
    pub(crate) controller: reactor::Controller<service::Command>,
+
    pub(crate) controller: reactor::Controller<wire::Control>,
}

impl Clone for Handle {
@@ -64,12 +65,19 @@ impl Clone for Handle {
}

impl Handle {
-
    pub fn new(home: Home, controller: reactor::Controller<service::Command>) -> Self {
+
    pub fn new(home: Home, controller: reactor::Controller<wire::Control>) -> Self {
        Self { home, controller }
    }

+
    pub fn wakeup(&mut self) -> Result<(), Error> {
+
        // TODO: Handle channel disconnect error correctly.
+
        //       This just returns `BrokenPipe`.
+
        self.controller.cmd(wire::Control::Wakeup)?;
+
        Ok(())
+
    }
+

    fn command(&self, cmd: service::Command) -> Result<(), Error> {
-
        self.controller.cmd(cmd)?;
+
        self.controller.cmd(wire::Control::Command(cmd))?;
        Ok(())
    }
}
modified radicle-node/src/tests/e2e.rs
@@ -317,7 +317,6 @@ fn test_inventory_sync_star() {
}

#[test]
-
#[ignore]
fn test_replication() {
    logger::init(log::Level::Debug);

@@ -353,6 +352,8 @@ fn test_replication() {
    assert_eq!(from, bob.id);
    assert_eq!(updated, vec![]);

+
    log::debug!(target: "test", "Fetch complete with {}", from);
+

    let inventory = alice.handle.inventory().unwrap();
    let alice_refs = alice
        .storage
modified radicle-node/src/wire.rs
@@ -2,7 +2,7 @@ mod message;
mod protocol;

pub use message::{AddressType, MessageType};
-
pub use protocol::{Wire, WireReader, WireSession, WireWriter};
+
pub use protocol::{Control, Wire, WireReader, WireSession, WireWriter};

use std::collections::BTreeMap;
use std::convert::TryFrom;
modified radicle-node/src/wire/protocol.rs
@@ -31,6 +31,12 @@ use crate::worker::{WorkerReq, WorkerResp};
use crate::Link;
use crate::{address, service};

+
#[derive(Debug)]
+
pub enum Control {
+
    Command(service::Command),
+
    Wakeup,
+
}
+

/// Peer session type.
pub type WireSession<G> = CypherSession<G, Sha256>;
/// Peer session type (read-only).
@@ -307,6 +313,20 @@ where
        }
    }

+
    fn wakeup(&mut self) {
+
        let mut completed = Vec::new();
+
        for peer in self.peers.values() {
+
            if let Peer::Upgraded { response, .. } = peer {
+
                if let Ok(resp) = response.try_recv() {
+
                    completed.push(resp);
+
                }
+
            }
+
        }
+
        for resp in completed {
+
            self.fetch_complete(resp);
+
        }
+
    }
+

    fn fetch_complete(&mut self, resp: WorkerResp<G>) {
        log::debug!(target: "transport", "Fetch completed: {:?}", resp.result);

@@ -344,23 +364,11 @@ where
{
    type Listener = NetAccept<WireSession<G>>;
    type Transport = NetTransport<WireSession<G>>;
-
    type Command = service::Command;
+
    type Command = Control;

    fn tick(&mut self, _time: Duration) {
        // FIXME: Change this once a proper timestamp is passed into the function.
        self.service.tick(LocalTime::from(SystemTime::now()));
-

-
        let mut completed = Vec::new();
-
        for peer in self.peers.values() {
-
            if let Peer::Upgraded { response, .. } = peer {
-
                if let Ok(resp) = response.try_recv() {
-
                    completed.push(resp);
-
                }
-
            }
-
        }
-
        for resp in completed {
-
            self.fetch_complete(resp);
-
        }
    }

    fn handle_wakeup(&mut self) {
@@ -487,7 +495,10 @@ where
    }

    fn handle_command(&mut self, cmd: Self::Command) {
-
        self.service.command(cmd);
+
        match cmd {
+
            Control::Command(cmd) => self.service.command(cmd),
+
            Control::Wakeup => self.wakeup(),
+
        }
    }

    fn handle_error(
modified radicle-node/src/worker.rs
@@ -13,6 +13,7 @@ use radicle::storage::{ReadRepository, RefUpdate, WriteRepository, WriteStorage}
use radicle::{git, Storage};
use reactor::poller::popol;

+
use crate::client::handle::Handle;
use crate::service::reactor::Fetch;
use crate::service::{FetchError, FetchResult};
use crate::wire::{WireReader, WireSession, WireWriter};
@@ -36,19 +37,20 @@ struct Worker<G: Signer + EcSign> {
    storage: Storage,
    tasks: chan::Receiver<WorkerReq<G>>,
    timeout: time::Duration,
+
    handle: Handle,
}

impl<G: Signer + EcSign> Worker<G> {
    /// Waits for tasks and runs them. Blocks indefinitely unless there is an error receiving
    /// the next task.
-
    fn run(self) -> Result<(), chan::RecvError> {
+
    fn run(mut self) -> Result<(), chan::RecvError> {
        loop {
            let task = self.tasks.recv()?;
            self.process(task);
        }
    }

-
    fn process(&self, task: WorkerReq<G>) {
+
    fn process(&mut self, task: WorkerReq<G>) {
        let WorkerReq {
            fetch,
            session,
@@ -71,6 +73,8 @@ impl<G: Signer + EcSign> Worker<G> {

        if channel.send(WorkerResp { result, session }).is_err() {
            log::error!("Unable to report fetch result: worker channel disconnected");
+
        } else {
+
            self.handle.wakeup().unwrap();
        }
    }

@@ -249,6 +253,7 @@ impl WorkerPool {
        timeout: time::Duration,
        storage: Storage,
        tasks: chan::Receiver<WorkerReq<G>>,
+
        handle: Handle,
        name: String,
    ) -> Self {
        let mut pool = Vec::with_capacity(capacity);
@@ -256,6 +261,7 @@ impl WorkerPool {
            let worker = Worker {
                tasks: tasks.clone(),
                storage: storage.clone(),
+
                handle: handle.clone(),
                timeout,
            };
            let thread = thread::Builder::new()