Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
radicle: wire up radicle-fetch
Fintan Halpenny committed 2 years ago
commit ea7f26dbdf78c0827074f3567aa7bf298ad550bb
parent 981172aa4f3e84c076a994c887735dc392832e1d
23 files changed +694 -1340
modified Cargo.lock
@@ -2545,10 +2545,12 @@ dependencies = [
 "log",
 "netservices",
 "nonempty 0.8.1",
+
 "once_cell",
 "qcheck",
 "qcheck-macros",
 "radicle",
 "radicle-crypto",
+
 "radicle-fetch",
 "radicle-git-ext",
 "scrypt",
 "serde",
modified radicle-cli/examples/rad-id-conflict.md
@@ -20,7 +20,7 @@ $ rad id update --title "Edit project name" --description "" --payload "xyz.radi
```
``` ~bob
$ rad id update --title "Edit project name" --description "" --payload "xyz.radicle.project" "name" '"wood"' -q
-
fae22d07f7d386b89f14ac353b079c9eef71f948
+
2e7f195fde751b73aabe4730bd1047c87893a787
```

When Alice syncs with Bob, she notices the problem: there are two active
@@ -34,8 +34,8 @@ $ rad id list
╭─────────────────────────────────────────────────────────────────────────────────╮
│ ●   ID        Title               Author                     Status     Created │
├─────────────────────────────────────────────────────────────────────────────────┤
-
│ ●   fae22d0   Edit project name   bob      z6Mkt67…v4N1tRk   active     now     │
│ ●   6c07e4e   Edit project name   alice    (you)             active     now     │
+
│ ●   2e7f195   Edit project name   bob      z6Mkt67…v4N1tRk   active     now     │
│ ●   bd41a1c   Add Bob             alice    (you)             accepted   now     │
│ ●   2317f74   Initial revision    alice    (you)             accepted   now     │
╰─────────────────────────────────────────────────────────────────────────────────╯
@@ -45,13 +45,13 @@ This isn't a problem as long as we don't try to accept both. So let's accept
Bob's:

``` ~alice
-
$ rad id accept fae22d0 -q
+
$ rad id accept 2e7f195 -q
$ rad id list
╭─────────────────────────────────────────────────────────────────────────────────╮
│ ●   ID        Title               Author                     Status     Created │
├─────────────────────────────────────────────────────────────────────────────────┤
-
│ ●   fae22d0   Edit project name   bob      z6Mkt67…v4N1tRk   accepted   now     │
│ ●   6c07e4e   Edit project name   alice    (you)             stale      now     │
+
│ ●   2e7f195   Edit project name   bob      z6Mkt67…v4N1tRk   accepted   now     │
│ ●   bd41a1c   Add Bob             alice    (you)             accepted   now     │
│ ●   2317f74   Initial revision    alice    (you)             accepted   now     │
╰─────────────────────────────────────────────────────────────────────────────────╯
modified radicle-cli/examples/rad-id-multi-delegate.md
@@ -8,10 +8,10 @@ $ rad sync --fetch rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji
✓ Fetching rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji from z6MknSL…StBU8Vi..
✓ Fetched repository from 1 seed(s)
$ rad id update --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --title "Add Eve" --description "" --delegate did:key:z6MkedTZGJGqgQ2py2b8kGecfxdt2yRdHWF6JpaZC47fovFn --no-confirm
-
✓ Identity revision 9d9031417f1d86a6c0ed5ec2c4bf5820dca0eec9 created
+
✓ Identity revision e4b005ae70116fbe91340fef43127e86ab17e476 created
╭────────────────────────────────────────────────────────────────────────╮
│ Title    Add Eve                                                       │
-
│ Revision 9d9031417f1d86a6c0ed5ec2c4bf5820dca0eec9                      │
+
│ Revision e4b005ae70116fbe91340fef43127e86ab17e476                      │
│ Blob     4c7fd4c7b7d7fd5d7088a7c952556fab99a034e9                      │
│ Author   did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk      │
│ State    active                                                        │
@@ -47,11 +47,11 @@ $ rad sync --fetch rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji
$ rad inspect rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --delegates
did:key:z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi (alice)
did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk (bob)
-
$ rad id accept 9d9031417f1d86a6c0ed5ec2c4bf5820dca0eec9 --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --no-confirm
-
✓ Revision 9d9031417f1d86a6c0ed5ec2c4bf5820dca0eec9 accepted
+
$ rad id accept e4b005ae70116fbe91340fef43127e86ab17e476 --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --no-confirm
+
✓ Revision e4b005ae70116fbe91340fef43127e86ab17e476 accepted
╭────────────────────────────────────────────────────────────────────────╮
│ Title    Add Eve                                                       │
-
│ Revision 9d9031417f1d86a6c0ed5ec2c4bf5820dca0eec9                      │
+
│ Revision e4b005ae70116fbe91340fef43127e86ab17e476                      │
│ Blob     4c7fd4c7b7d7fd5d7088a7c952556fab99a034e9                      │
│ Author   did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk      │
│ State    accepted                                                      │
@@ -68,7 +68,7 @@ did:key:z6MkedTZGJGqgQ2py2b8kGecfxdt2yRdHWF6JpaZC47fovFn

``` ~alice
$ rad id update --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --title "Make private" --description "" --visibility private --no-confirm -q
-
1483814d54ad1321fc4ddb1f8bf7d90454de1790
+
3fba3b17851cde9814f35acce4f740f53fb63dd7
```

We can list all revisions:
@@ -78,8 +78,8 @@ $ rad id list
╭────────────────────────────────────────────────────────────────────────────────╮
│ ●   ID        Title              Author                     Status     Created │
├────────────────────────────────────────────────────────────────────────────────┤
-
│ ●   1483814   Make private       alice    (you)             active     now     │
-
│ ●   9d90314   Add Eve            bob      z6Mkt67…v4N1tRk   accepted   now     │
+
│ ●   3fba3b1   Make private       alice    (you)             active     now     │
+
│ ●   e4b005a   Add Eve            bob      z6Mkt67…v4N1tRk   accepted   now     │
│ ●   5666f74   Add Bob            alice    (you)             accepted   now     │
│ ●   2317f74   Initial revision   alice    (you)             accepted   now     │
╰────────────────────────────────────────────────────────────────────────────────╯
@@ -88,23 +88,23 @@ $ rad id list
Despite being a delegate, Bob can't edit or redact Alice's revision:

``` ~bob (fail)
-
$ rad id redact 1483814d54ad1321fc4ddb1f8bf7d90454de1790
+
$ rad id redact 3fba3b17851cde9814f35acce4f740f53fb63dd7
[..]
```
``` ~bob (fail)
-
$ rad id edit --title "Boo!" --description "Boo!" 1483814d54ad1321fc4ddb1f8bf7d90454de1790
+
$ rad id edit --title "Boo!" --description "Boo!" 3fba3b17851cde9814f35acce4f740f53fb63dd7
[..]
```

Alice can edit:

``` ~alice
-
$ rad id edit --title "Make private" --description "Privacy is cool." 1483814d54ad1321fc4ddb1f8bf7d90454de1790
-
✓ Revision 1483814d54ad1321fc4ddb1f8bf7d90454de1790 edited
-
$ rad id show 1483814d54ad1321fc4ddb1f8bf7d90454de1790
+
$ rad id edit --title "Make private" --description "Privacy is cool." 3fba3b17851cde9814f35acce4f740f53fb63dd7
+
✓ Revision 3fba3b17851cde9814f35acce4f740f53fb63dd7 edited
+
$ rad id show 3fba3b17851cde9814f35acce4f740f53fb63dd7
╭────────────────────────────────────────────────────────────────────────╮
│ Title    Make private                                                  │
-
│ Revision 1483814d54ad1321fc4ddb1f8bf7d90454de1790                      │
+
│ Revision 3fba3b17851cde9814f35acce4f740f53fb63dd7                      │
│ Blob     79bc5c39103e811a3c9f11744f9a4029f063a5de                      │
│ Author   did:key:z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi      │
│ State    active                                                        │
@@ -142,21 +142,21 @@ $ rad id show 1483814d54ad1321fc4ddb1f8bf7d90454de1790
And she can redact her revision:

``` ~alice
-
$ rad id redact 1483814d54ad1321fc4ddb1f8bf7d90454de1790
-
✓ Revision 1483814d54ad1321fc4ddb1f8bf7d90454de1790 redacted
+
$ rad id redact 3fba3b17851cde9814f35acce4f740f53fb63dd7
+
✓ Revision 3fba3b17851cde9814f35acce4f740f53fb63dd7 redacted
```
``` ~alice (fail)
-
$ rad id show 1483814d54ad1321fc4ddb1f8bf7d90454de1790
-
✗ Error: revision `1483814d54ad1321fc4ddb1f8bf7d90454de1790` not found
+
$ rad id show 3fba3b17851cde9814f35acce4f740f53fb63dd7
+
✗ Error: revision `3fba3b17851cde9814f35acce4f740f53fb63dd7` not found
```

Finally, Alice can also propose to remove Bob:
``` ~alice
$ rad id update --repo rad:z42hL2jL4XNk6K8oHQaSWfMgCL7ji --title "Remove Bob" --description "" --rescind did:key:z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk --no-confirm
-
✓ Identity revision ea60049b8265f60f3dcca21798ce50ef67779421 created
+
✓ Identity revision 572b3189cf5a9cdba5f77db183a29a50562ba318 created
╭────────────────────────────────────────────────────────────────────────╮
│ Title    Remove Bob                                                    │
-
│ Revision ea60049b8265f60f3dcca21798ce50ef67779421                      │
+
│ Revision 572b3189cf5a9cdba5f77db183a29a50562ba318                      │
│ Blob     7109c1c201c223dd4e9fdb10f7330dc6f0310258                      │
│ Author   did:key:z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi      │
│ State    active                                                        │
modified radicle-cli/examples/rad-init-private-clone.md
@@ -8,7 +8,7 @@ $ rad ls
```
``` ~bob (fail)
$ rad sync rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu --fetch --seed z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi --timeout 1
-
✗ Fetching rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu from z6MknSL…StBU8Vi.. error: connection reset
+
✗ Fetching rad:z2ug5mwNKZB8KGpBDRTrWHAMbvHCu from z6MknSL…StBU8Vi.. error: failed to perform fetch handshake
✗ Error: repository fetch from 1 seed(s) failed
```

modified radicle-cli/examples/rad-patch-pull-update.md
@@ -49,22 +49,22 @@ $ cd heartwood
$ git checkout -b bob/feature -q
$ git commit --allow-empty -m "Bob's commit #1" -q
$ git push rad -o sync -o patch.message="Bob's patch" HEAD:refs/patches
-
✓ Patch 6d260fc8388e74d8fefb5dabc5a798e125ec3cf9 opened
+
✓ Patch a84ea2e8626a86442910a2e70837561538c20efb opened
✓ Synced with 1 node(s)
To rad://zhbMU4DUXrzB8xT6qAJh6yZ7bFMK/z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk
 * [new reference]   HEAD -> refs/patches
```
``` ~bob
$ git status --short --branch
-
## bob/feature...rad/patches/6d260fc8388e74d8fefb5dabc5a798e125ec3cf9
+
## bob/feature...rad/patches/a84ea2e8626a86442910a2e70837561538c20efb
```

Alice checks it out.

``` ~alice
-
$ rad patch checkout 6d260fc8388e74d8fefb5dabc5a798e125ec3cf9
-
✓ Switched to branch patch/6d260fc
-
✓ Branch patch/6d260fc setup to track rad/patches/6d260fc8388e74d8fefb5dabc5a798e125ec3cf9
+
$ rad patch checkout a84ea2e
+
✓ Switched to branch patch/a84ea2e
+
✓ Branch patch/a84ea2e setup to track rad/patches/a84ea2e8626a86442910a2e70837561538c20efb
$ git show
commit bdcdb30b3c0f513620dd0f1c24ff8f4f71de956b
Author: radicle <radicle@localhost>
@@ -78,19 +78,19 @@ Bob then updates the patch.
``` ~bob (stderr)
$ git commit --allow-empty -m "Bob's commit #2" -q
$ git push rad -o sync -o patch.message="Updated."
-
✓ Patch 6d260fc updated to 750081b35a3f831f428653bd2240eb4674ccae71
+
✓ Patch a84ea2e updated to 9fd6bb156bc899aef7119e6d97c6cf850639a7df
✓ Synced with 1 node(s)
To rad://zhbMU4DUXrzB8xT6qAJh6yZ7bFMK/z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk
-
   bdcdb30..cad2666  bob/feature -> patches/6d260fc8388e74d8fefb5dabc5a798e125ec3cf9
+
   bdcdb30..cad2666  bob/feature -> patches/a84ea2e8626a86442910a2e70837561538c20efb
```

Alice pulls the update.

``` ~alice
-
$ rad patch show 6d260fc
+
$ rad patch show a84ea2e
╭─────────────────────────────────────────────────────────────────────╮
│ Title    Bob's patch                                                │
-
│ Patch    6d260fc8388e74d8fefb5dabc5a798e125ec3cf9                   │
+
│ Patch    a84ea2e8626a86442910a2e70837561538c20efb                   │
│ Author   bob z6Mkt67…v4N1tRk                                        │
│ Head     cad2666a8a2250e4dee175ed5044be2c251ff08b                   │
│ Commits  ahead 2, behind 0                                          │
@@ -100,16 +100,16 @@ $ rad patch show 6d260fc
│ bdcdb30 Bob's commit #1                                             │
├─────────────────────────────────────────────────────────────────────┤
│ ● opened by bob z6Mkt67…v4N1tRk now                                 │
-
│ ↑ updated to 750081b35a3f831f428653bd2240eb4674ccae71 (cad2666) now │
+
│ ↑ updated to 9fd6bb156bc899aef7119e6d97c6cf850639a7df (cad2666) now │
╰─────────────────────────────────────────────────────────────────────╯
$ git ls-remote rad
f2de534b5e81d7c6e2dcaf58c3dd91573c0a0354	refs/heads/master
-
cad2666a8a2250e4dee175ed5044be2c251ff08b	refs/heads/patches/6d260fc8388e74d8fefb5dabc5a798e125ec3cf9
+
cad2666a8a2250e4dee175ed5044be2c251ff08b	refs/heads/patches/a84ea2e8626a86442910a2e70837561538c20efb
```
``` ~alice
$ git fetch rad
$ git status --short --branch
-
## patch/6d260fc...rad/patches/6d260fc8388e74d8fefb5dabc5a798e125ec3cf9 [behind 1]
+
## patch/a84ea2e...rad/patches/a84ea2e8626a86442910a2e70837561538c20efb [behind 1]
```
``` ~alice
$ git pull
modified radicle-cli/examples/workflow/3-issues.md
@@ -7,7 +7,7 @@ Let's say the new car you are designing with your peers has a problem with its f
$ rad issue open --title "flux capacitor underpowered" --description "Flux capacitor power requirements exceed current supply" --no-announce
╭─────────────────────────────────────────────────────────╮
│ Title   flux capacitor underpowered                     │
-
│ Issue   d457c205de780d37d1c16efbe5333aed4662a6c3        │
+
│ Issue   b0c5579944e37d0bf3455bf1cec9c91596c6f47a        │
│ Author  bob (you)                                       │
│ Status  open                                            │
│                                                         │
@@ -22,7 +22,7 @@ $ rad issue list
╭──────────────────────────────────────────────────────────────────────────────────────────╮
│ ●   ID        Title                         Author           Labels   Assignees   Opened │
├──────────────────────────────────────────────────────────────────────────────────────────┤
-
│ ●   d457c20   flux capacitor underpowered   bob      (you)                        now    │
+
│ ●   b0c5579   flux capacitor underpowered   bob      (you)                        now    │
╰──────────────────────────────────────────────────────────────────────────────────────────╯
```

@@ -31,6 +31,6 @@ found an important detail about the car's power requirements. It will help
whoever works on a fix.

```
-
$ rad issue comment d457c205de780d37d1c16efbe5333aed4662a6c3 --message 'The flux capacitor needs 1.21 Gigawatts' -q
-
2e8b9538aedaea418e90e90d19e61edf3f022933
+
$ rad issue comment b0c5579944e37d0bf3455bf1cec9c91596c6f47a --message 'The flux capacitor needs 1.21 Gigawatts' -q
+
c45d2acdd1de992c0127fd44519b8a5260695497
```
modified radicle-cli/examples/workflow/4-patching-contributor.md
@@ -26,7 +26,7 @@ Once the code is ready, we open a patch with our changes.

``` (stderr)
$ git push rad -o no-sync -o patch.message="Define power requirements" -o patch.message="See details." HEAD:refs/patches
-
✓ Patch 4bfb6fe940f815e3fcce6a2796e051df85db9fe1 opened
+
✓ Patch f5e2be41827ee9f80bcdcd02f4f0451287b36229 opened
To rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk
 * [new reference]   HEAD -> refs/patches
```
@@ -38,12 +38,12 @@ $ rad patch
╭────────────────────────────────────────────────────────────────────────────────╮
│ ●  ID       Title                      Author         Head     +   -   Updated │
├────────────────────────────────────────────────────────────────────────────────┤
-
│ ●  4bfb6fe  Define power requirements  bob     (you)  3e674d1  +0  -0  now     │
+
│ ●  f5e2be4  Define power requirements  bob     (you)  3e674d1  +0  -0  now     │
╰────────────────────────────────────────────────────────────────────────────────╯
-
$ rad patch show 4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
$ rad patch show f5e2be41827ee9f80bcdcd02f4f0451287b36229
╭────────────────────────────────────────────────────╮
│ Title     Define power requirements                │
-
│ Patch     4bfb6fe940f815e3fcce6a2796e051df85db9fe1 │
+
│ Patch     f5e2be41827ee9f80bcdcd02f4f0451287b36229 │
│ Author    bob (you)                                │
│ Head      3e674d1a1df90807e934f9ae5da2591dd6848a33 │
│ Branches  flux-capacitor-power                     │
@@ -62,7 +62,7 @@ We can also confirm that the patch branch is in storage:

```
$ git ls-remote rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk refs/heads/patches/*
-
3e674d1a1df90807e934f9ae5da2591dd6848a33	refs/heads/patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
3e674d1a1df90807e934f9ae5da2591dd6848a33	refs/heads/patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229
```

Wait, let's add a README too! Just for fun.
@@ -77,14 +77,14 @@ $ git commit --message "Add README, just for the fun"
```
``` (stderr) RAD_SOCKET=/dev/null
$ git push -o patch.message="Add README, just for the fun"
-
✓ Patch 4bfb6fe updated to 7782e60eb51b6e852abb184b092249327354c625
+
✓ Patch f5e2be4 updated to 68143c579935d115e1551b88154cc41763c43e1f
To rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk
-
   3e674d1..27857ec  flux-capacitor-power -> patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
   3e674d1..27857ec  flux-capacitor-power -> patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229
```

And let's leave a quick comment for our team:

```
-
$ rad patch comment 4bfb6fe940f815e3fcce6a2796e051df85db9fe1 --message 'I cannot wait to get back to the 90s!' -q
-
a3ba1df99fbd874affc790c95cd18607940f1a90
+
$ rad patch comment f5e2be41827ee9f80bcdcd02f4f0451287b36229 --message 'I cannot wait to get back to the 90s!' -q
+
9b12eacdf5d326bdd16ed971809a9a88268ca435
```
modified radicle-cli/examples/workflow/5-patching-maintainer.md
@@ -22,7 +22,7 @@ $ git fetch bob
✓ Synced with 1 peer(s)
From rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6Mkt67GdsW7715MEfRuP4pSZxJRJh6kj6Y48WRqVv4N1tRk
 * [new branch]      master     -> bob/master
-
 * [new branch]      patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1 -> bob/patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
 * [new branch]      patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229 -> bob/patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229
```

The contributor's changes are now visible to us.
@@ -30,12 +30,12 @@ The contributor's changes are now visible to us.
```
$ git branch -r
  bob/master
-
  bob/patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
  bob/patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229
  rad/master
-
$ rad patch show 4bfb6fe
+
$ rad patch show f5e2be4
╭─────────────────────────────────────────────────────────────────────╮
│ Title    Define power requirements                                  │
-
│ Patch    4bfb6fe940f815e3fcce6a2796e051df85db9fe1                   │
+
│ Patch    f5e2be41827ee9f80bcdcd02f4f0451287b36229                   │
│ Author   bob z6Mkt67…v4N1tRk                                        │
│ Head     27857ec9eb04c69cacab516e8bf4b5fd36090f66                   │
│ Commits  ahead 2, behind 0                                          │
@@ -47,7 +47,7 @@ $ rad patch show 4bfb6fe
│ 3e674d1 Define power requirements                                   │
├─────────────────────────────────────────────────────────────────────┤
│ ● opened by bob z6Mkt67…v4N1tRk now                                 │
-
│ ↑ updated to 7782e60eb51b6e852abb184b092249327354c625 (27857ec) now │
+
│ ↑ updated to 68143c579935d115e1551b88154cc41763c43e1f (27857ec) now │
╰─────────────────────────────────────────────────────────────────────╯
```

@@ -57,20 +57,20 @@ way will tell others about the corrections we needed before merging the
changes.

```
-
$ rad patch checkout 4bfb6fe940f815e3fcce6a2796e051df85db9fe1
-
✓ Switched to branch patch/4bfb6fe
-
✓ Branch patch/4bfb6fe setup to track rad/patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
$ rad patch checkout f5e2be41827ee9f80bcdcd02f4f0451287b36229
+
✓ Switched to branch patch/f5e2be4
+
✓ Branch patch/f5e2be4 setup to track rad/patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229
$ git mv REQUIREMENTS REQUIREMENTS.md
$ git commit -m "Use markdown for requirements"
-
[patch/4bfb6fe f567f69] Use markdown for requirements
+
[patch/f5e2be4 f567f69] Use markdown for requirements
 1 file changed, 0 insertions(+), 0 deletions(-)
 rename REQUIREMENTS => REQUIREMENTS.md (100%)
```
``` (stderr)
$ git push rad -o no-sync -o patch.message="Use markdown for requirements"
-
✓ Patch 4bfb6fe updated to fab4fddf6bcae7d55432417cdf5a7d0270d0d7d3
+
✓ Patch f5e2be4 updated to 5dd1e6b73aa8c2dbfa192f8cf47cd1525c602ebf
To rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
-
 * [new branch]      patch/4bfb6fe -> patches/4bfb6fe940f815e3fcce6a2796e051df85db9fe1
+
 * [new branch]      patch/f5e2be4 -> patches/f5e2be41827ee9f80bcdcd02f4f0451287b36229
```

Great, all fixed up, lets merge the code.
@@ -78,7 +78,7 @@ Great, all fixed up, lets merge the code.
```
$ git checkout master
Your branch is up to date with 'rad/master'.
-
$ git merge patch/4bfb6fe
+
$ git merge patch/f5e2be4
Updating f2de534..f567f69
Fast-forward
 README.md       | 0
@@ -89,7 +89,7 @@ Fast-forward
```
``` (stderr)
$ git push rad master
-
✓ Patch 4bfb6fe940f815e3fcce6a2796e051df85db9fe1 merged at revision fab4fdd
+
✓ Patch f5e2be41827ee9f80bcdcd02f4f0451287b36229 merged at revision 5dd1e6b
✓ Synced with 1 node(s)
To rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkEStBU8Vi
   f2de534..f567f69  master -> master
@@ -98,10 +98,10 @@ To rad://z42hL2jL4XNk6K8oHQaSWfMgCL7ji/z6MknSLrJoTcukLrE435hVNQT4JUhbvWLX4kUzqkE
The patch is now merged and closed :).

```
-
$ rad patch show 4bfb6fe
+
$ rad patch show f5e2be4
╭─────────────────────────────────────────────────────────────────────╮
│ Title    Define power requirements                                  │
-
│ Patch    4bfb6fe940f815e3fcce6a2796e051df85db9fe1                   │
+
│ Patch    f5e2be41827ee9f80bcdcd02f4f0451287b36229                   │
│ Author   bob z6Mkt67…v4N1tRk                                        │
│ Head     27857ec9eb04c69cacab516e8bf4b5fd36090f66                   │
│ Commits  ahead 0, behind 1                                          │
@@ -113,9 +113,9 @@ $ rad patch show 4bfb6fe
│ 3e674d1 Define power requirements                                   │
├─────────────────────────────────────────────────────────────────────┤
│ ● opened by bob z6Mkt67…v4N1tRk now                                 │
-
│ ↑ updated to 7782e60eb51b6e852abb184b092249327354c625 (27857ec) now │
-
│ * revised by alice (you) in fab4fdd (f567f69) now                   │
-
│ ✓ merged by alice (you) at revision fab4fdd (f567f69) now           │
+
│ ↑ updated to 68143c579935d115e1551b88154cc41763c43e1f (27857ec) now │
+
│ * revised by alice (you) in 5dd1e6b (f567f69) now                   │
+
│ ✓ merged by alice (you) at revision 5dd1e6b (f567f69) now           │
╰─────────────────────────────────────────────────────────────────────╯
```

modified radicle-node/Cargo.toml
@@ -26,6 +26,7 @@ log = { version = "0.4.17", features = ["std"] }
localtime = { version = "1.2.0" }
netservices = { version = "0.3.0", features = ["io-reactor", "socket2"] }
nonempty = { version = "0.8.1", features = ["serialize"] }
+
once_cell = { version = "1.13" }
qcheck = { version = "1", default-features = false, optional = true }
# N.b. this is required to use macros, even though it's re-exported
# through radicle
@@ -42,6 +43,10 @@ thiserror = { version = "1" }
path = "../radicle"
version = "0.2.0"

+
[dependencies.radicle-fetch]
+
path = "../radicle-fetch"
+
version = "0.1.0"
+

[dev-dependencies]
radicle = { path = "../radicle", version = "*", features = ["test"] }
radicle-crypto = { path = "../radicle-crypto", version = "*", features = ["test", "cyphernet"] }
modified radicle-node/src/runtime.rs
@@ -10,6 +10,7 @@ use std::{fs, io, net, time};
use crossbeam_channel as chan;
use cyphernet::Ecdh;
use netservices::resource::NetAccept;
+
use radicle_fetch::FetchLimit;
use reactor::poller::popol;
use reactor::Reactor;
use thiserror::Error;
@@ -130,6 +131,7 @@ impl Runtime {
        G: Ecdh<Pk = NodeId> + Clone,
    {
        let id = *signer.public_key();
+
        let alias = config.alias.clone();
        let node_dir = home.node();
        let network = config.network;
        let rng = fastrand::Rng::new();
@@ -138,6 +140,8 @@ impl Runtime {
        let address_db = node_dir.join(ADDRESS_DB_FILE);
        let routing_db = node_dir.join(ROUTING_DB_FILE);
        let tracking_db = node_dir.join(TRACKING_DB_FILE);
+
        let scope = config.scope;
+
        let policy = config.policy;

        log::info!(target: "node", "Opening address book {}..", address_db.display());
        let mut addresses = address::Book::open(address_db)?;
@@ -146,10 +150,10 @@ impl Runtime {
        let routing = routing::Table::open(routing_db)?;

        log::info!(target: "node", "Opening tracking policy table {}..", tracking_db.display());
-
        let tracking = tracking::Store::open(tracking_db)?;
-
        let tracking = tracking::Config::new(config.policy, config.scope, tracking);
+
        let tracking = tracking::Store::open(tracking_db.clone())?;
+
        let tracking = tracking::Config::new(policy, scope, tracking);

-
        log::info!(target: "node", "Default tracking policy set to '{}'", &config.policy);
+
        log::info!(target: "node", "Default tracking policy set to '{}'", &policy);
        log::info!(target: "node", "Initializing service ({:?})..", network);

        let announcement = if let Some(ann) = fs::read(&node_dir.join(NODE_ANNOUNCEMENT_FILE))
@@ -211,7 +215,7 @@ impl Runtime {
        );

        let (worker_send, worker_recv) = chan::unbounded::<worker::Task>();
-
        let mut wire = Wire::new(service, worker_send, signer, proxy, clock);
+
        let mut wire = Wire::new(service, worker_send, signer.clone(), proxy, clock);
        let mut local_addrs = Vec::new();

        for addr in listen {
@@ -234,16 +238,25 @@ impl Runtime {
            );
        }

+
        let nid = *signer.public_key();
+
        let fetch = worker::FetchConfig {
+
            policy,
+
            scope,
+
            tracking_db,
+
            limit: FetchLimit::default(),
+
            info: git::UserInfo { alias, key: nid },
+
            local: nid,
+
        };
        let pool = worker::Pool::with(
-
            id,
            worker_recv,
+
            nid,
            handle.clone(),
            worker::Config {
                capacity: 8,
                timeout: time::Duration::from_secs(9),
                storage: storage.clone(),
                daemon,
-
                atomic,
+
                fetch,
            },
        );
        let control = match UnixListener::bind(home.socket()) {
modified radicle-node/src/service.rs
@@ -22,7 +22,6 @@ use nonempty::NonEmpty;
use radicle::node::address;
use radicle::node::address::{AddressBook, KnownAddress};
use radicle::node::config::PeerConfig;
-
pub use radicle::node::tracking::config as tracking;
use radicle::node::ConnectOptions;
use radicle::storage::RepositoryError;

@@ -39,7 +38,8 @@ use crate::service::message::{NodeAnnouncement, RefsAnnouncement};
use crate::service::tracking::{store::Write, Scope};
use crate::storage;
use crate::storage::{Namespaces, ReadStorage};
-
use crate::storage::{ReadRepository, RefUpdate, RemoteRepository as _};
+
use crate::storage::{ReadRepository, RemoteRepository as _};
+
use crate::worker::fetch;
use crate::worker::FetchError;
use crate::Link;

@@ -48,6 +48,8 @@ pub use crate::node::{config::Network, Config, NodeId};
pub use crate::service::message::{Message, ZeroBytes};
pub use crate::service::session::Session;

+
pub use radicle::node::tracking::config as tracking;
+

use self::gossip::Gossip;
use self::io::Outbox;
use self::limitter::RateLimiter;
@@ -647,10 +649,13 @@ where
        &mut self,
        rid: Id,
        remote: NodeId,
-
        result: Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>,
+
        result: Result<fetch::FetchResult, FetchError>,
    ) {
        let result = match result {
-
            Ok((updated, namespaces)) => {
+
            Ok(fetch::FetchResult {
+
                updated,
+
                namespaces,
+
            }) => {
                debug!(target: "service", "Fetched {rid} from {remote} successfully");

                for update in &updated {
modified radicle-node/src/test/environment.rs
@@ -334,6 +334,7 @@ impl Node<MockSigner> {
        let tracking = home.tracking_mut().unwrap();
        let routing = home.routing_mut().unwrap();

+
        log::debug!(target: "test", "Node::init {}: {}", config.alias, signer.public_key());
        Self {
            id: *signer.public_key(),
            home,
modified radicle-node/src/test/simulator.rs
@@ -18,10 +18,10 @@ use crate::crypto::Signer;
use crate::prelude::{Address, Id};
use crate::service::io::Io;
use crate::service::{DisconnectReason, Event, Message, NodeId};
+
use crate::storage::Namespaces;
use crate::storage::WriteStorage;
-
use crate::storage::{Namespaces, RefUpdate};
use crate::test::peer::Service;
-
use crate::worker::FetchError;
+
use crate::worker::{fetch, FetchError};
use crate::Link;

/// Minimum latency between peers.
@@ -66,11 +66,7 @@ pub enum Input {
    /// Received a message from a remote peer.
    Received(NodeId, Vec<Message>),
    /// Fetch completed for a node.
-
    Fetched(
-
        Id,
-
        NodeId,
-
        Rc<Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>>,
-
    ),
+
    Fetched(Id, NodeId, Rc<Result<fetch::FetchResult, FetchError>>),
    /// Used to advance the state machine after some wall time has passed.
    Wake,
}
@@ -422,11 +418,11 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            Err(e) => panic!("Failed to open repository: {e}"),
                        };
                        match &result {
-
                            Ok((_, remotes)) => {
+
                            Ok(fetch::FetchResult { namespaces, .. }) => {
                                radicle::test::fetch(
                                    &repo,
                                    &nid,
-
                                    Namespaces::Trusted(remotes.clone()),
+
                                    Namespaces::Trusted(namespaces.clone()),
                                )
                                .unwrap();
                            }
@@ -656,13 +652,13 @@ impl<S: WriteStorage + 'static, G: Signer> Simulation<S, G> {
                            input: Input::Fetched(
                                rid,
                                remote,
-
                                Rc::new(Ok((
-
                                    vec![],
-
                                    match namespaces {
+
                                Rc::new(Ok(fetch::FetchResult {
+
                                    updated: vec![],
+
                                    namespaces: match namespaces {
                                        Namespaces::Trusted(hs) => hs,
                                        Namespaces::All => HashSet::new(),
                                    },
-
                                ))),
+
                                })),
                            ),
                        },
                    );
modified radicle-node/src/tests.rs
@@ -40,6 +40,7 @@ use crate::test::simulator::{Peer as _, Simulation};
use crate::test::storage::MockStorage;
use crate::wire::Decode;
use crate::wire::Encode;
+
use crate::worker::fetch;
use crate::LocalTime;
use crate::{git, identity, rad, runtime, service, test};

@@ -1186,14 +1187,14 @@ fn test_queued_fetch() {
    alice.elapse(KEEP_ALIVE_DELTA);

    // Finish the 1st fetch.
-
    alice.fetched(rid1, bob.id, Ok((vec![], Default::default())));
+
    alice.fetched(rid1, bob.id, Ok(fetch::FetchResult::default()));
    // Now the 1st fetch is done, the 2nd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid2);
    // ... but not the third.
    assert_matches!(alice.fetches().next(), None);

    // Finish the 2nd fetch.
-
    alice.fetched(rid2, bob.id, Ok((vec![], Default::default())));
+
    alice.fetched(rid2, bob.id, Ok(fetch::FetchResult::default()));
    // Now the 2nd fetch is done, the 3rd fetch is dequeued.
    assert_matches!(alice.fetches().next(), Some((rid, _, _)) if rid == rid3);
}
modified radicle-node/src/tests/e2e.rs
@@ -4,7 +4,7 @@ use radicle::crypto::{test::signer::MockSigner, Signer};
use radicle::git;
use radicle::node::{Alias, FetchResult, Handle as _, DEFAULT_TIMEOUT};
use radicle::storage::{
-
    ReadRepository, ReadStorage, RemoteRepository, ValidateRepository, WriteRepository,
+
    ReadRepository, ReadStorage, RefUpdate, RemoteRepository, ValidateRepository, WriteRepository,
    WriteStorage,
};
use radicle::test::fixtures;
@@ -205,10 +205,8 @@ fn test_replication() {
    );
}

-
// TODO: ignoring as this is being fixed in incoming fetch changes
#[test]
-
#[ignore]
-
fn test_replication_no_delegates() {
+
fn test_replication_ref_in_sigrefs() {
    logger::init(log::Level::Debug);

    let tmp = tempfile::tempdir().unwrap();
@@ -234,11 +232,19 @@ fn test_replication_no_delegates() {
    alice.handle.track_repo(acme, Scope::All).unwrap();
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();

-
    assert_matches!(
-
        result,
-
        FetchResult::Failed {
-
            reason
-
        } if reason == "no delegates in transfer"
+
    assert_matches!(result, FetchResult::Success { .. });
+

+
    // alice still sees bob's master branch since it was in his
+
    // sigrefs.
+
    assert!(
+
        alice
+
            .storage
+
            .repository(acme)
+
            .unwrap()
+
            .reference(&bob.id, &git::qualified!("refs/heads/master"))
+
            .is_ok(),
+
        "refs/namespaces/{}/refs/heads/master does not exist",
+
        bob.id
    );
}

@@ -584,9 +590,9 @@ fn test_fetch_up_to_date() {

    // Fetch again! This time, everything's up to date.
    let result = alice.handle.fetch(acme, bob.id, DEFAULT_TIMEOUT).unwrap();
-
    assert_eq!(
+
    assert_matches!(
        result.success(),
-
        Some((vec![], HashSet::from_iter([bob.id])))
+
        Some((updates, _fetched)) if updates.iter().all(|update| matches!(update, RefUpdate::Skipped { .. }))
    );
}

@@ -774,6 +780,9 @@ fn test_connection_crossing() {
    assert!(s1 ^ s2, "Exactly one session should be established");
}

+
// TODO(finto): I witnessed a flaky error, but can't replicate.
+
// We log some values until it's clearer where this flake is coming
+
// from.
#[test]
/// Alice is going to try to fetch outdated refs of Bob, from Eve. This is a non-fastfoward fetch
/// on the sigrefs branch.
@@ -806,6 +815,17 @@ fn test_non_fastforward_sigrefs() {
    // Alice fetches it too.
    alice.handle.fetch(rid, bob.id, DEFAULT_TIMEOUT).unwrap();

+
    // Log the before Oid value of bob's 'rad/sigrefs', for debugging purposes.
+
    {
+
        let before = alice
+
            .storage
+
            .repository(rid)
+
            .unwrap()
+
            .reference_oid(&bob.id, &radicle::storage::refs::SIGREFS_BRANCH)
+
            .unwrap();
+
        log::debug!(target: "test", "bob's old 'rad/sigrefs': {}", before);
+
    }
+

    // Now Eve disconnects from Bob so she doesn't fetch his update.
    eve.handle
        .command(service::Command::Disconnect(bob.id))
@@ -820,11 +840,22 @@ fn test_non_fastforward_sigrefs() {
    // Alice fetches from Bob.
    alice.handle.fetch(rid, bob.id, DEFAULT_TIMEOUT).unwrap();

+
    // Log the after Oid value of bob's 'rad/sigrefs', for debugging purposes.
+
    {
+
        let after = alice
+
            .storage
+
            .repository(rid)
+
            .unwrap()
+
            .reference_oid(&bob.id, &radicle::storage::refs::SIGREFS_BRANCH)
+
            .unwrap();
+
        log::debug!(target: "test", "bob's old 'rad/sigrefs': {}", after);
+
    }
+

    // Now Alice has the latest, and when she tries to fetch from Eve, it breaks because
    // Eve has old refs.
    assert_matches!(
        alice.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
-
        FetchResult::Success { .. }
+
        FetchResult::Failed { .. }
    );
}

@@ -867,6 +898,7 @@ fn test_outdated_sigrefs() {
    let repo = alice.storage.repository(rid).unwrap();
    assert!(repo.remote(&eve.id).is_ok());

+
    log::debug!(target: "test", "Bob fetches from Eve..");
    assert_matches!(
        bob.handle.fetch(rid, eve.id, DEFAULT_TIMEOUT).unwrap(),
        FetchResult::Success { .. }
modified radicle-node/src/wire/protocol.rs
@@ -827,8 +827,8 @@ where
                Io::Fetch {
                    rid,
                    remote,
-
                    namespaces,
                    timeout,
+
                    ..
                } => {
                    log::trace!(target: "wire", "Processing fetch for {rid} from {remote}..");

@@ -849,7 +849,6 @@ where
                    let task = Task {
                        fetch: FetchRequest::Initiator {
                            rid,
-
                            namespaces,
                            remote,
                            timeout,
                        },
modified radicle-node/src/worker.rs
@@ -1,24 +1,23 @@
#![allow(clippy::too_many_arguments)]
mod channels;
-
mod fetch;
-
mod tunnel;
+
mod upload_pack;

-
use std::collections::{BTreeSet, HashSet};
-
use std::io::{prelude::*, BufReader};
-
use std::ops::ControlFlow;
-
use std::{env, io, net, process, time};
+
pub mod fetch;
+

+
use std::path::PathBuf;
+
use std::{io, net, time};

use crossbeam_channel as chan;

use radicle::identity::Id;
use radicle::prelude::NodeId;
-
use radicle::storage::{Namespaces, ReadRepository, ReadStorage, RefUpdate};
-
use radicle::{git, storage, Storage};
+
use radicle::storage::{ReadRepository, ReadStorage};
+
use radicle::{crypto, git, Storage};
+
use radicle_fetch::FetchLimit;

use crate::runtime::{thread, Handle};
+
use crate::service::tracking;
use crate::wire::StreamId;
-
use channels::{ChannelReader, ChannelWriter};
-
use tunnel::Tunnel;

pub use channels::{ChannelEvent, Channels};

@@ -26,14 +25,14 @@ pub use channels::{ChannelEvent, Channels};
pub struct Config {
    /// Number of worker threads.
    pub capacity: usize,
-
    /// Whether to use atomic fetches.
-
    pub atomic: bool,
    /// Timeout for all operations.
    pub timeout: time::Duration,
    /// Git daemon address.
    pub daemon: net::SocketAddr,
    /// Git storage.
    pub storage: Storage,
+
    /// Configuration for performing fetched.
+
    pub fetch: FetchConfig,
}

/// Error returned by fetch.
@@ -44,11 +43,17 @@ pub enum FetchError {
    #[error(transparent)]
    Io(#[from] io::Error),
    #[error(transparent)]
-
    StagingInit(#[from] fetch::error::Init),
+
    Fetch(#[from] fetch::error::Fetch),
+
    #[error(transparent)]
+
    Handle(#[from] fetch::error::Handle),
+
    #[error(transparent)]
+
    Storage(#[from] radicle::storage::Error),
+
    #[error(transparent)]
+
    TrackingConfig(#[from] radicle::node::tracking::store::Error),
    #[error(transparent)]
-
    StagingTransition(#[from] fetch::error::Transition),
+
    Tracked(#[from] radicle_fetch::tracking::error::Tracking),
    #[error(transparent)]
-
    StagingTransfer(#[from] fetch::error::Transfer),
+
    Blocked(#[from] radicle_fetch::tracking::error::Blocked),
}

impl FetchError {
@@ -87,13 +92,11 @@ impl UploadError {
/// Fetch job sent to worker thread.
#[derive(Debug, Clone)]
pub enum FetchRequest {
-
    /// Client is initiating a fetch in order to receive the specified
-
    /// `refspecs` determined by [`Namespaces`].
+
    /// Client is initiating a fetch for the repository identified by
+
    /// `rid` from the peer identified by `remote`.
    Initiator {
        /// Repo to fetch.
        rid: Id,
-
        /// Namespaces to fetch.
-
        namespaces: Namespaces,
        /// Remote peer we are interacting with.
        remote: NodeId,
        /// Fetch timeout.
@@ -122,7 +125,7 @@ pub enum FetchResult {
        /// Repo fetched.
        rid: Id,
        /// Fetch result, including remotes fetched.
-
        result: Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError>,
+
        result: Result<fetch::FetchResult, FetchError>,
    },
    Responder {
        /// Upload result.
@@ -146,15 +149,29 @@ pub struct TaskResult {
    pub stream: StreamId,
}

+
#[derive(Debug, Clone)]
+
pub struct FetchConfig {
+
    /// Default policy, if a policy for a specific node or repository was not found.
+
    pub policy: tracking::Policy,
+
    /// Default scope, if a scope for a specific repository was not found.
+
    pub scope: tracking::Scope,
+
    /// Path to the tracking database.
+
    pub tracking_db: PathBuf,
+
    /// Data limits when fetching from a remote.
+
    pub limit: FetchLimit,
+
    /// Information of the local peer.
+
    pub info: git::UserInfo,
+
    /// Public key of the local peer.
+
    pub local: crypto::PublicKey,
+
}
+

/// A worker that replicates git objects.
struct Worker {
    nid: NodeId,
    storage: Storage,
+
    fetch_config: FetchConfig,
    tasks: chan::Receiver<Task>,
-
    daemon: net::SocketAddr,
-
    timeout: time::Duration,
    handle: Handle,
-
    atomic: bool,
}

impl Worker {
@@ -174,6 +191,7 @@ impl Worker {
            stream,
        } = task;
        let remote = fetch.remote();
+
        let channels = channels::ChannelsFlush::new(self.handle.clone(), channels, remote, stream);
        let result = self._process(fetch, stream, channels);

        log::trace!(target: "worker", "Sending response back to service..");
@@ -195,395 +213,91 @@ impl Worker {
        &mut self,
        fetch: FetchRequest,
        stream: StreamId,
-
        mut channels: Channels,
+
        mut channels: channels::ChannelsFlush,
    ) -> FetchResult {
        match fetch {
            FetchRequest::Initiator {
                rid,
-
                namespaces,
                remote,
-
                timeout,
+
                // TODO: nowhere to use this currently
+
                timeout: _timeout,
            } => {
                log::debug!(target: "worker", "Worker processing outgoing fetch for {}", rid);
-
                let result = self.fetch(rid, remote, stream, &namespaces, channels, timeout);
+
                let result = self.fetch(rid, remote, channels);

                FetchResult::Initiator { rid, result }
            }
            FetchRequest::Responder { remote } => {
-
                log::debug!(target: "worker", "Worker processing incoming fetch..");
-

-
                let (stream_w, stream_r) = channels.split();
-
                // Nb. two fetches are usually expected: one for the *special* refs,
-
                // followed by another for the signed refs.
-
                let result = loop {
-
                    match self.upload_pack(remote, stream, stream_r, stream_w) {
-
                        Ok(ControlFlow::Continue(())) => continue,
-
                        Ok(ControlFlow::Break(rid)) => break Ok(rid),
-
                        Err(e) => break Err(e),
+
                log::debug!(target: "worker", "Worker processing incoming fetch for {remote}..");
+

+
                let (mut stream_r, stream_w) = channels.split();
+

+
                let header = match upload_pack::pktline::git_request(&mut stream_r) {
+
                    Ok(header) => header,
+
                    Err(e) => {
+
                        return FetchResult::Responder {
+
                            result: Err(e.into()),
+
                        }
                    }
                };
-
                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");
-

-
                FetchResult::Responder { result }
-
            }
-
        }
-
    }

-
    fn fetch(
-
        &mut self,
-
        rid: Id,
-
        remote: NodeId,
-
        stream: StreamId,
-
        namespaces: &Namespaces,
-
        mut channels: Channels,
-
        timeout: time::Duration,
-
    ) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), FetchError> {
-
        let staging =
-
            fetch::StagingPhaseInitial::new(&self.storage, rid, self.nid, namespaces.clone())?;
-
        let refs = if staging.repo.is_cloning() {
-
            match self._fetch(
-
                &staging.repo,
-
                staging.repo.is_cloning(),
-
                remote,
-
                staging.refspecs(),
-
                stream,
-
                &mut channels,
-
                timeout,
-
            ) {
-
                Ok(_) => {
-
                    log::debug!(target: "worker", "Initial fetch for {rid} exited successfully")
-
                }
-
                Err(e) => {
-
                    log::error!(target: "worker", "Initial fetch for {rid} failed: {e}");
-
                    return Err(e);
+
                if let Err(e) = self.is_authorized(remote, header.repo) {
+
                    return FetchResult::Responder { result: Err(e) };
                }
-
            }
-

-
            // TODO(finto): when cloning we simply fetch the special
-
            // rad refs from the remote side, however, when the
-
            // repository already exists we need to `ls-remote` (see
-
            // below). The result of the ls-remote is a BTreeSet of
-
            // refs and so we need return an empty set here so that we
-
            // can pass them into `into_final`. This is seems like a
-
            // code smell to me due to bad boundaries between the
-
            // logic in this module and the logic in the fetch module.
-
            BTreeSet::new()
-
        } else {
-
            self.ls_refs(
-
                &staging.repo,
-
                staging.ls_remote_refs(),
-
                remote,
-
                stream,
-
                &mut channels,
-
            )?
-
        };

-
        let staging = staging.into_final(refs)?;
+
                let result =
+
                    upload_pack::upload_pack(&self.nid, &self.storage, &header, stream_r, stream_w)
+
                        .map(|_| ())
+
                        .map_err(|e| e.into());
+
                log::debug!(target: "worker", "Upload process on stream {stream} exited with result {result:?}");

-
        match self._fetch(
-
            &staging.repo,
-
            staging.repo.is_cloning(),
-
            remote,
-
            staging.refspecs(),
-
            stream,
-
            &mut channels,
-
            timeout,
-
        ) {
-
            Ok(()) => log::debug!(target: "worker", "Final fetch for {rid} exited successfully"),
-
            Err(e) => {
-
                log::error!(target: "worker", "Final fetch for {rid} failed: {e}");
-
                return Err(e);
+
                FetchResult::Responder { result }
            }
        }
-

-
        staging.transfer().map_err(FetchError::from)
    }

-
    fn upload_pack(
-
        &mut self,
-
        remote: NodeId,
-
        stream: StreamId,
-
        stream_r: &mut ChannelReader,
-
        stream_w: &mut ChannelWriter,
-
    ) -> Result<ControlFlow<()>, UploadError> {
-
        log::debug!(target: "worker", "Waiting for Git request pktline from {remote}..");
-

-
        // Read the request packet line to know what repository we're uploading.
-
        let (rid, request) = match pktline::Reader::new(stream_r).read_request_pktline() {
-
            Ok((req, pktline)) => (req.repo, pktline),
-
            Err(err) if err.kind() == io::ErrorKind::ConnectionReset => {
-
                log::debug!(
-
                    target: "worker",
-
                    "Upload process received stream `close` from {remote}"
-
                );
-
                return Ok(ControlFlow::Break(()));
-
            }
-
            Err(err) => {
-
                return Err(UploadError::PacketLine(err));
-
            }
-
        };
-
        log::debug!(target: "worker", "Received Git request pktline for {rid}..");
-

+
    fn is_authorized(&self, remote: NodeId, rid: Id) -> Result<(), UploadError> {
        let repo = self.storage.repository(rid)?;
-
        let doc = repo.identity_doc()?;
-

+
        let doc = repo.canonical_identity_doc()?;
        if !doc.is_visible_to(&remote) {
-
            return Err(UploadError::Unauthorized(remote, rid));
-
        }
-

-
        match self._upload_pack(rid, remote, request, stream, stream_r, stream_w) {
-
            Ok(()) => {
-
                log::debug!(target: "worker", "Upload of {rid} to {remote} on stream {stream} exited successfully");
-

-
                Ok(ControlFlow::Continue(()))
-
            }
-
            Err(e) => Err(e),
+
            Err(UploadError::Unauthorized(remote, rid))
+
        } else {
+
            Ok(())
        }
    }

-
    fn _upload_pack(
+
    fn fetch(
        &mut self,
        rid: Id,
        remote: NodeId,
-
        request: Vec<u8>,
-
        stream: StreamId,
-
        stream_r: &mut ChannelReader,
-
        stream_w: &mut ChannelWriter,
-
    ) -> Result<(), UploadError> {
-
        log::debug!(target: "worker", "Connecting to daemon..");
-

-
        // Connect to our local git daemon, running as a child process.
-
        let daemon = net::TcpStream::connect_timeout(&self.daemon, self.timeout)
-
            .map_err(UploadError::DaemonConnectionFailed)?;
-
        let (mut daemon_r, mut daemon_w) = (daemon.try_clone()?, daemon);
-

-
        daemon_r.set_read_timeout(Some(self.timeout))?;
-
        daemon_w.set_write_timeout(Some(self.timeout))?;
-

-
        // Write the raw request to the daemon, once we've parsed it.
-
        daemon_w.write_all(&request)?;
-

-
        log::debug!(target: "worker", "Entering Git protocol loop for {rid}..");
-

-
        thread::scope(|s| {
-
            let daemon_to_stream = thread::spawn_scoped(&self.nid, "upload-pack", s, || {
-
                let mut buffer = [0; u16::MAX as usize + 1];
-

-
                loop {
-
                    match daemon_r.read(&mut buffer) {
-
                        Ok(0) => break,
-
                        Ok(n) => {
-
                            stream_w.send(buffer[..n].to_vec())?;
-

-
                            if let Err(e) = self.handle.flush(remote, stream) {
-
                                log::error!(target: "worker", "Worker channel disconnected; aborting");
-
                                return Err(e);
-
                            }
-
                        }
-
                        Err(e) => {
-
                            if e.kind() == io::ErrorKind::UnexpectedEof {
-
                                log::debug!(target: "worker", "Daemon closed the git connection for {rid}");
-
                                break;
-
                            }
-
                            return Err(e);
-
                        }
-
                    }
-
                }
-
                Self::eof(remote, stream, stream_w, &mut self.handle)
-
            });
-

-
            let stream_to_daemon = thread::spawn_scoped(&self.nid, "upload-pack", s, move || {
-
                match stream_r
-
                    .pipe(&mut daemon_w)
-
                    .and_then(|()| daemon_w.shutdown(net::Shutdown::Both))
-
                {
-
                    Ok(()) => Ok(()),
-
                    // On macOS, this error is returned if the socket is already closed.
-
                    // We don't consider that a problem, as it just returns `Ok(())` on
-
                    // Linux.
-
                    Err(e) if e.kind() == io::ErrorKind::NotConnected => Ok(()),
-
                    Err(e) => Err(e),
-
                }
-
            });
-

-
            stream_to_daemon.join().unwrap()?;
-
            daemon_to_stream.join().unwrap()?;
-

-
            Ok::<(), UploadError>(())
-
        })
-
    }
-

-
    fn ls_refs(
-
        &self,
-
        repo: &fetch::StagedRepository,
-
        namespaces: impl IntoIterator<Item = git::PatternString>,
-
        remote: NodeId,
-
        stream: StreamId,
-
        channels: &mut Channels,
-
    ) -> Result<BTreeSet<git::Namespaced<'static>>, FetchError> {
-
        let tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
-
        let tunnel_addr = tunnel.local_addr();
-
        let mut cmd = process::Command::new("git");
-
        cmd.current_dir(repo.path())
-
            .env_clear()
-
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
-
            .envs(git::env::GIT_DEFAULT_CONFIG)
-
            .args(["-c", "protocol.version=2"])
-
            .arg("ls-remote")
-
            .arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()));
-

-
        for ns in namespaces.into_iter() {
-
            cmd.arg(ns.as_str());
-
        }
-

-
        cmd.stdout(process::Stdio::piped())
-
            .stderr(process::Stdio::piped())
-
            .stdin(process::Stdio::piped());
-

-
        log::debug!(target: "worker", "Running command: {:?}", cmd);
-

-
        let mut refs = BTreeSet::new();
-
        let mut child = cmd.spawn()?;
-
        let stderr = child.stderr.take().unwrap();
-
        let stdout = child.stdout.take().unwrap();
-

-
        // Since `ls-remote` may return a lot of data, we read the child's stdout concurrently, to
-
        // prevent deadlocks that could arise if we fill the pipe buffer before the process exits.
-
        thread::scope(|s| {
-
            thread::spawn_scoped(&self.nid, "ls-refs", s, || {
-
                for line in BufReader::new(stderr).lines().flatten() {
-
                    log::debug!(target: "worker", "Git: {}", line);
-
                }
-
            });
-
            thread::spawn_scoped(&self.nid, "ls-refs", s, || {
-
                for line in BufReader::new(stdout).lines().flatten() {
-
                    log::debug!(target: "worker", "Git: {}", line);
-

-
                    let r = match line.split_whitespace().next_back() {
-
                        Some(r) => r,
-
                        None => {
-
                            log::trace!(target: "worker", "Git: ls-remote returned unexpected format {line}");
-
                            continue;
-
                        }
-
                    };
-
                    match git::RefString::try_from(r) {
-
                        Ok(r) => {
-
                            if let Some(ns) = r.to_namespaced() {
-
                                refs.insert(ns.to_owned());
-
                            } else {
-
                                log::debug!(target: "worker", "Git: non-namespaced ref '{r}'")
-
                            }
-
                        }
-
                        Err(err) => {
-
                            log::warn!(target: "worker", "Git: invalid refname '{r}' {err}")
-
                        }
-
                    }
-
                }
-
            });
-

-
            tunnel.run(self.timeout)?;
-

-
            Ok::<_, FetchError>(())
-
        })?;
-

-
        let result = child.wait()?;
-

-
        if result.success() {
-
            Ok(refs)
-
        } else {
-
            Err(FetchError::CommandFailed {
-
                code: result.code().unwrap_or(1),
-
            })
-
        }
-
    }
-

-
    fn _fetch<S>(
-
        &self,
-
        repo: &storage::git::Repository,
-
        is_cloning: bool,
-
        remote: NodeId,
-
        specs: S,
-
        stream: StreamId,
-
        channels: &mut Channels,
-
        timeout: time::Duration,
-
    ) -> Result<(), FetchError>
-
    where
-
        S: IntoIterator<Item = fetch::Refspec>,
-
    {
-
        let tunnel = Tunnel::with(channels, stream, self.nid, remote, self.handle.clone())?;
-
        let tunnel_addr = tunnel.local_addr();
-
        let mut cmd = process::Command::new("git");
-
        cmd.current_dir(repo.path())
-
            .env_clear()
-
            .envs(env::vars().filter(|(k, _)| k == "PATH" || k.starts_with("GIT_TRACE")))
-
            .envs(git::env::GIT_DEFAULT_CONFIG)
-
            .args(["-c", "protocol.version=2"])
-
            .arg("fetch")
-
            .arg("--verbose");
-

-
        if self.atomic {
-
            // Enable atomic fetch. Only works with Git 2.31 and later.
-
            cmd.arg("--atomic");
-
        }
-

-
        let namespace = self.nid.to_namespace();
-
        let mut fetchspecs = specs
-
            .into_iter()
-
            // Filter out our own refs, if we aren't cloning.
-
            .filter(|fs| is_cloning || !fs.dst.starts_with(namespace.as_str()))
-
            .map(|spec| spec.to_string())
-
            .collect::<Vec<_>>();
-

-
        if !is_cloning {
-
            // Make sure we don't fetch our own refs via a glob pattern.
-
            fetchspecs.push(format!("^refs/namespaces/{}/*", self.nid));
-
        }
-

-
        cmd.arg(format!("git://{tunnel_addr}/{}", repo.id.canonical()))
-
            .args(&fetchspecs)
-
            .stdout(process::Stdio::piped())
-
            .stderr(process::Stdio::piped())
-
            .stdin(process::Stdio::piped());
-

-
        log::debug!(target: "worker", "Running command: {:?}", cmd);
-

-
        let mut child = cmd.spawn()?;
-
        let stderr = child.stderr.take().unwrap();
-

-
        thread::spawn(&self.nid, "fetch", || {
-
            for line in BufReader::new(stderr).lines().flatten() {
-
                log::debug!(target: "worker", "Git: {}", line);
-
            }
-
        });
-

-
        tunnel.run(timeout)?;
+
        channels: channels::ChannelsFlush,
+
    ) -> Result<fetch::FetchResult, FetchError> {
+
        let FetchConfig {
+
            policy,
+
            scope,
+
            tracking_db,
+
            limit,
+
            info,
+
            local,
+
        } = &self.fetch_config;
+
        let tracking =
+
            tracking::Config::new(*policy, *scope, tracking::Store::reader(tracking_db)?);
+
        // N.b. if the `rid` is blocked this will return an error, so
+
        // we won't continue with any further set up of the fetch.
+
        let tracked = radicle_fetch::Tracked::from_config(rid, &tracking)?;
+
        let blocked = radicle_fetch::BlockList::from_config(&tracking)?;
+

+
        let handle = fetch::Handle::new(
+
            rid,
+
            *local,
+
            info.clone(),
+
            &self.storage,
+
            tracked,
+
            blocked,
+
            channels,
+
        )?;

-
        let result = child.wait()?;
-
        if result.success() {
-
            Ok(())
-
        } else {
-
            Err(FetchError::CommandFailed {
-
                code: result.code().unwrap_or(1),
-
            })
-
        }
-
    }
-

-
    fn eof(
-
        remote: NodeId,
-
        stream: StreamId,
-
        sender: &mut ChannelWriter,
-
        handle: &mut Handle,
-
    ) -> Result<(), io::Error> {
-
        log::debug!(target: "worker", "Sending end-of-file to remote {remote}..");
-

-
        if sender.eof().is_err() {
-
            log::error!(target: "worker", "Fetch error: error sending end-of-file message: channel disconnected");
-
            return Err(io::ErrorKind::BrokenPipe.into());
-
        }
-
        if let Err(e) = handle.flush(remote, stream) {
-
            log::error!(target: "worker", "Error flushing worker stream: {e}");
-
        }
-
        Ok(())
+
        Ok(handle.fetch(rid, &self.storage, *limit, remote)?)
    }
}

@@ -594,7 +308,7 @@ pub struct Pool {

impl Pool {
    /// Create a new worker pool with the given parameters.
-
    pub fn with(nid: NodeId, tasks: chan::Receiver<Task>, handle: Handle, config: Config) -> Self {
+
    pub fn with(tasks: chan::Receiver<Task>, nid: NodeId, handle: Handle, config: Config) -> Self {
        let mut pool = Vec::with_capacity(config.capacity);
        for i in 0..config.capacity {
            let worker = Worker {
@@ -602,9 +316,7 @@ impl Pool {
                tasks: tasks.clone(),
                handle: handle.clone(),
                storage: config.storage.clone(),
-
                daemon: config.daemon,
-
                timeout: config.timeout,
-
                atomic: config.atomic,
+
                fetch_config: config.fetch.clone(),
            };
            let thread = thread::spawn(&nid, format!("worker#{i}"), || worker.run());

@@ -627,105 +339,3 @@ impl Pool {
        Ok(())
    }
}
-

-
pub mod pktline {
-
    use std::io;
-
    use std::io::Read;
-
    use std::str;
-

-
    use super::Id;
-

-
    pub const HEADER_LEN: usize = 4;
-

-
    pub struct Reader<'a, R> {
-
        stream: &'a mut R,
-
    }
-

-
    impl<'a, R: io::Read> Reader<'a, R> {
-
        /// Create a new packet-line reader.
-
        pub fn new(stream: &'a mut R) -> Self {
-
            Self { stream }
-
        }
-

-
        /// Parse a Git request packet-line.
-
        ///
-
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
-
        ///
-
        pub fn read_request_pktline(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
-
            let mut pktline = [0u8; 1024];
-
            let length = self.read_pktline(&mut pktline)?;
-
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
-
                return Err(io::ErrorKind::InvalidInput.into());
-
            };
-
            Ok((cmd, Vec::from(&pktline[..length])))
-
        }
-

-
        /// Parse a Git packet-line.
-
        fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
            self.read_exact(&mut buf[..HEADER_LEN])?;
-

-
            let length = str::from_utf8(&buf[..HEADER_LEN])
-
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
-
            let length = usize::from_str_radix(length, 16)
-
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
-

-
            self.read_exact(&mut buf[HEADER_LEN..length])?;
-

-
            Ok(length)
-
        }
-
    }
-

-
    impl<'a, R: io::Read> io::Read for Reader<'a, R> {
-
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
-
            self.stream.read(buf)
-
        }
-
    }
-

-
    #[derive(Debug)]
-
    pub struct GitRequest {
-
        pub repo: Id,
-
        pub path: String,
-
        pub host: Option<(String, Option<u16>)>,
-
        pub extra: Vec<(String, Option<String>)>,
-
    }
-

-
    impl GitRequest {
-
        /// Parse a Git command from a packet-line.
-
        fn parse(input: &[u8]) -> Option<Self> {
-
            let input = str::from_utf8(input).ok()?;
-
            let mut parts = input
-
                .strip_prefix("git-upload-pack ")?
-
                .split_terminator('\0');
-

-
            let path = parts.next()?.to_owned();
-
            let repo = path.strip_prefix('/')?.parse().ok()?;
-
            let host = match parts.next() {
-
                None | Some("") => None,
-
                Some(host) => {
-
                    let host = host.strip_prefix("host=")?;
-
                    match host.split_once(':') {
-
                        None => Some((host.to_owned(), None)),
-
                        Some((host, port)) => {
-
                            let port = port.parse::<u16>().ok()?;
-
                            Some((host.to_owned(), Some(port)))
-
                        }
-
                    }
-
                }
-
            };
-
            let extra = parts
-
                .skip_while(|part| part.is_empty())
-
                .map(|part| match part.split_once('=') {
-
                    None => (part.to_owned(), None),
-
                    Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
-
                })
-
                .collect();
-

-
            Some(Self {
-
                repo,
-
                path,
-
                host,
-
                extra,
-
            })
-
        }
-
    }
-
}
modified radicle-node/src/worker/channels.rs
@@ -1,8 +1,50 @@
-
use std::io::Read;
+
use std::convert::Infallible;
+
use std::io::{Read, Write};
use std::ops::Deref;
use std::{fmt, io, time};

use crossbeam_channel as chan;
+
use radicle::node::NodeId;
+

+
use crate::runtime::Handle;
+
use crate::wire::StreamId;
+

+
/// A reader and writer pair that can be used in the fetch protocol.
+
///
+
/// It implements [`radicle::fetch::transport::ConnectionStream`] to
+
/// provide its underlying channels for reading and writing.
+
pub struct ChannelsFlush {
+
    receiver: ChannelReader,
+
    sender: ChannelFlushWriter,
+
}
+

+
impl ChannelsFlush {
+
    pub fn new(handle: Handle, channels: Channels, remote: NodeId, stream: StreamId) -> Self {
+
        Self {
+
            receiver: channels.receiver,
+
            sender: ChannelFlushWriter {
+
                writer: channels.sender,
+
                stream,
+
                handle,
+
                remote,
+
            },
+
        }
+
    }
+

+
    pub fn split(&mut self) -> (&mut ChannelReader, &mut ChannelFlushWriter) {
+
        (&mut self.receiver, &mut self.sender)
+
    }
+
}
+

+
impl radicle_fetch::transport::ConnectionStream for ChannelsFlush {
+
    type Read = ChannelReader;
+
    type Write = ChannelFlushWriter;
+
    type Error = Infallible;
+

+
    fn open(&mut self) -> Result<(&mut Self::Read, &mut Self::Write), Self::Error> {
+
        Ok((&mut self.receiver, &mut self.sender))
+
    }
+
}

/// Data that can be sent and received on worker channels.
pub enum ChannelEvent<T = Vec<u8>> {
@@ -63,10 +105,6 @@ impl<T: AsRef<[u8]>> Channels<T> {
        self.receiver.try_iter()
    }

-
    pub fn split(&mut self) -> (&mut ChannelWriter<T>, &mut ChannelReader<T>) {
-
        (&mut self.sender, &mut self.receiver)
-
    }
-

    pub fn send(&self, event: ChannelEvent<T>) -> io::Result<()> {
        self.sender.send(event)
    }
@@ -100,28 +138,6 @@ impl<T: AsRef<[u8]>> ChannelReader<T> {
            timeout,
        }
    }
-

-
    pub fn pipe<W: io::Write>(&mut self, mut writer: W) -> io::Result<()> {
-
        loop {
-
            match self.receiver.recv_timeout(self.timeout) {
-
                Ok(ChannelEvent::Data(data)) => writer.write_all(data.as_ref())?,
-
                Ok(ChannelEvent::Eof) => return Ok(()),
-
                Ok(ChannelEvent::Close) => return Err(io::ErrorKind::ConnectionReset.into()),
-
                Err(chan::RecvTimeoutError::Timeout) => {
-
                    return Err(io::Error::new(
-
                        io::ErrorKind::TimedOut,
-
                        "error reading from stream: channel timed out",
-
                    ));
-
                }
-
                Err(chan::RecvTimeoutError::Disconnected) => {
-
                    return Err(io::Error::new(
-
                        io::ErrorKind::BrokenPipe,
-
                        "error reading from stream: channel is disconnected",
-
                    ));
-
                }
-
            }
-
        }
-
    }
}

impl Read for ChannelReader<Vec<u8>> {
@@ -153,11 +169,61 @@ impl Read for ChannelReader<Vec<u8>> {

/// Wraps a [`chan::Sender`] and provides it with [`io::Write`].
#[derive(Clone)]
-
pub struct ChannelWriter<T = Vec<u8>> {
+
struct ChannelWriter<T = Vec<u8>> {
    sender: chan::Sender<ChannelEvent<T>>,
    timeout: time::Duration,
}

+
/// Wraps a [`ChannelWriter`] alongside the associated [`Handle`] and [`NodeId`].
+
///
+
/// This allows the channel to [`Write::flush`] when calling
+
/// [`Write::write_all`], which is necessary to signal to the
+
/// controller to send the wire data.
+
pub struct ChannelFlushWriter<T = Vec<u8>> {
+
    writer: ChannelWriter<T>,
+
    handle: Handle,
+
    stream: StreamId,
+
    remote: NodeId,
+
}
+

+
impl radicle_fetch::transport::SignalEof for ChannelFlushWriter<Vec<u8>> {
+
    type Error = io::Error;
+

+
    fn eof(&mut self) -> io::Result<()> {
+
        self.writer.send(ChannelEvent::Eof)?;
+
        self.flush()
+
    }
+
}
+

+
impl Write for ChannelFlushWriter<Vec<u8>> {
+
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+
        let n = buf.len();
+
        self.writer.send(buf.to_vec())?;
+
        Ok(n)
+
    }
+

+
    fn flush(&mut self) -> io::Result<()> {
+
        self.handle.flush(self.remote, self.stream)
+
    }
+

+
    fn write_all(&mut self, mut buf: &[u8]) -> io::Result<()> {
+
        while !buf.is_empty() {
+
            match self.write(buf) {
+
                Ok(0) => {
+
                    return Err(io::Error::new(
+
                        io::ErrorKind::WriteZero,
+
                        "failed to write whole buffer",
+
                    ));
+
                }
+
                Ok(n) => buf = &buf[n..],
+
                Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
+
                Err(e) => return Err(e),
+
            }
+
        }
+
        self.flush()
+
    }
+
}
+

impl<T: AsRef<[u8]>> ChannelWriter<T> {
    pub fn send(&self, event: impl Into<ChannelEvent<T>>) -> io::Result<()> {
        match self.sender.send_timeout(event.into(), self.timeout) {
@@ -173,17 +239,6 @@ impl<T: AsRef<[u8]>> ChannelWriter<T> {
        }
    }

-
    /// Since the git protocol is tunneled over an existing connection, we can't signal the end of
-
    /// the protocol via the usual means, which is to close the connection. Git also doesn't have
-
    /// any special message we can send to signal the end of the protocol.
-
    ///
-
    /// Hence, there's no other way for the server to know that we're done sending requests
-
    /// than to send a special message outside the git protocol. This message can then be processed
-
    /// by the remote worker to end the protocol. We use the special "eof" control message for this.
-
    pub fn eof(&self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
-
        self.sender.send(ChannelEvent::Eof)
-
    }
-

    /// Permanently close this stream.
    pub fn close(self) -> Result<(), chan::SendError<ChannelEvent<T>>> {
        self.sender.send(ChannelEvent::Close)
modified radicle-node/src/worker/fetch.rs
@@ -1,572 +1,116 @@
-
mod refspecs;
-
pub use refspecs::SpecialRefs;
-

pub mod error;

-
use std::collections::{BTreeMap, BTreeSet, HashSet};
-
use std::ops::Deref;
+
use std::collections::HashSet;

-
use radicle::crypto::{PublicKey, Verified};
-
use radicle::git::refspec;
-
use radicle::git::{url, Namespaced};
-
use radicle::prelude::{Doc, Id, NodeId};
+
use radicle::crypto::PublicKey;
+
use radicle::git::UserInfo;
+
use radicle::prelude::Id;
use radicle::storage::git::Repository;
-
use radicle::storage::refs::IDENTITY_BRANCH;
-
use radicle::storage::{Namespaces, RefUpdate, Remote, RemoteId, Validation, Validations};
-
use radicle::storage::{
-
    ReadRepository, ReadStorage, RemoteRepository, ValidateRepository, WriteRepository,
-
    WriteStorage,
-
};
-
use radicle::{git, Storage};
-

-
pub type Refspec = refspec::Refspec<git::PatternString, git::PatternString>;
-

-
/// The initial phase of staging a fetch from a remote.
-
///
-
/// The [`StagingPhaseInitial::refpsecs`] generated are to fetch the
-
/// `rad/id` and/or `rad/sigrefs` references from the remote end.
-
///
-
/// It is then expected to convert this into [`StagingPhaseFinal`]
-
/// using [`StagingRad::into_final`] to continue the rest of the
-
/// references.
-
pub struct StagingPhaseInitial<'a> {
-
    /// The inner [`Repository`] for staging fetches into.
-
    pub(super) repo: StagedRepository,
-
    /// The original [`Storage`] we are finalising changes into.
-
    production: &'a Storage,
-
    /// The local Node ID.
-
    nid: NodeId,
-
    /// The `Namespaces` passed by the fetching caller.
-
    pub(super) namespaces: Namespaces,
-
    _tmp: tempfile::TempDir,
-
}
-

-
/// Indicates whether the innner [`Repository`] is being cloned into
-
/// or fetched into.
-
pub enum StagedRepository {
-
    Cloning(Repository),
-
    Fetching(Repository),
-
}
-

-
impl StagedRepository {
-
    pub fn is_cloning(&self) -> bool {
-
        matches!(self, Self::Cloning(_))
-
    }
-
}
-

-
impl Deref for StagedRepository {
-
    type Target = Repository;
-

-
    fn deref(&self) -> &Self::Target {
-
        match self {
-
            Self::Cloning(repo) => repo,
-
            Self::Fetching(repo) => repo,
-
        }
-
    }
-
}
-

-
pub enum FinalStagedRepository {
-
    Cloning {
-
        repo: Repository,
-
        trusted: HashSet<NodeId>,
-
    },
-
    Fetching {
-
        repo: Repository,
-
        refs: BTreeSet<Namespaced<'static>>,
-
    },
-
}
-

-
impl FinalStagedRepository {
-
    pub fn is_cloning(&self) -> bool {
-
        matches!(self, Self::Cloning { .. })
-
    }
-
}
-

-
impl Deref for FinalStagedRepository {
-
    type Target = Repository;
-

-
    fn deref(&self) -> &Self::Target {
-
        match self {
-
            Self::Cloning { repo, .. } => repo,
-
            Self::Fetching { repo, .. } => repo,
-
        }
-
    }
-
}
-

-
/// The second, and final, phase of staging a fetch from a remote.
-
///
-
/// The [`StagingPhaseFinal::refpsecs`] generated are to fetch any follow-up
-
/// references after the fetch on [`StagingPhaseInitial`]. This may be all the
-
/// delegate's references in the case of cloning the new repository,
-
/// or it could be fetching the latest updates in the case of fetching
-
/// an existing repository.
-
///
-
/// It is then expected to finalise the process by transferring the
-
/// fetched references into the production storage, via
-
/// [`StagingPhaseFinal::transfer`].
-
pub struct StagingPhaseFinal<'a> {
-
    /// The inner [`Repository`] for staging fetches into.
-
    pub(super) repo: FinalStagedRepository,
-
    /// The original [`Storage`] we are finalising changes into.
-
    production: &'a Storage,
-
    /// The local Node ID.
-
    nid: NodeId,
-
    _tmp: tempfile::TempDir,
+
use radicle::storage::{ReadStorage as _, RefUpdate, WriteRepository as _, WriteStorage as _};
+
use radicle::Storage;
+
use radicle_fetch::{BlockList, FetchLimit, Tracked};
+

+
use super::channels::ChannelsFlush;
+

+
#[derive(Debug, Default)]
+
pub struct FetchResult {
+
    /// The set of updates references.
+
    pub updated: Vec<RefUpdate>,
+
    /// The set of remote namespaces that were updated.
+
    pub namespaces: HashSet<PublicKey>,
}

-
enum VerifiedRemote {
-
    Failed {
-
        reason: String,
+
pub enum Handle {
+
    Clone {
+
        handle: radicle_fetch::Handle<ChannelsFlush>,
    },
-
    Success {
-
        // Nb. unused but we want to ensure that we verify the identity
-
        _doc: Doc<Verified>,
-
        remote: Remote<Verified>,
-
        /// Validation errors
-
        validations: Validations,
+
    Pull {
+
        handle: radicle_fetch::Handle<ChannelsFlush>,
    },
-
    UpToDate,
}

-
impl<'a> StagingPhaseInitial<'a> {
-
    /// Construct a [`StagingPhaseInitial`] which sets up its
-
    /// [`StagedRepository`] in a new, temporary directory.
+
impl Handle {
    pub fn new(
-
        production: &'a Storage,
        rid: Id,
-
        nid: NodeId,
-
        namespaces: Namespaces,
-
    ) -> Result<Self, error::Init> {
-
        let tmp = tempfile::TempDir::new()?;
-
        log::debug!(target: "worker", "Staging fetch in {:?}", tmp.path());
-
        let staging = Storage::open(tmp.path())?;
-
        let repo = Self::repository(&staging, production, rid)?;
-
        Ok(Self {
-
            repo,
-
            nid,
-
            production,
-
            namespaces,
-
            _tmp: tmp,
-
        })
-
    }
-

-
    /// Return the fetch refspecs for fetching the necessary `rad`
-
    /// references.
-
    pub fn refspecs(&self) -> Vec<Refspec> {
-
        let id = git::PatternString::from(IDENTITY_BRANCH.clone().into_refstring());
-
        match self.repo {
-
            StagedRepository::Cloning(_) => vec![Refspec {
-
                src: id.clone(),
-
                dst: id,
-
                force: false,
-
            }],
-
            StagedRepository::Fetching(_) => SpecialRefs(self.namespaces.clone()).into_refspecs(),
+
        local: PublicKey,
+
        info: UserInfo,
+
        storage: &Storage,
+
        tracked: Tracked,
+
        blocked: BlockList,
+
        channels: ChannelsFlush,
+
    ) -> Result<Self, error::Handle> {
+
        let exists = storage.contains(&rid)?;
+
        if exists {
+
            let repo = storage.repository(rid)?;
+
            let handle = radicle_fetch::Handle::new(local, repo, tracked, blocked, channels)?;
+
            Ok(Handle::Pull { handle })
+
        } else {
+
            let repo = storage.create(rid)?;
+
            repo.set_user(&info)?;
+
            let handle = radicle_fetch::Handle::new(local, repo, tracked, blocked, channels)?;
+
            Ok(Handle::Clone { handle })
        }
    }

-
    pub fn ls_remote_refs(&self) -> Vec<git::PatternString> {
-
        match &self.namespaces {
-
            Namespaces::All => {
-
                vec![git::refspec::pattern!("refs/namespaces/*")]
-
            }
-
            Namespaces::Trusted(trusted) => trusted
-
                .iter()
-
                .map(|ns| {
-
                    git::refname!("refs/namespaces")
-
                        .join(git::Component::from(ns))
-
                        .with_pattern(git::refspec::STAR)
-
                })
-
                .collect::<Vec<_>>(),
-
        }
-
    }
-

-
    /// Convert the [`StagingPhaseInitial`] into [`StagingPhaseFinal`] to continue
-
    /// the fetch process.
-
    pub fn into_final(
-
        self,
-
        refs: BTreeSet<Namespaced<'static>>,
-
    ) -> Result<StagingPhaseFinal<'a>, error::Transition> {
-
        let repo = match self.repo {
-
            StagedRepository::Cloning(repo) => {
-
                log::debug!(target: "worker", "Loading remotes for clone of {}", repo.id);
-
                let oid = ReadRepository::identity_head(&repo)?;
-
                log::trace!(target: "worker", "Loading 'rad/id' @ {oid}");
-
                let doc = Doc::<Verified>::load_at(oid, &repo)?.doc;
-
                let mut trusted = match self.namespaces.clone() {
-
                    Namespaces::All => HashSet::new(),
-
                    Namespaces::Trusted(trusted) => trusted,
-
                };
-
                let delegates = doc.delegates.map(PublicKey::from);
-
                trusted.extend(delegates);
-
                FinalStagedRepository::Cloning { repo, trusted }
-
            }
-
            StagedRepository::Fetching(repo) => FinalStagedRepository::Fetching { repo, refs },
-
        };
-

-
        Ok(StagingPhaseFinal {
-
            repo,
-
            nid: self.nid,
-
            production: self.production,
-
            _tmp: self._tmp,
-
        })
-
    }
-

-
    fn repository(
-
        staging: &Storage,
-
        production: &Storage,
+
    pub fn fetch(
+
        mut self,
        rid: Id,
-
    ) -> Result<StagedRepository, error::Setup> {
-
        match production.contains(&rid) {
-
            Ok(true) => {
-
                let url = url::File::new(production.path_of(&rid)).to_string();
-
                log::debug!(target: "worker", "Setting up fetch for existing repository: {}", url);
-

-
                let to = staging.path_of(&rid);
-
                let copy = git::raw::build::RepoBuilder::new()
-
                    .bare(true)
-
                    .clone_local(git::raw::build::CloneLocal::Local)
-
                    .clone(&url, &to)?;
-

-
                {
-
                    // The clone doesn't actually clone all refs, it only creates a ref for the
-
                    // default branch; so we explicitly fetch the rest of the refs, so they
-
                    // don't need to be re-fetched from the remote.
-
                    let mut remote = copy.remote_anonymous(&url)?;
-
                    let refspecs: Vec<_> = Namespaces::All
-
                        .to_refspecs()
-
                        .into_iter()
-
                        .map(|s| s.to_string())
-
                        .collect();
-
                    remote.fetch(&refspecs, None, None)?;
+
        storage: &Storage,
+
        limit: FetchLimit,
+
        remote: PublicKey,
+
    ) -> Result<FetchResult, error::Fetch> {
+
        let result = match &mut self {
+
            Self::Clone { handle } => {
+
                log::debug!(target: "worker", "{} cloning from {remote}", handle.local());
+
                match radicle_fetch::clone(handle, limit, remote) {
+
                    Ok(result) => result,
+
                    Err(e) => {
+
                        // N.b. the clone failed so we remove the
+
                        // repository from the storage
+
                        storage.remove(rid)?;
+
                        return Err(e.into());
+
                    }
                }
-
                log::debug!(target: "worker", "Local clone successful for {rid}");
-

-
                Ok(StagedRepository::Fetching(Repository {
-
                    id: rid,
-
                    backend: copy,
-
                }))
-
            }
-
            Ok(false) => {
-
                log::debug!(target: "worker", "Setting up clone for new repository {}", rid);
-
                let repo = staging.create(rid)?;
-

-
                Ok(StagedRepository::Cloning(repo))
-
            }
-
            Err(e) => Err(e.into()),
-
        }
-
    }
-
}
-

-
impl<'a> StagingPhaseFinal<'a> {
-
    /// Return the fetch refspecs for fetching the necessary
-
    /// references.
-
    pub fn refspecs(&self) -> Vec<Refspec> {
-
        match &self.repo {
-
            FinalStagedRepository::Cloning { trusted, .. } => {
-
                Namespaces::Trusted(trusted.clone()).to_refspecs()
            }
-
            FinalStagedRepository::Fetching { refs, .. } => refs
-
                .iter()
-
                .map(|r| Refspec {
-
                    src: r.clone().to_ref_string().into(),
-
                    dst: r.clone().to_ref_string().into(),
-
                    force: true,
-
                })
-
                .collect(),
-
        }
-
    }
-

-
    /// Finalise the fetching process via the following steps.
-
    ///
-
    /// Verify all `rad/id` and `rad/sigrefs` from fetched
-
    /// remotes. Any remotes that fail will be ignored and not fetched
-
    /// into the production repository.
-
    ///
-
    /// For each remote that verifies, fetch from the staging storage
-
    /// into the production storage using the refspec:
-
    ///
-
    /// ```text
-
    /// refs/namespaces/<remote>/*:refs/namespaces/<remote>/*
-
    /// ```
-
    ///
-
    /// All references that were updated are returned as a
-
    /// [`RefUpdate`].
-
    pub fn transfer(self) -> Result<(Vec<RefUpdate>, HashSet<NodeId>), error::Transfer> {
-
        // Nb. we have to verify in a different order when fetching vs. cloning, due to needing
-
        // access to the existing repository in the fetching case.
-
        let (production, verifications) = match &self.repo {
-
            FinalStagedRepository::Cloning { repo, .. } => {
-
                let verifications = self.verify::<Repository>(None)?;
-
                let prod = self.production.create(repo.id)?;
-

-
                (prod, verifications)
-
            }
-
            FinalStagedRepository::Fetching { repo, .. } => {
-
                let prod = self.production.repository(repo.id)?;
-
                let verifications = self.verify(Some(&prod))?;
-

-
                (prod, verifications)
+
            Self::Pull { handle } => {
+
                log::debug!(target: "worker", "{} pulling from {remote}", handle.local());
+
                radicle_fetch::pull(handle, limit, remote)?
            }
        };
-
        let url = url::File::new(self.repo.path().to_path_buf()).to_string();
-
        let mut remote = production.backend.remote_anonymous(&url)?;
-
        let mut updates = Vec::new();
-
        let mut delete = HashSet::new();
-
        let mut skipped = HashSet::new();
-

-
        let callbacks = ref_updates(&mut updates);
-
        let mut remotes = {
-
            let specs = verifications
-
                .into_iter()
-
                .flat_map(|(remote, verified)| match verified {
-
                    VerifiedRemote::UpToDate => {
-
                        log::debug!(target: "worker", "{remote} is up-to-date");
-
                        skipped.insert(remote);
-

-
                        vec![]
-
                    }
-
                    VerifiedRemote::Failed { reason } => {
-
                        // TODO: We should include the skipped remotes in the fetch result,
-
                        // with the reason why they're skipped.
-
                        log::warn!(
-
                            target: "worker",
-
                            "{remote} failed to verify, ignoring ref updates: {reason}",
-
                        );
-
                        vec![]
-
                    }
-
                    VerifiedRemote::Success {
-
                        remote,
-
                        validations,
-
                        ..
-
                    } => {
-
                        let ns = remote.id.to_namespace();
-
                        let mut refspecs = vec![];
-

-
                        let mut unsigned = Vec::new();
-
                        // Unsigned refs should be deleted.
-
                        for validation in validations {
-
                            if let Validation::UnsignedRef(name) = validation {
-
                                unsigned.push(name);
-
                            }
-
                        }
-
                        delete.insert((remote.id, unsigned));
-

-
                        //  First add the standard git refs.
-
                        let heads = ns.join(git::refname!("refs/heads"));
-
                        let cobs = ns.join(git::refname!("refs/cobs"));
-
                        let tags = ns.join(git::refname!("refs/tags"));
-
                        let notes = ns.join(git::refname!("refs/notes"));
-

-
                        for refname in [heads, cobs, tags, notes] {
-
                            let pattern = refname.with_pattern(git::refspec::STAR);
-
                            refspecs.push((
-
                                remote.id,
-
                                Refspec {
-
                                    src: pattern.clone(),
-
                                    dst: pattern,
-
                                    force: true,
-
                                }
-
                                .to_string(),
-
                            ));
-
                        }
-

-
                        // Then add the special refs.
-
                        let id = ns.join(&*radicle::git::refs::storage::IDENTITY_BRANCH);
-
                        let sigrefs = ns.join(&*radicle::git::refs::storage::SIGREFS_BRANCH);
-

-
                        refspecs.push((
-
                            remote.id,
-
                            Refspec {
-
                                src: id.clone().into(),
-
                                dst: id.into(),
-
                                // Nb. The identity branch is allowed to be force-updated.
-
                                force: true,
-
                            }
-
                            .to_string(),
-
                        ));
-
                        refspecs.push((
-
                            remote.id,
-
                            Refspec {
-
                                src: sigrefs.clone().into(),
-
                                dst: sigrefs.into(),
-
                                // Nb. Sigrefs are never force-updated.
-
                                force: false,
-
                            }
-
                            .to_string(),
-
                        ));
-
                        refspecs
-
                    }
-
                })
-
                .collect::<Vec<_>>();
-

-
            let (fetching, specs): (HashSet<_>, Vec<_>) = specs.into_iter().unzip();
-

-
            if self.repo.is_cloning()
-
                && !self
-
                    .repo
-
                    .delegates()?
-
                    .iter()
-
                    .all(|d| fetching.contains(d.as_key()))
-
            {
-
                return Err(error::Transfer::NoDelegates);
-
            }
-
            log::debug!(target: "worker", "Transferring staging to production {url}");
-

-
            let mut opts = git::raw::FetchOptions::default();
-
            opts.remote_callbacks(callbacks);
-
            // Nb. To prevent refs owned by the local node from being deleted from the stored
-
            // copy if they are not on the remote side, we turn pruning off.
-
            // However, globally turning off pruning isn't a ideal either, so a better solution
-
            // should be devised.
-
            opts.prune(git::raw::FetchPrune::Off);
-

-
            // Fetch into production copy.
-
            remote.fetch(&specs, Some(&mut opts), None)?;

-
            // Delete unsigned refs.
-
            for (namespace, unsigned) in delete {
-
                for refstr in unsigned {
-
                    let q = git::Qualified::from_refstr(&refstr)
-
                        .expect("StagingPhaseFinal::transfer: unsigned references are qualified");
+
        for rejected in result.rejected() {
+
            log::warn!(target: "worker", "Rejected update for {}", rejected.refname())
+
        }

-
                    if let Ok(mut r) = production.reference(&namespace, &q) {
-
                        log::debug!(target: "worker", "Deleting unsigned ref {namespace}/{q}..");
+
        for warn in result.warnings() {
+
            log::warn!(target: "worker", "Validation error: {}", warn);
+
        }

-
                        r.delete()?;
-
                    }
+
        match result {
+
            radicle_fetch::FetchResult::Failed { failures, .. } => {
+
                for fail in failures.iter() {
+
                    log::error!(target: "worker", "Validation error: {}", fail);
                }
+
                Err(error::Fetch::Validation)
            }
-
            fetching
-
        };
-
        let head = production.set_head()?;
-
        log::debug!(target: "worker", "Head for {} set to {head}", production.id);
-

-
        let head = production.set_identity_head()?;
-
        log::debug!(target: "worker", "'refs/rad/id' for {} set to {head}", production.id);
-

-
        #[cfg(test)]
-
        // N.b. This is to prevent us from shooting ourselves in the
-
        // foot with storage inconsistencies.
-
        radicle::debug_assert_matches!(
-
            production.validate(),
-
            Ok(validations) if validations.is_empty(),
-
            "repository {} is not valid",
-
            production.id,
-
        );
-

-
        // Extend the list of remotes we attempted to fetch from with the skipped remotes.
-
        // This confirms to the user that the remote was indeed tried.
-
        remotes.extend(skipped);
-

-
        Ok((updates, remotes))
-
    }
-

-
    fn remotes(&self) -> Result<Box<dyn Iterator<Item = Remote> + '_>, git::raw::Error> {
-
        match &self.repo {
-
            FinalStagedRepository::Cloning { trusted, .. } => Ok(Box::new(
-
                trusted
-
                    .iter()
-
                    .filter_map(|remote| self.repo.remote(remote).ok()),
-
            )),
-
            FinalStagedRepository::Fetching { repo, refs } => {
-
                // Only verify remotes we're fetching refs from.
-
                let remotes = refs
-
                    .iter()
-
                    .filter_map(|r| NodeId::from_namespaced(r).ok())
-
                    .collect::<HashSet<_>>();
-
                let remotes = remotes.into_iter().filter_map(|r| repo.remote(&r).ok());
-

-
                Ok(Box::new(remotes))
+
            radicle_fetch::FetchResult::Success {
+
                applied, remotes, ..
+
            } => {
+
                self.repository_mut().set_head()?;
+
                self.repository_mut().set_identity_head()?;
+

+
                Ok(FetchResult {
+
                    updated: applied.updated,
+
                    namespaces: remotes.into_iter().collect(),
+
                })
            }
        }
    }

-
    fn verify<R: ReadRepository>(
-
        &self,
-
        local: Option<&R>,
-
    ) -> Result<BTreeMap<RemoteId, VerifiedRemote>, git::raw::Error> {
-
        let result = self
-
            .remotes()?
-
            .filter(|remote| remote.id != self.nid || self.repo.is_cloning())
-
            .map(|remote| {
-
                let remote_id = remote.id;
-

-
                log::debug!(target: "worker", "Verifying remote {remote_id}..");
-

-
                // If we have a local copy, ie. we're not cloning, we check that the signed refs
-
                // are being fast-forwarded.
-
                if let Some(local) = local {
-
                    if let (Ok(local), Ok(staging)) = (
-
                        local.reference_oid(&remote_id, &git::refs::storage::SIGREFS_BRANCH),
-
                        self.repo.reference_oid(&remote_id, &git::refs::storage::SIGREFS_BRANCH),
-
                    ) {
-
                        if local != staging  {
-
                            match self
-
                                .repo
-
                                .backend
-
                                .graph_descendant_of(staging.into(), local.into())
-
                            {
-
                                Ok(true) => {
-
                                    log::debug!(target: "worker", "Signed refs for {remote_id} fast-foward: {local} -> {staging}");
-
                                }
-
                                Ok(false) => {
-
                                    return (
-
                                        remote_id,
-
                                        VerifiedRemote::Failed {
-
                                            reason: "signed refs have diverged".to_owned()
-
                                        }
-
                                    );
-
                                }
-
                                Err(e) => {
-
                                    return (
-
                                        remote_id,
-
                                        VerifiedRemote::Failed { reason: e.to_string() },
-
                                    );
-
                                }
-
                            }
-
                        } else {
-
                            return (remote_id, VerifiedRemote::UpToDate);
-
                        }
-
                    }
-
                }
-

-
                // Nb. We aren't verifying this specific remote's identity branch.
-
                let verification = match self.repo.identity_doc() {
-
                    Ok(doc) => match self.repo.validate_remote(&remote) {
-
                        Ok(validations) => VerifiedRemote::Success {
-
                            _doc: doc.into(),
-
                            remote,
-
                            validations,
-
                        },
-
                        Err(e) => VerifiedRemote::Failed {
-
                            reason: e.to_string(),
-
                        },
-
                    },
-
                    Err(e) => VerifiedRemote::Failed {
-
                        reason: e.to_string(),
-
                    },
-
                };
-
                (remote_id, verification)
-
            })
-
            .collect();
-

-
        Ok(result)
-
    }
-
}
-

-
fn ref_updates(updates: &mut Vec<RefUpdate>) -> git::raw::RemoteCallbacks<'_> {
-
    let mut callbacks = git::raw::RemoteCallbacks::new();
-
    callbacks.update_tips(|name, old, new| {
-
        if let Ok(name) = git::RefString::try_from(name) {
-
            if name.to_namespaced().is_some() {
-
                updates.push(RefUpdate::from(name, old, new));
-
                // Returning `true` ensures the process is not aborted.
-
                return true;
-
            }
+
    fn repository_mut(&mut self) -> &mut Repository {
+
        match self {
+
            Self::Clone { handle } => handle.repository_mut(),
+
            Self::Pull { handle } => handle.repository_mut(),
        }
-
        log::warn!(target: "worker", "Invalid ref `{}` detected; aborting fetch", name);
-

-
        false
-
    });
-
    callbacks
+
    }
}
modified radicle-node/src/worker/fetch/error.rs
@@ -2,50 +2,35 @@ use std::io;

use thiserror::Error;

-
use radicle::{git, identity, storage, storage::refs};
+
use radicle::{git, identity, storage};
+
use radicle_fetch as fetch;

#[derive(Debug, Error)]
-
pub enum Init {
+
pub enum Fetch {
    #[error(transparent)]
-
    Io(#[from] io::Error),
-
    #[error(transparent)]
-
    Setup(#[from] Setup),
-
}
-

-
#[derive(Debug, Error)]
-
pub enum Setup {
+
    Run(#[from] fetch::Error),
    #[error(transparent)]
    Git(#[from] git::raw::Error),
    #[error(transparent)]
-
    Identity(#[from] identity::DocError),
-
    #[error(transparent)]
    Storage(#[from] storage::Error),
    #[error(transparent)]
-
    Repository(#[from] radicle::storage::RepositoryError),
-
}
-

-
#[derive(Debug, Error)]
-
pub enum Transfer {
-
    #[error(transparent)]
-
    Git(#[from] git::raw::Error),
-
    #[error(transparent)]
-
    Identity(#[from] identity::DocError),
-
    #[error(transparent)]
-
    Storage(#[from] storage::Error),
-
    #[error("no delegates in transfer")]
-
    NoDelegates,
+
    StorageCopy(#[from] io::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
+
    #[error("validation of storage repository failed")]
+
    Validation,
}

#[derive(Debug, Error)]
-
pub enum Transition {
+
pub enum Handle {
    #[error(transparent)]
-
    Git(#[from] git::raw::Error),
+
    Doc(#[from] identity::DocError),
+
    #[error(transparent)]
+
    Io(#[from] io::Error),
    #[error(transparent)]
-
    Identity(#[from] identity::DocError),
+
    Init(#[from] fetch::handle::error::Init),
    #[error(transparent)]
-
    Refs(#[from] refs::Error),
+
    Storage(#[from] storage::Error),
    #[error(transparent)]
    Repository(#[from] radicle::storage::RepositoryError),
}
deleted radicle-node/src/worker/fetch/refspecs.rs
@@ -1,48 +0,0 @@
-
use radicle::crypto::PublicKey;
-
use radicle::git;
-
use radicle::git::refs::storage::{IDENTITY_BRANCH, SIGREFS_BRANCH};
-
use radicle::storage::git::NAMESPACES_GLOB;
-
use radicle::storage::Namespaces;
-

-
use super::Refspec;
-

-
/// Radicle special refs, i.e. `refs/rad/*`.
-
pub struct SpecialRefs(pub(super) Namespaces);
-

-
impl SpecialRefs {
-
    pub fn into_refspecs(self) -> Vec<Refspec> {
-
        match &self.0 {
-
            Namespaces::All => {
-
                let id = NAMESPACES_GLOB.join(&*IDENTITY_BRANCH);
-
                let sigrefs = NAMESPACES_GLOB.join(&*SIGREFS_BRANCH);
-

-
                [id, sigrefs]
-
                    .into_iter()
-
                    .map(|spec| Refspec {
-
                        src: spec.clone(),
-
                        dst: spec,
-
                        force: true,
-
                    })
-
                    .collect()
-
            }
-
            Namespaces::Trusted(pks) => pks.iter().flat_map(rad_refs).collect(),
-
        }
-
    }
-
}
-

-
fn rad_refs(pk: &PublicKey) -> Vec<Refspec> {
-
    let ns = pk.to_namespace();
-
    let id = git::PatternString::from(ns.join(&*IDENTITY_BRANCH));
-
    let id = Refspec {
-
        src: id.clone(),
-
        dst: id,
-
        force: true,
-
    };
-
    let sigrefs = git::PatternString::from(ns.join(&*SIGREFS_BRANCH));
-
    let sigrefs = Refspec {
-
        src: sigrefs.clone(),
-
        dst: sigrefs,
-
        force: true,
-
    };
-
    vec![id, sigrefs]
-
}
deleted radicle-node/src/worker/tunnel.rs
@@ -1,89 +0,0 @@
-
use std::{io, io::Read, net, time};
-

-
use super::channels::Channels;
-
use super::{Handle, NodeId, StreamId, Worker};
-
use crate::runtime::thread;
-

-
/// Tunnels fetches to a remote peer.
-
pub struct Tunnel<'a> {
-
    channels: &'a mut Channels,
-
    listener: net::TcpListener,
-
    local_addr: net::SocketAddr,
-
    stream: StreamId,
-
    local: NodeId,
-
    remote: NodeId,
-
    handle: Handle,
-
}
-

-
impl<'a> Tunnel<'a> {
-
    pub(super) fn with(
-
        channels: &'a mut Channels,
-
        stream: StreamId,
-
        local: NodeId,
-
        remote: NodeId,
-
        handle: Handle,
-
    ) -> io::Result<Self> {
-
        let listener = net::TcpListener::bind(net::SocketAddr::from(([0, 0, 0, 0], 0)))?;
-
        let local_addr = listener.local_addr()?;
-

-
        Ok(Self {
-
            channels,
-
            listener,
-
            local_addr,
-
            stream,
-
            local,
-
            remote,
-
            handle,
-
        })
-
    }
-

-
    pub fn local_addr(&self) -> net::SocketAddr {
-
        self.local_addr
-
    }
-

-
    /// Run the tunnel until the connection is closed.
-
    pub fn run(mut self, timeout: time::Duration) -> io::Result<()> {
-
        let (remote_w, remote_r) = self.channels.split();
-
        let (local, _) = self.listener.accept()?;
-
        let (mut local_r, local_w) = (local.try_clone()?, local);
-

-
        local_r.set_read_timeout(Some(timeout))?;
-
        local_w.set_write_timeout(Some(timeout))?;
-

-
        let nid = self.remote;
-
        let stream_id = self.stream;
-

-
        thread::scope(|s| {
-
            let remote_to_local =
-
                thread::spawn_scoped(&self.local, "tunnel", s, || remote_r.pipe(local_w));
-

-
            let local_to_remote = thread::spawn_scoped(&self.local, "tunnel", s, || {
-
                let mut buffer = [0; u16::MAX as usize + 1];
-

-
                loop {
-
                    match local_r.read(&mut buffer) {
-
                        Ok(0) => break,
-
                        Ok(n) => {
-
                            remote_w.send(buffer[..n].to_vec())?;
-

-
                            if let Err(e) = self.handle.flush(nid, stream_id) {
-
                                log::error!(
-
                                    target: "worker", "Worker channel disconnected; aborting"
-
                                );
-
                                return Err(e);
-
                            }
-
                        }
-
                        Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => break,
-
                        Err(e) => return Err(e),
-
                    }
-
                }
-
                Worker::eof(nid, stream_id, remote_w, &mut self.handle)
-
            });
-

-
            remote_to_local.join().unwrap()?;
-
            local_to_remote.join().unwrap()?;
-

-
            Ok::<(), io::Error>(())
-
        })
-
    }
-
}
added radicle-node/src/worker/upload_pack.rs
@@ -0,0 +1,243 @@
+
use std::io;
+
use std::io::Write;
+
use std::process::{Command, ExitStatus, Stdio};
+

+
use radicle::node::NodeId;
+
use radicle::storage::git::paths;
+
use radicle::Storage;
+

+
use crate::runtime::thread;
+

+
/// Perform the Git upload-pack process, given that the Git request
+
/// `header` has already been read and parsed.
+
///
+
/// N.b. The upload-pack process itself is strict, i.e. it will read
+
/// requests from the client indefinitely, and so the client side MUST
+
/// send the EOF file message.
+
pub fn upload_pack<R, W>(
+
    nid: &NodeId,
+
    storage: &Storage,
+
    header: &pktline::GitRequest,
+
    mut recv: R,
+
    mut send: W,
+
) -> io::Result<ExitStatus>
+
where
+
    R: io::Read + Send,
+
    W: io::Write + Send,
+
{
+
    let protocol_version = header
+
        .extra
+
        .iter()
+
        .find_map(|kv| match kv {
+
            (ref k, Some(v)) if k == "version" => {
+
                let version = match v.as_str() {
+
                    "2" => 2,
+
                    "1" => 1,
+
                    _ => 0,
+
                };
+
                Some(version)
+
            }
+
            _ => None,
+
        })
+
        .unwrap_or(0);
+

+
    if protocol_version != 2 {
+
        return Err(io::Error::new(
+
            io::ErrorKind::InvalidData,
+
            "only Git protocol version 2 is supported",
+
        ));
+
    }
+

+
    let git_dir = paths::repository(storage, &header.repo);
+
    let mut child = {
+
        let mut cmd = Command::new("git");
+
        cmd.current_dir(git_dir)
+
            .env_clear()
+
            .envs(std::env::vars().filter(|(key, _)| key == "PATH" || key.starts_with("GIT_TRACE")))
+
            .env("GIT_PROTOCOL", format!("version={protocol_version}"))
+
            .args([
+
                "-c",
+
                "uploadpack.allowAnySha1InWant=true",
+
                "-c",
+
                "uploadpack.allowRefInWant=true",
+
                "-c",
+
                "lsrefs.unborn=ignore",
+
                "upload-pack",
+
                "--strict",
+
                ".",
+
            ])
+
            .stdout(Stdio::piped())
+
            .stdin(Stdio::piped())
+
            .stderr(Stdio::inherit());
+

+
        cmd.spawn()?
+
    };
+

+
    let mut stdin = child.stdin.take().unwrap();
+
    let mut stdout = io::BufReader::new(child.stdout.take().unwrap());
+
    thread::scope(|s| {
+
        thread::spawn_scoped(nid, "upload-pack", s, || {
+
            // N.b. we indefinitely copy stdout to the sender,
+
            // i.e. there's no need for a loop.
+
            match io::copy(&mut stdout, &mut send) {
+
                Ok(_) => {}
+
                Err(e) => {
+
                    log::error!(target: "worker", "Worker channel disconnected; aborting: {e}");
+
                }
+
            }
+
        });
+

+
        let reader = thread::spawn_scoped(nid, "upload-pack", s, || {
+
            let mut buffer = [0; u16::MAX as usize + 1];
+
            loop {
+
                match recv.read(&mut buffer) {
+
                    Ok(0) => break,
+
                    Ok(n) => {
+
                        if let Err(e) = stdin.write_all(&buffer[..n]) {
+
                            log::warn!(target: "worker", "upload-pack stdin write error: {e}");
+
                            break;
+
                        }
+
                    }
+
                    Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
+
                        log::debug!(target: "worker", "exiting upload-pack receive thread");
+
                        break;
+
                    }
+
                    Err(e) => {
+
                        log::error!(target: "worker", "upload-pack channel read error: {e}");
+
                        break;
+
                    }
+
                }
+
            }
+
        });
+

+
        // N.b. we only care if the `reader` is finished. We then kill
+
        // the child which will end the thread for the sender.
+
        loop {
+
            if reader.is_finished() {
+
                child.kill()?;
+
                break;
+
            } else {
+
                std::thread::sleep(std::time::Duration::from_millis(100));
+
            }
+
        }
+
        Ok::<_, io::Error>(())
+
    })?;
+

+
    let status = child.wait()?;
+
    Ok(status)
+
}
+

+
pub(super) mod pktline {
+
    use std::io;
+
    use std::io::Read;
+
    use std::str;
+

+
    use radicle::prelude::Id;
+

+
    pub const HEADER_LEN: usize = 4;
+

+
    /// Read and parse the `GitRequest` data from the client side.
+
    pub fn git_request<R>(reader: &mut R) -> io::Result<GitRequest>
+
    where
+
        R: io::Read,
+
    {
+
        let mut reader = Reader::new(reader);
+
        let (header, _) = reader.read_request_pktline()?;
+
        Ok(header)
+
    }
+

+
    struct Reader<'a, R> {
+
        stream: &'a mut R,
+
    }
+

+
    impl<'a, R: io::Read> Reader<'a, R> {
+
        /// Create a new packet-line reader.
+
        pub fn new(stream: &'a mut R) -> Self {
+
            Self { stream }
+
        }
+

+
        /// Parse a Git request packet-line.
+
        ///
+
        /// Example: `0032git-upload-pack /project.git\0host=myserver.com\0`
+
        ///
+
        fn read_request_pktline(&mut self) -> io::Result<(GitRequest, Vec<u8>)> {
+
            let mut pktline = [0u8; 1024];
+
            let length = self.read_pktline(&mut pktline)?;
+
            let Some(cmd) = GitRequest::parse(&pktline[4..length]) else {
+
                return Err(io::ErrorKind::InvalidInput.into());
+
            };
+
            Ok((cmd, Vec::from(&pktline[..length])))
+
        }
+

+
        /// Parse a Git packet-line.
+
        fn read_pktline(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
            self.read_exact(&mut buf[..HEADER_LEN])?;
+

+
            let length = str::from_utf8(&buf[..HEADER_LEN])
+
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
+
            let length = usize::from_str_radix(length, 16)
+
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e.to_string()))?;
+

+
            self.read_exact(&mut buf[HEADER_LEN..length])?;
+

+
            Ok(length)
+
        }
+
    }
+

+
    impl<'a, R: io::Read> io::Read for Reader<'a, R> {
+
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+
            self.stream.read(buf)
+
        }
+
    }
+

+
    /// The Git request packet-line for a Heartwood repository.
+
    ///
+
    /// Example: `0032git-upload-pack /rad:z3gqcJUoA1n9HaHKufZs5FCSGazv5.git\0host=myserver.com\0`
+
    #[derive(Debug)]
+
    pub struct GitRequest {
+
        pub repo: Id,
+
        pub path: String,
+
        pub host: Option<(String, Option<u16>)>,
+
        pub extra: Vec<(String, Option<String>)>,
+
    }
+

+
    impl GitRequest {
+
        /// Parse a Git command from a packet-line.
+
        fn parse(input: &[u8]) -> Option<Self> {
+
            let input = str::from_utf8(input).ok()?;
+
            let mut parts = input
+
                .strip_prefix("git-upload-pack ")?
+
                .split_terminator('\0');
+

+
            let path = parts.next()?.to_owned();
+
            let repo = path.strip_prefix('/')?.parse().ok()?;
+
            let host = match parts.next() {
+
                None | Some("") => None,
+
                Some(host) => {
+
                    let host = host.strip_prefix("host=")?;
+
                    match host.split_once(':') {
+
                        None => Some((host.to_owned(), None)),
+
                        Some((host, port)) => {
+
                            let port = port.parse::<u16>().ok()?;
+
                            Some((host.to_owned(), Some(port)))
+
                        }
+
                    }
+
                }
+
            };
+
            let extra = parts
+
                .skip_while(|part| part.is_empty())
+
                .map(|part| match part.split_once('=') {
+
                    None => (part.to_owned(), None),
+
                    Some((k, v)) => (k.to_owned(), Some(v.to_owned())),
+
                })
+
                .collect();
+

+
            Some(Self {
+
                repo,
+
                path,
+
                host,
+
                extra,
+
            })
+
        }
+
    }
+
}