| |
} => {
|
| |
log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
|
| |
|
| - |
let result = self.fetch(*rid, *remote, stream, namespaces, channels);
|
| - |
if let Err(err) = &result {
|
| - |
log::error!(target: "worker", "Fetch error: {err}");
|
| - |
}
|
| - |
result
|
| + |
self.fetch(*rid, *remote, stream, namespaces, channels)
|
| |
}
|
| |
Fetch::Responder { .. } => {
|
| |
log::debug!(target: "worker", "Worker processing incoming fetch..");
|
| |
mut channels: Channels,
|
| |
) -> Result<Vec<RefUpdate>, FetchError> {
|
| |
let staging = fetch::StagingPhaseInitial::new(&self.storage, rid, namespaces.clone())?;
|
| - |
|
| - |
self._fetch(
|
| + |
match self._fetch(
|
| |
&staging.repo,
|
| |
remote,
|
| |
staging.refspecs(),
|
| |
stream,
|
| |
&mut channels,
|
| - |
)?;
|
| - |
if let Err(e) = self.handle.flush(remote, stream) {
|
| - |
log::error!(target: "worker", "Error flushing worker stream: {e}");
|
| + |
) {
|
| + |
Ok(()) => log::debug!(target: "worker", "Initial fetch for {rid} exited successfully"),
|
| + |
Err(e) => log::error!(target: "worker", "Initial fetch for {rid} failed: {e}"),
|
| |
}
|
| + |
Self::eof(remote, stream, &mut channels.sender, &mut self.handle)?;
|
| |
|
| - |
let staging = match staging.into_final().map_err(FetchError::from) {
|
| - |
Ok(staging) => staging,
|
| - |
Err(e) => return Err(e),
|
| - |
};
|
| - |
self._fetch(
|
| + |
let staging = staging.into_final()?;
|
| + |
match self._fetch(
|
| |
&staging.repo,
|
| |
remote,
|
| |
staging.refspecs(),
|
| |
stream,
|
| |
&mut channels,
|
| - |
)?;
|
| - |
if let Err(e) = self.handle.flush(remote, stream) {
|
| - |
log::error!(target: "worker", "Error flushing worker stream: {e}");
|
| + |
) {
|
| + |
Ok(()) => log::debug!(target: "worker", "Final fetch for {rid} exited successfully"),
|
| + |
Err(e) => log::error!(target: "worker", "Final fetch for {rid} failed: {e}"),
|
| |
}
|
| + |
Self::eof(remote, stream, &mut channels.sender, &mut self.handle)?;
|
| + |
|
| |
staging.transfer().map_err(FetchError::from)
|
| |
}
|
| |
|
| |
tunnel.run(self.timeout)?;
|
| |
|
| |
let result = child.wait()?;
|
| - |
let result = if result.success() {
|
| - |
log::debug!(target: "worker", "Fetch for {} exited successfully", rid);
|
| + |
if result.success() {
|
| |
Ok(())
|
| |
} else {
|
| - |
log::error!(target: "worker", "Fetch for {} failed", rid);
|
| |
Err(FetchError::CommandFailed {
|
| |
code: result.code().unwrap_or(1),
|
| |
})
|
| - |
};
|
| + |
}
|
| + |
}
|
| |
|
| + |
fn eof(
|
| + |
remote: NodeId,
|
| + |
stream: StreamId,
|
| + |
sender: &mut ChannelWriter,
|
| + |
handle: &mut Handle,
|
| + |
) -> Result<(), FetchError> {
|
| |
log::debug!(target: "worker", "Sending `EOF` to remote..");
|
| |
|
| - |
if let Err(e) = channels.sender.eof() {
|
| + |
if let Err(e) = sender.eof() {
|
| |
log::error!(target: "worker", "Fetch error: error sending `EOF` message: {e}");
|
| + |
return Err(e.into());
|
| |
}
|
| - |
result
|
| + |
if let Err(e) = handle.flush(remote, stream) {
|
| + |
log::error!(target: "worker", "Error flushing worker stream: {e}");
|
| + |
}
|
| + |
Ok(())
|
| |
}
|
| |
}
|
| |
|