Radish alpha
r
rad:z39mP9rQAaGmERfUMPULfPUi473tY
Radicle terminal user interface
Radicle
Git
lib: Properly terminate event listener thread
Erik Kundt committed 4 months ago
commit 3c18f6885620d432ae81826c66dfa29e1f585e2b
parent f157fa9
7 files changed +136 -71
modified Cargo.lock
@@ -204,6 +204,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16"

[[package]]
+
name = "async-stdin"
+
version = "0.3.1"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "b1ff8b5d9b5ec29e0f49583ba71847b8c8888b67a8510133048a380903aa6822"
+
dependencies = [
+
 "tokio",
+
]
+

+
[[package]]
name = "autocfg"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1007,6 +1016,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"

[[package]]
+
name = "futures-sink"
+
version = "0.3.31"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7"
+

+
[[package]]
name = "fuzzy-matcher"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2345,6 +2360,7 @@ version = "0.6.0"
dependencies = [
 "ansi-to-tui",
 "anyhow",
+
 "async-stdin",
 "fuzzy-matcher",
 "git2",
 "homedir",
@@ -2374,6 +2390,7 @@ dependencies = [
 "timeago 0.5.0",
 "tokio",
 "tokio-stream",
+
 "tokio-util",
 "tui-textarea",
 "tui-tree-widget",
]
@@ -3283,6 +3300,19 @@ dependencies = [
]

[[package]]
+
name = "tokio-util"
+
version = "0.7.17"
+
source = "registry+https://github.com/rust-lang/crates.io-index"
+
checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594"
+
dependencies = [
+
 "bytes",
+
 "futures-core",
+
 "futures-sink",
+
 "pin-project-lite",
+
 "tokio",
+
]
+

+
[[package]]
name = "toml"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
modified Cargo.toml
@@ -24,6 +24,7 @@ required-features = ["bin"]
[dependencies]
ansi-to-tui = { version = "7.0.0" }
anyhow = { version = "1" }
+
async-stdin = { version = "0.3.1" }
inquire = { version = "0.7.4", default-features = false, features = [
    "termion",
    "editor",
@@ -56,6 +57,7 @@ terminal-light = { version = "1.4.0" }
textwrap = { version = "0.16.0" }
thiserror = { version = "2.0.16" }
tokio = { version = "1.32.0", features = ["full"] }
+
tokio-util = { version = "0.7.17" }
tokio-stream = { version = "0.1.14" }
tui-textarea = { version = "0.7.0", default-features = false, features = [
    "termion",
modified bin/commands/inbox/list/ui.rs
@@ -85,7 +85,7 @@ impl From<&State> for BrowserProps<'_> {
        Self {
            mode: context.mode.clone(),
            header,
-
            notifications: notifications,
+
            notifications,
            selected: state.browser.selected,
            stats,
            columns: [
modified src/lib.rs
@@ -9,17 +9,17 @@ use std::fmt::Debug;

use anyhow::Result;

+
use serde::ser::{Serialize, SerializeStruct, Serializer};
+

#[cfg(unix)]
use tokio::signal::unix::signal;
-

use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;

-
use serde::ser::{Serialize, SerializeStruct, Serializer};
-

use ratatui::Viewport;

use store::Update;
+
use terminal::StdinReader;
use ui::im;
use ui::im::Show;
use ui::rm;
@@ -183,42 +183,36 @@ where
{
    let (terminator, mut interrupt_rx) = create_termination();
    let (state_tx, state_rx) = unbounded_channel();
+
    let (event_tx, event_rx) = unbounded_channel();
    let (work_tx, work_rx) = unbounded_channel();

    let store = store::Store::<S, M, R>::new(state_tx.clone());
    let worker = task::Worker::<T, M, R>::new(work_tx.clone());
    let frontend = rm::Frontend::default();
+
    let stdin_reader = StdinReader::default();

-
    let worker_interrupt_rx = interrupt_rx.resubscribe();
-
    let store_interrupt_rx = interrupt_rx.resubscribe();
-
    let frontend_interrupt_rx = interrupt_rx.resubscribe();
-

-
    let worker_message_rx = channel.rx.resubscribe();
-
    let store_message_rx = channel.rx.resubscribe();
-

-
    // TODO(erikli): Handle errors properly
+
    // TODO(erikli): Handle errors
    let _ = tokio::try_join!(
-
        tokio::spawn(async move {
-
            worker
-
                .run(processors, worker_message_rx, worker_interrupt_rx)
-
                .await
-
        }),
-
        tokio::spawn(async move {
-
            store
-
                .run(
-
                    state,
-
                    terminator,
-
                    store_message_rx,
-
                    work_rx,
-
                    store_interrupt_rx,
-
                )
-
                .await
-
        }),
-
        tokio::spawn(async move {
-
            frontend
-
                .run(root, state_rx, frontend_interrupt_rx, viewport)
-
                .await
-
        }),
+
        worker.run(
+
            processors,
+
            channel.rx.resubscribe(),
+
            interrupt_rx.resubscribe()
+
        ),
+
        store.run(
+
            state,
+
            terminator,
+
            channel.rx.resubscribe(),
+
            work_rx,
+
            interrupt_rx.resubscribe(),
+
        ),
+
        frontend.run(
+
            root,
+
            state_rx,
+
            event_rx,
+
            interrupt_rx.resubscribe(),
+
            viewport
+
        ),
+
        stdin_reader.run(event_tx, interrupt_rx.resubscribe()),
    )?;

    if let Ok(reason) = interrupt_rx.recv().await {
@@ -250,13 +244,16 @@ where
{
    let (terminator, mut interrupt_rx) = create_termination();
    let (state_tx, state_rx) = unbounded_channel();
+
    let (event_tx, event_rx) = unbounded_channel();
    let (work_tx, work_rx) = unbounded_channel();

    let store = store::Store::<S, M, R>::new(state_tx.clone());
    let worker = task::Worker::<T, M, R>::new(work_tx.clone());
    let frontend = im::Frontend::default();
+
    let stdin_reader = StdinReader::default();

-
    tokio::try_join!(
+
    // TODO(erikli): Handle errors
+
    let _ = tokio::try_join!(
        worker.run(
            processors,
            channel.rx.resubscribe(),
@@ -267,9 +264,16 @@ where
            terminator,
            channel.rx.resubscribe(),
            work_rx,
-
            interrupt_rx.resubscribe()
+
            interrupt_rx.resubscribe(),
+
        ),
+
        frontend.run(
+
            channel.tx,
+
            state_rx,
+
            event_rx,
+
            interrupt_rx.resubscribe(),
+
            viewport
        ),
-
        frontend.run(channel.tx, state_rx, interrupt_rx.resubscribe(), viewport),
+
        stdin_reader.run(event_tx, interrupt_rx.resubscribe()),
    )?;

    if let Ok(reason) = interrupt_rx.recv().await {
modified src/terminal.rs
@@ -1,20 +1,28 @@
-
use std::io::{self, Write};
+
use std::fmt::Debug;
+
use std::io;
+
use std::io::Write;
use std::thread;
-
use std::time::Instant;
+
use std::time::Duration;

-
use ratatui::termion::screen::{AlternateScreen, IntoAlternateScreen};
+
use signal_hook::iterator::Signals;
+

+
use tokio::sync::broadcast;
+
use tokio::sync::mpsc::UnboundedSender;
+

+
use tokio_util::sync::CancellationToken;
+

+
use termion::async_stdin;
use termion::input::TermRead;
use termion::raw::{IntoRawMode, RawTerminal};

+
use ratatui::termion::screen::{AlternateScreen, IntoAlternateScreen};
use ratatui::{prelude::*, CompletedFrame};
use ratatui::{TerminalOptions, Viewport};

-
use tokio::sync::mpsc::{self};
-

use super::event::Event;
+
use super::Interrupted;

pub type Backend<S> = TermionBackendExt<S>;
-

pub type InlineTerminal = ratatui::Terminal<Backend<RawTerminal<io::Stdout>>>;
pub type FullscreenTerminal = ratatui::Terminal<Backend<AlternateScreen<RawTerminal<io::Stdout>>>>;

@@ -159,32 +167,57 @@ impl<W: Write> ratatui::backend::Backend for TermionBackendExt<W> {
    }
}

-
/// Spawn one thread that polls `stdin` for new user input and another thread
-
/// that polls UNIX signals, e.g. `SIGWINCH` when the terminal window size is
-
/// being changed.
-
pub fn events() -> mpsc::UnboundedReceiver<Event> {
-
    let (tx, rx) = mpsc::unbounded_channel();
-
    let events_tx = tx.clone();
-
    thread::spawn(move || {
-
        let start = Instant::now();
-
        let stdin = io::stdin();
-
        for key in stdin.keys().flatten() {
-
            // TODO(erikli): Remove this hack! Perhaps use `tokio::CancellationToken`?
-
            if start.elapsed().as_millis() > 200 && events_tx.send(Event::Key(key)).is_err() {
-
                return;
+
#[derive(Default)]
+
pub struct StdinReader {}
+

+
impl StdinReader {
+
    pub async fn run<P: Clone + Send + Sync + Debug>(
+
        self,
+
        event_tx: UnboundedSender<Event>,
+
        mut interrupt_rx: broadcast::Receiver<Interrupted<P>>,
+
    ) -> anyhow::Result<Interrupted<P>> {
+
        let token = CancellationToken::new();
+
        let token_clone = token.clone();
+

+
        let key_event_tx = event_tx.clone();
+
        let key_listener = tokio::spawn(async move {
+
            let mut stdin = async_stdin().keys();
+
            loop {
+
                if let Some(Ok(key)) = stdin.next() {
+
                    if key_event_tx.send(Event::Key(key)).is_err() {
+
                        return;
+
                    }
+
                }
+
                tokio::select! {
+
                    _ = token_clone.cancelled() => {
+
                        break;
+
                    }
+
                    _ = tokio::time::sleep(Duration::from_millis(50)) => {}
+
                }
            }
-
        }
-
    });
+
        });

-
    let events_tx = tx.clone();
-
    if let Ok(mut signals) = signal_hook::iterator::Signals::new([libc::SIGWINCH]) {
+
        let mut signals = Signals::new([libc::SIGWINCH])?;
+
        let signal_handle = signals.handle();
+
        let signal_event_tx = event_tx.clone();
        thread::spawn(move || {
            for _ in signals.forever() {
-
                if events_tx.send(Event::Resize).is_err() {
+
                if signal_event_tx.send(Event::Resize).is_err() {
                    return;
                }
            }
        });
+

+
        let result: anyhow::Result<Interrupted<P>> = tokio::select! {
+
            Ok(interrupted) = interrupt_rx.recv() => {
+
                token.cancel();
+
                Ok(interrupted)
+
            }
+
        };
+

+
        key_listener.await?;
+
        signal_handle.close();
+

+
        result
    }
-
    rx
}
modified src/ui/im.rs
@@ -19,7 +19,6 @@ use ratatui::{Frame, Viewport};

use crate::event::Event;
use crate::store::Update;
-
use crate::terminal;
use crate::terminal::Terminal;
use crate::ui::theme::Theme;
use crate::ui::{Column, ToRow};
@@ -46,6 +45,7 @@ impl Frontend {
        self,
        message_tx: broadcast::Sender<M>,
        mut state_rx: UnboundedReceiver<S>,
+
        mut event_rx: UnboundedReceiver<Event>,
        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
        viewport: Viewport,
    ) -> anyhow::Result<Interrupted<R>>
@@ -55,9 +55,7 @@ impl Frontend {
        R: Clone + Send + Sync + Debug,
    {
        let mut ticker = tokio::time::interval(RENDERING_TICK_RATE);
-

        let mut terminal = Terminal::try_from(viewport)?;
-
        let mut events_rx = terminal::events();

        let mut state = state_rx.recv().await.unwrap();
        let mut ctx = Context::default().with_sender(message_tx);
@@ -67,8 +65,7 @@ impl Frontend {
                // Tick to terminate the select every N milliseconds
                _ = ticker.tick() => (),
                // Handle input events
-
                Some(event) = events_rx.recv() => {
-
                    log::info!("Received event: {event:?}");
+
                Some(event) = event_rx.recv() => {
                    match event {
                        Event::Key(key) => ctx.store_input(key),
                        Event::Resize => (),
modified src/ui/rm.rs
@@ -9,7 +9,6 @@ use tokio::sync::mpsc::UnboundedReceiver;

use crate::event::Event;
use crate::store::Update;
-
use crate::terminal;
use crate::terminal::Terminal;
use crate::ui::rm::widget::RenderProps;
use crate::ui::rm::widget::Widget;
@@ -44,6 +43,7 @@ impl Frontend {
        self,
        mut root: Widget<S, M>,
        mut state_rx: UnboundedReceiver<S>,
+
        mut events_rx: UnboundedReceiver<Event>,
        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
        viewport: Viewport,
    ) -> anyhow::Result<Interrupted<R>>
@@ -53,10 +53,7 @@ impl Frontend {
        R: Clone + Send + Sync + Debug,
    {
        let mut ticker = tokio::time::interval(RENDERING_TICK_RATE);
-

        let mut terminal = Terminal::try_from(viewport)?;
-
        let mut events_rx = terminal::events();
-

        let mut root = {
            let state = state_rx.recv().await.unwrap();

@@ -72,7 +69,9 @@ impl Frontend {
                // Handle input events
                Some(event) = events_rx.recv() => match event {
                    Event::Key(key) => root.handle_event(key),
-
                    Event::Resize => (),
+
                    Event::Resize => {
+
                        log::info!("Resizing frontend...");
+
                    },
                },
                // Handle state updates
                Some(state) = state_rx.recv() => {