Radish alpha
h
rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5
Radicle Heartwood Protocol & Stack
Radicle
Git
radicle-node: Allow Communication of Errors
Draft lorenz opened 11 months ago

A new control message for errors is introduced. It refers to a stream and carries the error that was observed on the stream.

7 files changed +302 -77 e0d18b86 989635e5
modified radicle-fetch/src/transport.rs
@@ -26,15 +26,15 @@ use crate::git::repository;
/// processes for communicating during their respective protocols.
pub trait ConnectionStream {
    type Read: io::Read;
-
    type Write: io::Write + SignalEof;
+
    type Write: io::Write + Close;
    type Error: std::error::Error + Send + Sync + 'static;

    fn open(&mut self) -> Result<(&mut Self::Read, &mut Self::Write), Self::Error>;
}

-
/// The ability to signal EOF to the server side so that it can stop
-
/// serving for this fetch request.
-
pub trait SignalEof {
+
/// The ability to signal to the server side so that it can stop
+
/// serving for this fetch request, i.e. close the stream.
+
pub trait Close {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Since the git protocol is tunneled over an existing
@@ -46,9 +46,9 @@ pub trait SignalEof {
    /// Hence, there's no other way for the server to know that we're
    /// done sending requests than to send a special message outside
    /// the git protocol. This message can then be processed by the
-
    /// remote worker to end the protocol. We use the special "eof"
+
    /// remote worker to end the protocol. We use the special "close"
    /// control message for this.
-
    fn eof(&mut self) -> Result<(), Self::Error>;
+
    fn close(&mut self) -> Result<(), Self::Error>;
}

/// Configuration for running a Git `handshake`, `ls-refs`, or
@@ -175,7 +175,7 @@ where
    /// fetch commands.
    pub(crate) fn done(&mut self) -> io::Result<()> {
        let (_, w) = self.stream.open().map_err(io_other)?;
-
        w.eof().map_err(io_other)
+
        w.close().map_err(io_other)
    }
}

modified radicle-node/src/wire.rs
@@ -1,8 +1,10 @@
+
mod error;
mod frame;
mod message;
mod protocol;
mod varint;

+
pub use error::StreamError;
pub use frame::StreamId;
pub use message::{AddressType, MessageType};
pub use protocol::{Control, Wire, WireReader, WireSession, WireWriter};
added radicle-node/src/wire/error.rs
@@ -0,0 +1,166 @@
+
use std::{fmt::Display, io::ErrorKind};
+

+
use super::varint::VarInt;
+

+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+
#[non_exhaustive]
+
pub enum StreamError {
+
    Unknown,
+
    Io(std::io::ErrorKind),
+
    Git,
+
}
+

+
impl Display for StreamError {
+
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+
        match self {
+
            Self::Unknown => write!(f, "StreamError::Unknown"),
+
            Self::Io(kind) => write!(f, "StreamError::Io({})", kind),
+
            Self::Git => write!(f, "StreamError::Git"),
+
        }
+
    }
+
}
+

+
impl From<std::io::Error> for StreamError {
+
    fn from(value: std::io::Error) -> Self {
+
        Self::Io(value.kind())
+
    }
+
}
+

