Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Properly close worker channels
Alexis Sellier committed 3 years ago
commit 31647521fa35643dbc1143752cd2b003922dd1b7
parent 267aba59016e312039a0724a1e98f89712b2adfc
5 files changed +105 -13
modified radicle-node/src/service.rs
@@ -604,8 +604,11 @@ where
    ) {
        let result = match result {
            Ok(updated) => {
-
                log::debug!(target: "service", "Fetched {rid} from {remote}");
+
                log::debug!(target: "service", "Fetched {rid} from {remote} successfully");

+
                for update in &updated {
+
                    log::debug!(target: "service", "Ref updated: {update} for {rid}");
+
                }
                self.emitter.emit(Event::RefsFetched {
                    remote,
                    rid,
modified radicle-node/src/tests/e2e.rs
@@ -2,6 +2,7 @@ use std::{collections::HashSet, thread, time};

use radicle::crypto::{test::signer::MockSigner, Signer};
use radicle::node::{FetchResult, Handle as _};
+
use radicle::prelude::Id;
use radicle::storage::{ReadRepository, ReadStorage};
use radicle::{assert_matches, rad};

@@ -458,6 +459,86 @@ fn test_fetch_up_to_date() {
}

#[test]
+
fn test_concurrent_fetches() {
+
    logger::init(log::Level::Debug);
+

+
    let tmp = tempfile::tempdir().unwrap();
+
    let mut alice = Node::init(tmp.path());
+
    let mut bob = Node::init(tmp.path());
+
    let mut bob_repos: HashSet<Id> = HashSet::from_iter([
+
        bob.project("bob-1", ""),
+
        bob.project("bob-2", ""),
+
        bob.project("bob-3", ""),
+
        bob.project("bob-4", ""),
+
        bob.project("bob-5", ""),
+
    ]);
+
    let mut alice_repos: HashSet<Id> = HashSet::from_iter([
+
        alice.project("alice-1", ""),
+
        alice.project("alice-2", ""),
+
        alice.project("alice-3", ""),
+
        alice.project("alice-4", ""),
+
        alice.project("alice-5", ""),
+
    ]);
+

+
    let mut alice = alice.spawn(service::Config::default());
+
    let mut bob = bob.spawn(service::Config::default());
+

+
    let alice_events = alice.handle.events();
+
    let bob_events = bob.handle.events();
+

+
    for rid in &bob_repos {
+
        alice.handle.track_repo(*rid, Scope::All).unwrap();
+
    }
+
    for rid in &alice_repos {
+
        bob.handle.track_repo(*rid, Scope::All).unwrap();
+
    }
+
    alice.connect(&bob);
+

+
    while !bob_repos.is_empty() {
+
        match alice_events.recv().unwrap() {
+
            service::Event::RefsFetched { rid, updated, .. } if !updated.is_empty() => {
+
                bob_repos.remove(&rid);
+
                log::debug!(target: "test", "{} fetched {rid} ({} left)",alice.id, bob_repos.len());
+
            }
+
            _ => {}
+
        }
+
    }
+

+
    while !alice_repos.is_empty() {
+
        match bob_events.recv().unwrap() {
+
            service::Event::RefsFetched { rid, updated, .. } if !updated.is_empty() => {
+
                alice_repos.remove(&rid);
+
                log::debug!(target: "test", "{} fetched {rid} ({} left)", bob.id, alice_repos.len());
+
            }
+
            _ => {}
+
        }
+
    }
+

+
    for rid in &bob_repos {
+
        let (_, doc) = alice
+
            .storage
+
            .repository(*rid)
+
            .unwrap()
+
            .identity_doc()
+
            .unwrap();
+
        let proj = doc.verified().unwrap().project().unwrap();
+

+
        assert!(proj.name().starts_with("bob"));
+
    }
+
    for rid in &alice_repos {
+
        let (_, doc) = bob
+
            .storage
+
            .repository(*rid)
+
            .unwrap()
+
            .identity_doc()
+
            .unwrap();
+
        let proj = doc.verified().unwrap().project().unwrap();
+

+
        assert!(proj.name().starts_with("alice"));
+
    }
+
}
+

+
#[test]
#[ignore = "failing"]
#[should_panic]
// TODO: This test currently passes but the behavior is wrong. The test should not panic.
modified radicle-node/src/wire/protocol.rs
@@ -532,7 +532,7 @@ where
                                data: FrameData::Control(frame::Control::Open { stream }),
                                ..
                            })) => {
-
                                log::debug!(target: "wire", "Received stream open for id={stream}");
+
                                log::debug!(target: "wire", "Received stream open for id={stream} from {nid}");

                                let Some(WorkerChannels {
                                    sender: work_send,
@@ -568,9 +568,11 @@ where
                                data: FrameData::Control(frame::Control::Close { stream }),
                                ..
                            })) => {
-
                                log::debug!(target: "wire", "Received stream close command for id={stream}");
+
                                log::debug!(target: "wire", "Received stream close command for id={stream} from {nid}");

-
                                streams.unregister(&stream);
+
                                if let Some(chans) = streams.unregister(&stream) {
+
                                    chans.sender.send(ChannelEvent::Close).ok();
+
                                }
                            }
                            Ok(Some(Frame {
                                data: FrameData::Gossip(msg),
@@ -812,7 +814,7 @@ where
                    remote,
                    namespaces,
                } => {
-
                    log::debug!(target: "wire", "Processing fetch..");
+
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

                    let (fd, Peer::Connected { link, streams,  .. }) =
                        self.fd_by_id_mut(&remote) else {
@@ -820,7 +822,7 @@ where
                        };
                    let (stream, channels) = streams.open();

-
                    log::debug!(target: "wire", "Opened new stream with id={stream} for rid={rid}");
+
                    log::debug!(target: "wire", "Opened new stream with id={stream} for rid={rid} remote={remote}");

                    let link = *link;
                    let task = Task {
modified radicle-node/src/worker.rs
@@ -177,7 +177,7 @@ impl Worker {
        let channels = Channels::new(send, recv);
        let result = self._process(&fetch, stream, channels);

-
        log::debug!(target: "worker", "Sending response back to service..");
+
        log::trace!(target: "worker", "Sending response back to service..");

        if self
            .handle
@@ -222,6 +222,8 @@ impl Worker {
                        Err(e) => return Err(e.into()),
                    }
                }
+
                log::debug!(target: "worker", "Upload process on stream {stream} exited successfully");
+

                Ok(vec![])
            }
        }
@@ -277,13 +279,17 @@ impl Worker {
        pktline_r: &mut pktline::Reader<&mut ChannelReader>,
        stream_w: &mut ChannelWriter,
    ) -> Result<ControlFlow<()>, UploadError> {
-
        log::debug!(target: "worker", "Waiting for Git request pktline for..");
+
        log::debug!(target: "worker", "Waiting for Git request pktline from {}..", fetch.remote());

        // Read the request packet line to make sure the repository being requested matches what
        // we expect, and that the service requested is valid.
        let (rid, request) = match pktline_r.read_request_pktline() {
            Ok((req, pktline)) => (req.repo, pktline),
            Err(err) if err.kind() == io::ErrorKind::ConnectionReset => {
+
                log::debug!(
+
                    target: "worker",
+
                    "Upload process received stream `close` from {}", fetch.remote()
+
                );
                return Ok(ControlFlow::Break(()));
            }
            Err(err) => {
@@ -332,7 +338,7 @@ impl Worker {
                // This is the expected error when the daemon disconnects.
                if e.kind() == io::ErrorKind::UnexpectedEof {
                    log::debug!(target: "worker", "Daemon closed the git connection for {rid}");
-
                    log::debug!(target: "worker", "Waiting for EOF from remote..");
+
                    log::debug!(target: "worker", "Waiting for end-of-file from remote..");

                    stream_r.wait_for_eof()?;

@@ -437,10 +443,10 @@ impl Worker {
        sender: &mut ChannelWriter,
        handle: &mut Handle,
    ) -> Result<(), FetchError> {
-
        log::debug!(target: "worker", "Sending `EOF` to remote..");
+
        log::debug!(target: "worker", "Sending end-of-file to remote {remote}..");

        if let Err(e) = sender.eof() {
-
            log::error!(target: "worker", "Fetch error: error sending `EOF` message: {e}");
+
            log::error!(target: "worker", "Fetch error: error sending end-of-file message: {e}");
            return Err(e.into());
        }
        if let Err(e) = handle.flush(remote, stream) {
modified radicle-node/src/worker/fetch.rs
@@ -125,7 +125,7 @@ impl<'a> StagingPhaseInitial<'a> {
    pub fn into_final(self) -> Result<StagingPhaseFinal<'a>, error::Transition> {
        let trusted = match &self.repo {
            StagedRepository::Cloning(repo) => {
-
                log::debug!(target: "worker", "Loading remotes for clone");
+
                log::debug!(target: "worker", "Loading remotes for clone of {}", self.repo.id);
                let oid = ReadRepository::identity_head(repo)?;
                log::trace!(target: "worker", "Loading 'rad/id' @ {oid}");
                let (doc, _) = Doc::<Unverified>::load_at(oid, repo)?;
@@ -139,7 +139,7 @@ impl<'a> StagingPhaseInitial<'a> {
                trusted
            }
            StagedRepository::Fetching(repo) => {
-
                log::debug!(target: "worker", "Loading remotes for fetching");
+
                log::debug!(target: "worker", "Loading remotes for fetching of {}", self.repo.id);
                match self.namespaces.clone() {
                    Namespaces::All => {
                        let mut trusted = repo.remote_ids()?.collect::<Result<HashSet<_>, _>>()?;