+
impl super::Encode for std::io::ErrorKind {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        VarInt::from(match self {
+
            ErrorKind::Other => 0u8,
+
            ErrorKind::NotFound => 1,
+
            ErrorKind::PermissionDenied => 2,
+
            ErrorKind::ConnectionRefused => 3,
+
            ErrorKind::ConnectionReset => 4,
+
            ErrorKind::HostUnreachable => 5,
+
            ErrorKind::NetworkUnreachable => 6,
+
            ErrorKind::ConnectionAborted => 7,
+
            ErrorKind::NotConnected => 8,
+
            ErrorKind::AddrInUse => 9,
+
            ErrorKind::AddrNotAvailable => 10,
+
            ErrorKind::NetworkDown => 11,
+
            ErrorKind::BrokenPipe => 12,
+
            ErrorKind::AlreadyExists => 13,
+
            ErrorKind::WouldBlock => 14,
+
            ErrorKind::NotADirectory => 15,
+
            ErrorKind::IsADirectory => 16,
+
            ErrorKind::DirectoryNotEmpty => 17,
+
            ErrorKind::ReadOnlyFilesystem => 18,
+
            ErrorKind::StaleNetworkFileHandle => 19,
+
            ErrorKind::InvalidInput => 20,
+
            ErrorKind::InvalidData => 21,
+
            ErrorKind::TimedOut => 22,
+
            ErrorKind::WriteZero => 23,
+
            ErrorKind::StorageFull => 24,
+
            ErrorKind::NotSeekable => 25,
+
            ErrorKind::QuotaExceeded => 26,
+
            ErrorKind::FileTooLarge => 27,
+
            ErrorKind::ResourceBusy => 28,
+
            ErrorKind::ExecutableFileBusy => 29,
+
            ErrorKind::Deadlock => 30,
+
            ErrorKind::CrossesDevices => 31,
+
            ErrorKind::TooManyLinks => 32,
+
            ErrorKind::ArgumentListTooLong => 33,
+
            ErrorKind::Interrupted => 34,
+
            ErrorKind::Unsupported => 35,
+
            ErrorKind::UnexpectedEof => 36,
+
            ErrorKind::OutOfMemory => 37,
+
            unknown => {
+
                // We conflate the value of "other" and something that we
+
                // don't know yet. This is under the assumption that these
+
                // errors will be really niche. The warning below will
+
                // hopefully point our attention here if this ever becomes
+
                // a problem.
+
                log::warn!(target: "wire", "Encountered unknown error kind: {}", unknown);
+
                0
+
            }
+
        })
+
        .encode(writer)
+
    }
+
}
+

+
impl super::Encode for StreamError {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        Ok(match self {
+
            Self::Io(kind) => 1u8.encode(writer)? + kind.encode(writer)?,
+
            Self::Git => 2u8.encode(writer)?,
+
            Self::Unknown => 0u8.encode(writer)?,
+
        })
+
    }
+
}
+

+
impl super::Decode for ErrorKind {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, super::Error> {
+
        Ok(match u8::decode(reader)? {
+
            1 => ErrorKind::NotFound,
+
            2 => ErrorKind::PermissionDenied,
+
            3 => ErrorKind::ConnectionRefused,
+
            4 => ErrorKind::ConnectionReset,
+
            5 => ErrorKind::HostUnreachable,
+
            6 => ErrorKind::NetworkUnreachable,
+
            7 => ErrorKind::ConnectionAborted,
+
            8 => ErrorKind::NotConnected,
+
            9 => ErrorKind::AddrInUse,
+
            10 => ErrorKind::AddrNotAvailable,
+
            11 => ErrorKind::NetworkDown,
+
            12 => ErrorKind::BrokenPipe,
+
            13 => ErrorKind::AlreadyExists,
+
            14 => ErrorKind::WouldBlock,
+
            15 => ErrorKind::NotADirectory,
+
            16 => ErrorKind::IsADirectory,
+
            17 => ErrorKind::DirectoryNotEmpty,
+
            18 => ErrorKind::ReadOnlyFilesystem,
+
            19 => ErrorKind::StaleNetworkFileHandle,
+
            20 => ErrorKind::InvalidInput,
+
            21 => ErrorKind::InvalidData,
+
            22 => ErrorKind::TimedOut,
+
            23 => ErrorKind::WriteZero,
+
            24 => ErrorKind::StorageFull,
+
            25 => ErrorKind::NotSeekable,
+
            26 => ErrorKind::QuotaExceeded,
+
            27 => ErrorKind::FileTooLarge,
+
            28 => ErrorKind::ResourceBusy,
+
            29 => ErrorKind::ExecutableFileBusy,
+
            30 => ErrorKind::Deadlock,
+
            31 => ErrorKind::CrossesDevices,
+
            32 => ErrorKind::TooManyLinks,
+
            33 => ErrorKind::ArgumentListTooLong,
+
            34 => ErrorKind::Interrupted,
+
            35 => ErrorKind::Unsupported,
+
            36 => ErrorKind::UnexpectedEof,
+
            37 => ErrorKind::OutOfMemory,
+
            0 | 38u8..=u8::MAX => ErrorKind::Other,
+
        })
+
    }
+
}
+

+
impl super::Decode for StreamError {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, super::Error> {
+
        Ok(match u8::decode(reader)? {
+
            1 => Self::Io(ErrorKind::decode(reader)?),
+
            2 => Self::Git,
+
            _ => Self::Unknown,
+
        })
+
    }
+
}
+

+
impl<T: super::Encode> super::Encode for Option<T> {
+
    fn encode<W: std::io::Write + ?Sized>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
+
        Ok(match self {
+
            None => u8::encode(&0, writer)?,
+
            Some(value) => u8::encode(&1, writer)? + value.encode(writer)?,
+
        })
+
    }
+
}
+

+
impl<T: super::Decode> super::Decode for Option<T> {
+
    fn decode<R: std::io::Read + ?Sized>(reader: &mut R) -> Result<Self, super::Error> {
+
        match u8::decode(reader)? {
+
            0 => Ok(None),
+
            1 => Ok(Some(T::decode(reader)?)),
+
            _ => Err(super::Error::UnexpectedBytes),
+
        }
+
    }
+
}
modified radicle-node/src/wire/frame.rs
@@ -4,6 +4,8 @@ use std::{fmt, io};

use crate::{wire, wire::varint, wire::varint::VarInt, wire::Message, Link, PROTOCOL_VERSION};

+
use super::error::StreamError;
+

/// Protocol version strings all start with the magic sequence `rad`, followed
/// by a version number.
pub const PROTOCOL_VERSION_STRING: Version = Version([b'r', b'a', b'd', PROTOCOL_VERSION]);
@@ -13,7 +15,13 @@ const CONTROL_OPEN: u8 = 0;
/// Control close byte.
const CONTROL_CLOSE: u8 = 1;
/// Control EOF byte.
+
/// This is here for backwards compatibility. Previous versions of would send
+
/// it to signal closing a particular stream. We now use [`CONTROL_CLOSE`]
+
/// instead. Errors, including unexpected end of file, are signaled via
+
/// [`CONTROL_ERROR`].
const CONTROL_EOF: u8 = 2;
+
/// Control error byte.
+
const CONTROL_ERROR: u8 = 3;

/// Protocol version.
#[derive(Debug, PartialEq, Eq)]
@@ -260,12 +268,13 @@ pub enum Control {
        /// The stream to close.
        stream: StreamId,
    },
-
    /// Signal an end-of-file. This can be used to simulate connections terminating
-
    /// without having to close the connection. These control messages are turned into
-
    /// [`io::ErrorKind::UnexpectedEof`] errors on read.
-
    Eof {
-
        /// The stream to send an EOF on.
+
    /// Signal that an error happened on a stream, which left it in an
+
    /// unrecoverable state.
+
    Error {
+
        /// The stream to report an error for.
        stream: StreamId,
+
        /// What happened to that stream.
+
        error: StreamError,
    },
}

@@ -277,13 +286,14 @@ impl wire::Decode for Control {
                let stream = StreamId::decode(reader)?;
                Ok(Control::Open { stream })
            }
-
            CONTROL_CLOSE => {
+
            CONTROL_CLOSE | CONTROL_EOF => {
                let stream = StreamId::decode(reader)?;
                Ok(Control::Close { stream })
            }
-
            CONTROL_EOF => {
+
            CONTROL_ERROR => {
                let stream = StreamId::decode(reader)?;
-
                Ok(Control::Eof { stream })
+
                let error = StreamError::decode(reader)?;
+
                Ok(Control::Error { stream, error })
            }
            other => Err(wire::Error::InvalidControlMessage(other)),
        }
@@ -299,14 +309,15 @@ impl wire::Encode for Control {
                n += CONTROL_OPEN.encode(writer)?;
                n += id.encode(writer)?;
            }
-
            Self::Eof { stream: id } => {
-
                n += CONTROL_EOF.encode(writer)?;
-
                n += id.encode(writer)?;
-
            }
            Self::Close { stream: id } => {
                n += CONTROL_CLOSE.encode(writer)?;
                n += id.encode(writer)?;
            }
+
            Self::Error { stream: id, error } => {
+
                n += CONTROL_ERROR.encode(writer)?;
+
                n += id.encode(writer)?;
+
                n += error.encode(writer)?;
+
            }
        }
        Ok(n)
    }
modified radicle-node/src/wire/protocol.rs
@@ -31,6 +31,7 @@ use crate::service;
use crate::service::io::Io;
use crate::service::FETCH_TIMEOUT;
use crate::service::{session, DisconnectReason, Metrics, Service};
+
use crate::wire::error::StreamError;
use crate::wire::frame;
use crate::wire::frame::{Frame, FrameData, StreamId};
use crate::wire::Encode;
@@ -117,11 +118,6 @@ impl Streams {
        }
    }

-
    /// Get a known stream.
-
    fn get(&self, stream: &StreamId) -> Option<&Stream> {
-
        self.streams.get(stream)
-
    }
-

    /// Get a known stream, mutably.
    fn get_mut(&mut self, stream: &StreamId) -> Option<&mut Stream> {
        self.streams.get_mut(stream)
@@ -414,37 +410,50 @@ where
            // early "close" from the remote. Otherwise, we unregister it here and send the "close"
            // ourselves.
            if let Some(s) = streams.unregister(&task.stream) {
-
                log::debug!(
-
                    target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
-
                    task.stream, task.remote, s.sent_bytes, s.received_bytes
-
                );
-
                let frame = Frame::<service::Message>::control(
-
                    *link,
-
                    frame::Control::Close {
+
                let respond = match &task.result {
+
                    FetchResult::Responder { result: Err(e), .. } => Some(e),
+
                    FetchResult::Responder { result: Ok(()), .. }
+
                    | FetchResult::Initiator { .. } => None,
+
                };
+

+
                let ctrl = match respond {
+
                    Some(e) => frame::Control::Error {
                        stream: task.stream,
+
                        error: StreamError::from(e),
                    },
-
                );
+
                    None => {
+
                        log::debug!(
+
                            target: "wire", "Stream {} of {} closing with {} byte(s) sent and {} byte(s) received",
+
                            task.stream, task.remote, s.sent_bytes, s.received_bytes
+
                        );
+
                        frame::Control::Close {
+
                            stream: task.stream,
+
                        }
+
                    }
+
                };
+

+
                let frame = Frame::<service::Message>::control(*link, ctrl);
                self.actions.push_back(Action::Send(fd, frame.to_bytes()));
+
            } else {
+
                log::debug!(target: "wire", "Peer {nid} connected after a fetch, but stream {} is gone?! Cannot report back.", task.stream);
            }
        } else {
-
            // If the peer disconnected, we'll get here, but we still want to let the service know
-
            // about the fetch result, so we don't return here.
-
            log::warn!(target: "wire", "Peer {nid} is not connected; ignoring fetch result");
-
            return;
-
        };
+
            log::debug!(target: "wire", "Peer {nid} not connected after a fetch?! Cannot report back.");
+
        }

-
        // Only call into the service if we initiated this fetch.
        match task.result {
            FetchResult::Initiator { rid, result } => {
                self.service.fetched(rid, nid, result);
            }
            FetchResult::Responder { rid, result } => {
-
                if let Some(rid) = rid {
-
                    if let Some(err) = result.err() {
-
                        log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
-
                    } else {
-
                        log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
-
                    }
+
                let rid = match rid {
+
                    Some(rid) => rid.to_string(),
+
                    None => String::from("an unknown repo"),
+
                };
+
                if let Some(err) = result.err() {
+
                    log::info!(target: "wire", "Peer {nid} failed to fetch {rid} from us: {err}");
+
                } else {
+
                    log::info!(target: "wire", "Peer {nid} fetched {rid} from us successfully");
                }
            }
        }
@@ -473,7 +482,9 @@ where
                    Frame::<service::Message>::git(stream, data)
                }
                ChannelEvent::Close => Frame::control(*link, frame::Control::Close { stream }),
-
                ChannelEvent::Eof => Frame::control(*link, frame::Control::Eof { stream }),
+
                ChannelEvent::Error(e) => {
+
                    Frame::control(*link, frame::Control::Error { stream, error: e })
+
                }
            };
            self.actions
                .push_back(reactor::Action::Send(fd, frame.to_bytes()));
@@ -788,32 +799,33 @@ where
                                }
                            }
                            Ok(Some(Frame {
-
                                data: FrameData::Control(frame::Control::Eof { stream }),
+
                                data: FrameData::Control(frame::Control::Close { stream }),
                                ..
                            })) => {
-
                                if let Some(s) = streams.get(&stream) {
-
                                    log::debug!(target: "wire", "Received `end-of-file` on stream {stream} from {nid}");
+
                                log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");

-
                                    if s.channels.send(ChannelEvent::Eof).is_err() {
-
                                        log::error!(target: "wire", "Worker is disconnected; cannot send `EOF`");
-
                                    }
-
                                } else {
-
                                    log::debug!(target: "wire", "Ignoring frame on closed or unknown stream {stream}");
+
                                if let Some(s) = streams.unregister(&stream) {
+
                                    log::debug!(
+
                                        target: "wire",
+
                                        "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
+
                                        s.sent_bytes, s.received_bytes
+
                                    );
+
                                    s.channels.close().ok();
                                }
                            }
                            Ok(Some(Frame {
-
                                data: FrameData::Control(frame::Control::Close { stream }),
+
                                data: FrameData::Control(frame::Control::Error { stream, error }),
                                ..
                            })) => {
-
                                log::debug!(target: "wire", "Received `close` command for stream {stream} from {nid}");
+
                                log::debug!(target: "wire", "Received `error` for stream {stream} from {nid}: {error}");

-
                                if let Some(s) = streams.unregister(&stream) {
+
                                if let Some(mut s) = streams.unregister(&stream) {
                                    log::debug!(
                                        target: "wire",
-
                                        "Stream {stream} of {nid} closed with {} byte(s) sent and {} byte(s) received",
+
                                        "Stream {stream} of {nid} errored with {} byte(s) sent and {} byte(s) received",
                                        s.sent_bytes, s.received_bytes
                                    );
-
                                    s.channels.close().ok();
+
                                    s.channels.error(error).ok();
                                }
                            }
                            Ok(Some(Frame {
modified radicle-node/src/worker.rs
@@ -6,6 +6,7 @@ pub mod fetch;
pub mod garbage;

use std::io;
+
use std::io::ErrorKind;
use std::path::PathBuf;

use crossbeam_channel as chan;
@@ -21,7 +22,7 @@ use radicle_fetch::FetchLimit;
use crate::runtime::{thread, Emitter, Handle};
use crate::service::policy;
use crate::service::policy::SeedingPolicy;
-
use crate::wire::StreamId;
+
use crate::wire::{StreamError, StreamId};

pub use channels::{ChannelEvent, Channels, ChannelsConfig};

@@ -86,10 +87,22 @@ pub enum UploadError {
    PolicyStore(#[from] radicle::node::policy::store::Error),
}

-
impl UploadError {
-
    /// Check if it's an end-of-file error.
-
    pub fn is_eof(&self) -> bool {
-
        matches!(self, UploadError::Io(e) if e.kind() == io::ErrorKind::UnexpectedEof)
+
impl From<&UploadError> for StreamError {
+
    fn from(value: &UploadError) -> Self {
+
        match value {
+
            UploadError::PacketLine(_)
+
            | UploadError::Storage(radicle::storage::Error::InvalidId(_)) => {
+
                StreamError::Io(ErrorKind::InvalidData)
+
            }
+
            UploadError::Unauthorized(_, _)
+
            | UploadError::Storage(radicle::storage::Error::InvalidRef) => {
+
                StreamError::Io(ErrorKind::NotFound)
+
            }
+
            UploadError::Io(e)
+
            | UploadError::Storage(radicle::storage::Error::Io(e))
+
            | UploadError::PolicyStore(policy::Error::Io(e)) => StreamError::Io(e.kind()),
+
            _ => StreamError::Unknown,
+
        }
    }
}

@@ -243,7 +256,7 @@ impl Worker {
                    Err(e) => {
                        return FetchResult::Responder {
                            rid: None,
-
                            result: Err(e.into()),
+
                            result: Err(UploadError::PacketLine(e)),
                        }
                    }
                };
modified radicle-node/src/worker/channels.rs
@@ -8,7 +8,7 @@ use radicle::node::config::FetchPackSizeLimit;
use radicle::node::NodeId;

use crate::runtime::Handle;
-
use crate::wire::StreamId;
+
use crate::wire::{StreamError, StreamId};

/// Maximum size of channel used to communicate with a worker.
/// Note that as long as we're using [`std::io::copy`] to copy data from the
@@ -83,14 +83,13 @@ impl radicle_fetch::transport::ConnectionStream for ChannelsFlush {
}

/// Data that can be sent and received on worker channels.
-
pub enum ChannelEvent<T = Vec<u8>> {
+
pub enum ChannelEvent<T = Vec<u8>, E = StreamError> {
    /// Git protocol data.
    Data(T),
-
    /// A request to close the channel.
-
    Close,
    /// A signal that the git protocol has ended, eg. when the remote fetch closes the
    /// connection.
-
    Eof,
+
    Close,
+
    Error(E),
}

impl<T> From<T> for ChannelEvent<T> {
@@ -104,7 +103,7 @@ impl<T> fmt::Debug for ChannelEvent<T> {
        match self {
            Self::Data(_) => write!(f, "ChannelEvent::Data(..)"),
            Self::Close => write!(f, "ChannelEvent::Close"),
-
            Self::Eof => write!(f, "ChannelEvent::Eof"),
+
            Self::Error(err) => write!(f, "ChannelEvent::Error({})", err),
        }
    }
}
@@ -151,6 +150,10 @@ impl<T: AsRef<[u8]>> Channels<T> {
    pub fn close(self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
        self.sender.close()
    }
+

+
    pub fn error(&mut self, error: StreamError) -> Result<(), chan::SendError<ChannelEvent<T>>> {
+
        self.sender.error(error)
+
    }
}

#[derive(Clone, Copy, Debug)]
@@ -226,8 +229,21 @@ impl Read for ChannelReader<Vec<u8>> {
                self.buffer = io::Cursor::new(data);
                self.buffer.read(buf)
            }
-
            Ok(ChannelEvent::Eof) => Err(io::ErrorKind::UnexpectedEof.into()),
-
            Ok(ChannelEvent::Close) => Err(io::ErrorKind::ConnectionReset.into()),
+
            Ok(ChannelEvent::Close) => Err(io::ErrorKind::UnexpectedEof.into()),
+
            Ok(ChannelEvent::Error(StreamError::Io(kind))) => Err(io::Error::new(
+
                kind,
+
                format!(
+
                    "error reading from stream: other side reported i/o error: {}",
+
                    kind
+
                ),
+
            )),
+
            Ok(ChannelEvent::Error(err)) => Err(io::Error::new(
+
                io::ErrorKind::Other,
+
                format!(
+
                    "error reading from stream: other side reported error: {}",
+
                    err
+
                ),
+
            )),

            Err(chan::RecvTimeoutError::Timeout) => Err(io::Error::new(
                io::ErrorKind::TimedOut,
@@ -243,8 +259,8 @@ impl Read for ChannelReader<Vec<u8>> {

/// Wraps a [`chan::Sender`] and provides it with [`io::Write`].
#[derive(Clone)]
-
struct ChannelWriter<T = Vec<u8>> {
-
    sender: chan::Sender<ChannelEvent<T>>,
+
struct ChannelWriter<T = Vec<u8>, E = StreamError> {
+
    sender: chan::Sender<ChannelEvent<T, E>>,
    timeout: time::Duration,
}

@@ -260,11 +276,11 @@ pub struct ChannelFlushWriter<T = Vec<u8>> {
    remote: NodeId,
}

-
impl radicle_fetch::transport::SignalEof for ChannelFlushWriter<Vec<u8>> {
+
impl radicle_fetch::transport::Close for ChannelFlushWriter<Vec<u8>> {
    type Error = io::Error;

-
    fn eof(&mut self) -> io::Result<()> {
-
        self.writer.send(ChannelEvent::Eof)?;
+
    fn close(&mut self) -> io::Result<()> {
+
        self.writer.send(ChannelEvent::Close)?;
        self.flush()
    }
}
@@ -301,4 +317,9 @@ impl<T: AsRef<[u8]>> ChannelWriter<T> {
    pub fn close(self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
        self.sender.send(ChannelEvent::Close)
    }
+

+
    /// Mark this stream as errored.
+
    pub fn error(&mut self, error: StreamError) -> Result<(), chan::SendError<ChannelEvent<T>>> {
+
        self.sender.send(ChannelEvent::Error(error))
+
    }
}