Radish alpha
r
Radicle terminal user interface
Radicle
Git (anonymous pull)
Log in to clone via SSH
lib: Introduce processors
Erik Kundt committed 5 months ago
commit a0b143e3a2e45e1368528ff1ceb597a5937cbea2
parent a3d9a4eed0bb156e8da95dd502b570cbad5fb3ab
9 files changed +320 -148
modified examples/basic_rmui.rs
@@ -1,13 +1,14 @@
use anyhow::Result;

-
use ratatui::Viewport;
use termion::event::Key;

use ratatui::layout::Constraint;
+
use ratatui::Viewport;

use radicle_tui as tui;

use tui::store;
+
use tui::task::EmptyProcessors;
use tui::ui::rm::widget::container::{Container, Header, HeaderProps};
use tui::ui::rm::widget::input::{TextView, TextViewProps, TextViewState};
use tui::ui::rm::widget::window::{Page, Shortcuts, ShortcutsProps, Window, WindowProps};
@@ -17,11 +18,11 @@ use tui::{BoxedAny, Channel, Exit};

const CONTENT: &str = r#"
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor
-
incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud 
-
exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure 
+
incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud
+
exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure
dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.

-
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt 
+
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt
mollit anim id est laborum.
"#;

@@ -108,7 +109,14 @@ pub async fn main() -> Result<()> {
        })
        .on_update(|_| WindowProps::default().current_page(0).to_boxed_any().into());

-
    tui::rm(app, window, Viewport::default(), channel).await?;
+
    tui::rm(
+
        app,
+
        window,
+
        Viewport::default(),
+
        channel,
+
        EmptyProcessors::new(),
+
    )
+
    .await?;

    Ok(())
}
modified examples/hello.rs
@@ -8,6 +8,7 @@ use ratatui::{Frame, Viewport};
use radicle_tui as tui;

use tui::store;
+
use tui::task::EmptyProcessors;
use tui::ui::im::widget::Window;
use tui::ui::im::Show;
use tui::ui::im::{Borders, Context};
@@ -73,7 +74,13 @@ pub async fn main() -> Result<()> {
        alien: ALIEN.to_string(),
    };

-
    tui::im(app, Viewport::default(), Channel::default()).await?;
+
    tui::im(
+
        app,
+
        Viewport::default(),
+
        Channel::default(),
+
        EmptyProcessors::new(),
+
    )
+
    .await?;

    Ok(())
}
modified examples/hello_rrmui.rs
@@ -9,22 +9,23 @@ use ratatui::text::Text;
use radicle_tui as tui;

use tui::store;
+
use tui::task::EmptyProcessors;
use tui::ui::rm::widget::input::{TextArea, TextAreaProps};
use tui::ui::rm::widget::ToWidget;
use tui::{BoxedAny, Channel, Exit};

const ALIEN: &str = r#"
-
     ///             ///    ,---------------------------------. 
+
     ///             ///    ,---------------------------------.
     ///             ///    | Hey there, press (q) to quit... |
-
        //         //       '---------------------------------'  
-
        //,,,///,,,//      .. 
-
     ///////////////////  .  
-
  //////@@@@@//////@@@@@///  
-
  //////@@###//////@@###///  
+
        //         //       '---------------------------------'
+
        //,,,///,,,//      ..
+
     ///////////////////  .
+
  //////@@@@@//////@@@@@///
+
  //////@@###//////@@###///
,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
-
     ,,,  ///   ///  ,,,     
-
     ,,,  ///   ///  ,,,     
-
          ///   ///          
+
     ,,,  ///   ///  ,,,
+
     ,,,  ///   ///  ,,,
+
          ///   ///
        /////   /////
"#;

@@ -70,7 +71,14 @@ pub async fn main() -> Result<()> {
                .into()
        });

-
    tui::rm(app, scene, Viewport::default(), channel).await?;
+
    tui::rm(
+
        app,
+
        scene,
+
        Viewport::default(),
+
        channel,
+
        EmptyProcessors::new(),
+
    )
+
    .await?;

    Ok(())
}
modified examples/selection.rs
@@ -12,6 +12,7 @@ use ratatui::{Frame, Viewport};

use radicle_tui as tui;

+
use tui::task::EmptyProcessors;
use tui::ui::im::widget::Window;
use tui::ui::im::{Borders, Context};
use tui::ui::{Column, ToRow};
@@ -144,7 +145,14 @@ pub async fn main() -> Result<()> {
        selector: TableState::new(Some(0)),
    };

-
    if let Some(exit) = tui::im(app, Viewport::Inline(12), Channel::default()).await? {
+
    if let Some(exit) = tui::im(
+
        app,
+
        Viewport::Inline(12),
+
        Channel::default(),
+
        EmptyProcessors::new(),
+
    )
+
    .await?
+
    {
        println!("{exit}");
    } else {
        anyhow::bail!("No selection");
modified src/lib.rs
@@ -7,19 +7,25 @@ pub mod ui;
use std::any::Any;
use std::fmt::Debug;

-
use ratatui::Viewport;
-
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+
use anyhow::Result;
+

+
#[cfg(unix)]
+
use tokio::signal::unix::signal;
+

+
use tokio::sync::broadcast;
+
use tokio::sync::mpsc::unbounded_channel;

use serde::ser::{Serialize, SerializeStruct, Serializer};

-
use anyhow::Result;
+
use ratatui::Viewport;

use store::Update;
-
use task::Interrupted;
use ui::im;
use ui::im::Show;
use ui::rm;

+
use crate::task::Process;
+

/// An optional return value.
#[derive(Clone, Debug)]
pub struct Exit<T> {
@@ -76,6 +82,14 @@ where
    }
}

+
/// Implementors of `Share` can be used inside the multi-threaded
+
/// application environment.
+
pub trait Share: Clone + Debug + Send + Sync + 'static {}
+

+
/// Blanket implementation for all types that implement the required
+
/// traits.
+
impl<T: Clone + Debug + Send + Sync + 'static> Share for T {}
+

/// Provide implementations for conversions to and from `Box<dyn Any>`.
pub trait BoxedAny {
    fn from_boxed_any(any: Box<dyn Any>) -> Option<Self>
@@ -136,41 +150,75 @@ impl<T> PageStack<T> {
    }
}

-
/// A multi-producer, single-consumer message channel.
+
/// A multi-producer, multi-consumer message channel.
pub struct Channel<M> {
-
    pub tx: UnboundedSender<M>,
-
    pub rx: UnboundedReceiver<M>,
+
    pub tx: broadcast::Sender<M>,
+
    pub rx: broadcast::Receiver<M>,
}

-
impl<A> Default for Channel<A> {
+
impl<M: Clone> Default for Channel<M> {
    fn default() -> Self {
-
        let (tx, rx) = mpsc::unbounded_channel();
-
        Self { tx: tx.clone(), rx }
+
        let (tx, rx) = broadcast::channel(1000);
+
        Self { tx, rx }
    }
}

/// Initialize a `Store` with the `State` given and a `Frontend` with the `Widget` given,
-
/// and run their main loops concurrently. Connect them to the `Channel` and also to
+
/// and run their main loops in parallel. Connect them to the `Channel` and also to
/// an interrupt broadcast channel also initialized in this function.
-
pub async fn rm<S, M, P>(
+
/// Additionally, a list of processors can be passed. Processors will also receive all
+
/// applications messages and can emit new ones. They will be executed by an internal worker.
+
pub async fn rm<S, T, M, R>(
    state: S,
    root: rm::widget::Widget<S, M>,
    viewport: Viewport,
    channel: Channel<M>,
-
) -> Result<Option<P>>
+
    processors: Vec<T>,
+
) -> Result<Option<R>>
where
-
    S: Update<M, Return = P> + Clone + Debug + Send + Sync + 'static,
-
    M: Debug + Send + Sync + 'static,
-
    P: Clone + Debug + Send + Sync + 'static,
+
    S: Update<M, Return = R> + Share,
+
    T: Process<M> + Share,
+
    M: Share,
+
    R: Share,
{
-
    let (terminator, mut interrupt_rx) = task::create_termination();
+
    let (terminator, mut interrupt_rx) = create_termination();
+
    let (state_tx, state_rx) = unbounded_channel();
+
    let (work_tx, work_rx) = unbounded_channel();

-
    let (store, state_rx) = store::Store::<S, M, P>::new();
+
    let store = store::Store::<S, M, R>::new(state_tx.clone());
+
    let worker = task::Worker::<T, M, R>::new(work_tx.clone());
    let frontend = rm::Frontend::default();

-
    tokio::try_join!(
-
        store.run(state, terminator, channel.rx, interrupt_rx.resubscribe()),
-
        frontend.run(root, state_rx, interrupt_rx.resubscribe(), viewport),
+
    let worker_interrupt_rx = interrupt_rx.resubscribe();
+
    let store_interrupt_rx = interrupt_rx.resubscribe();
+
    let frontend_interrupt_rx = interrupt_rx.resubscribe();
+

+
    let worker_message_rx = channel.rx.resubscribe();
+
    let store_message_rx = channel.rx.resubscribe();
+

+
    // TODO(erikli): Handle errors properly
+
    let _ = tokio::try_join!(
+
        tokio::spawn(async move {
+
            worker
+
                .run(processors, worker_message_rx, worker_interrupt_rx)
+
                .await
+
        }),
+
        tokio::spawn(async move {
+
            store
+
                .run(
+
                    state,
+
                    terminator,
+
                    store_message_rx,
+
                    work_rx,
+
                    store_interrupt_rx,
+
                )
+
                .await
+
        }),
+
        tokio::spawn(async move {
+
            frontend
+
                .run(root, state_rx, frontend_interrupt_rx, viewport)
+
                .await
+
        }),
    )?;

    if let Ok(reason) = interrupt_rx.recv().await {
@@ -186,21 +234,42 @@ where
/// Initialize a `Store` with the `State` given and a `Frontend` with the `App` given,
/// and run their main loops concurrently. Connect them to the `Channel` and also to
/// an interrupt broadcast channel also initialized in this function.
-
pub async fn im<S, M, P>(state: S, viewport: Viewport, channel: Channel<M>) -> Result<Option<P>>
+
/// Additionally, a list of processors can be passed. Processors will also receive all
+
/// applications messages and can emit new ones. They will be executed by an internal worker.
+
pub async fn im<S, T, M, R>(
+
    state: S,
+
    viewport: Viewport,
+
    channel: Channel<M>,
+
    processors: Vec<T>,
+
) -> Result<Option<R>>
where
-
    S: Update<M, Return = P> + Show<M> + Clone + Send + Sync + 'static,
-
    M: Clone + Debug + Send + Sync + 'static,
-
    P: Clone + Debug + Send + Sync + 'static,
+
    S: Update<M, Return = R> + Show<M> + Share,
+
    T: Process<M> + Share,
+
    M: Share,
+
    R: Share,
{
-
    let (terminator, mut interrupt_rx) = task::create_termination();
+
    let (terminator, mut interrupt_rx) = create_termination();
+
    let (state_tx, state_rx) = unbounded_channel();
+
    let (work_tx, work_rx) = unbounded_channel();

-
    let state_tx = channel.tx.clone();
-
    let (store, state_rx) = store::Store::<S, M, P>::new();
+
    let store = store::Store::<S, M, R>::new(state_tx.clone());
+
    let worker = task::Worker::<T, M, R>::new(work_tx.clone());
    let frontend = im::Frontend::default();

    tokio::try_join!(
-
        store.run(state, terminator, channel.rx, interrupt_rx.resubscribe()),
-
        frontend.run(state_tx, state_rx, interrupt_rx.resubscribe(), viewport),
+
        worker.run(
+
            processors,
+
            channel.rx.resubscribe(),
+
            interrupt_rx.resubscribe()
+
        ),
+
        store.run(
+
            state,
+
            terminator,
+
            channel.rx.resubscribe(),
+
            work_rx,
+
            interrupt_rx.resubscribe()
+
        ),
+
        frontend.run(channel.tx, state_rx, interrupt_rx.resubscribe(), viewport),
    )?;

    if let Ok(reason) = interrupt_rx.recv().await {
@@ -212,3 +281,70 @@ where
        anyhow::bail!("exited because of an unexpected error");
    }
}
+

+
/// An `Interrupt` message that is produced by either an OS signal (e.g. kill)
+
/// or the user by requesting the application to close.
+
#[derive(Debug, Clone)]
+
pub enum Interrupted<P>
+
where
+
    P: Clone + Send + Sync + Debug,
+
{
+
    OsSignal,
+
    User { payload: Option<P> },
+
}
+

+
/// The `Terminator` wraps a broadcast channel and can send an interrupt messages.
+
#[derive(Debug, Clone)]
+
pub struct Terminator<P>
+
where
+
    P: Clone + Send + Sync + Debug,
+
{
+
    interrupt_tx: broadcast::Sender<Interrupted<P>>,
+
}
+

+
impl<P> Terminator<P>
+
where
+
    P: Clone + Send + Sync + Debug + 'static,
+
{
+
    /// Create a `Terminator` that stores the sending end of a broadcast channel.
+
    pub fn new(interrupt_tx: broadcast::Sender<Interrupted<P>>) -> Self {
+
        Self { interrupt_tx }
+
    }
+

+
    /// Send interrupt message to the broadcast channel.
+
    pub fn terminate(&mut self, interrupted: Interrupted<P>) -> anyhow::Result<()> {
+
        self.interrupt_tx.send(interrupted)?;
+

+
        Ok(())
+
    }
+
}
+

+
/// Receive `SIGINT` and call terminator which sends the interrupt message to its broadcast channel.
+
#[cfg(unix)]
+
async fn terminate_by_unix_signal<P>(mut terminator: Terminator<P>)
+
where
+
    P: Clone + Send + Sync + Debug + 'static,
+
{
+
    let mut interrupt_signal = signal(tokio::signal::unix::SignalKind::interrupt())
+
        .expect("failed to create interrupt signal stream");
+

+
    interrupt_signal.recv().await;
+

+
    terminator
+
        .terminate(Interrupted::OsSignal)
+
        .expect("failed to send interrupt signal");
+
}
+

+
/// Create a broadcast channel and spawn a task for retrieving the applications' kill signal.
+
pub fn create_termination<P>() -> (Terminator<P>, broadcast::Receiver<Interrupted<P>>)
+
where
+
    P: Clone + Send + Sync + Debug + 'static,
+
{
+
    let (tx, rx) = broadcast::channel(1);
+
    let terminator = Terminator::new(tx);
+

+
    #[cfg(unix)]
+
    tokio::spawn(terminate_by_unix_signal(terminator.clone()));
+

+
    (terminator, rx)
+
}
modified src/store.rs
@@ -1,13 +1,12 @@
-
use std::fmt::Debug;
use std::marker::PhantomData;
use std::time::Duration;

-
use tokio::sync::broadcast;
-
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
+
use tokio::sync::{
+
    broadcast,
+
    mpsc::{UnboundedReceiver, UnboundedSender},
+
};

-
use crate::Exit;
-

-
use super::task::{Interrupted, Terminator};
+
use super::{Exit, Interrupted, Share, Terminator};

const STORE_TICK_RATE: Duration = Duration::from_millis(1000);

@@ -26,37 +25,32 @@ pub trait Update<M> {

/// The `Store` updates the applications' state concurrently. It handles
/// messages coming from the frontend and updates the state accordingly.
-
pub struct Store<S, M, P>
+
pub struct Store<S, M, R>
where
-
    S: Update<M, Return = P> + Clone + Send + Sync,
-
    P: Clone + Debug + Send + Sync,
+
    S: Update<M, Return = R> + Share,
{
    state_tx: UnboundedSender<S>,
-
    _phantom: PhantomData<(M, P)>,
+
    _phantom: PhantomData<(M, R)>,
}

-
impl<S, M, P> Store<S, M, P>
+
impl<S, M, R> Store<S, M, R>
where
-
    S: Update<M, Return = P> + Clone + Send + Sync,
-
    P: Clone + Debug + Send + Sync,
+
    S: Update<M, Return = R> + Share,
+
    R: Share,
{
-
    pub fn new() -> (Self, UnboundedReceiver<S>) {
-
        let (state_tx, state_rx) = mpsc::unbounded_channel::<S>();
-

-
        (
-
            Store {
-
                state_tx,
-
                _phantom: PhantomData,
-
            },
-
            state_rx,
-
        )
+
    pub fn new(tx: UnboundedSender<S>) -> Self {
+
        Self {
+
            state_tx: tx,
+
            _phantom: PhantomData,
+
        }
    }
}

-
impl<S, M, P> Store<S, M, P>
+
impl<S, M, R> Store<S, M, R>
where
-
    S: Update<M, Return = P> + Clone + Send + Sync + 'static,
-
    P: Clone + Debug + Send + Sync + 'static,
+
    S: Update<M, Return = R> + Share,
+
    M: Share,
+
    R: Share,
{
    /// By calling `main_loop`, the store will wait for new messages coming
    /// from the frontend and update the applications' state accordingly. It will
@@ -65,10 +59,11 @@ where
    pub async fn run(
        self,
        mut state: S,
-
        mut terminator: Terminator<P>,
-
        mut message_rx: UnboundedReceiver<M>,
-
        mut interrupt_rx: broadcast::Receiver<Interrupted<P>>,
-
    ) -> anyhow::Result<Interrupted<P>> {
+
        mut terminator: Terminator<R>,
+
        mut message_rx: broadcast::Receiver<M>,
+
        mut work_rx: UnboundedReceiver<M>,
+
        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
+
    ) -> anyhow::Result<Interrupted<R>> {
        // Send the initial state once
        self.state_tx.send(state.clone())?;

@@ -78,13 +73,18 @@ where
            tokio::select! {
                // Handle the messages coming from the frontend
                // and process them to do async operations
-
                Some(message) = message_rx.recv() => {
+
                Ok(message) = message_rx.recv() => {
                    if let Some(exit) = state.update(message) {
                        let interrupted = Interrupted::User { payload: exit.value };
                        let _ = terminator.terminate(interrupted.clone());

                        break interrupted;
                    }
+
                    self.state_tx.send(state.clone())?;
+
                },
+
                Some(message) = work_rx.recv() => {
+
                    state.update(message);
+
                    self.state_tx.send(state.clone())?;
                },
                // Tick to terminate the select every N milliseconds
                _ = ticker.tick() => {
@@ -95,8 +95,6 @@ where
                    break interrupted;
                }
            }
-

-
            self.state_tx.send(state.clone())?;
        };

        Ok(result)
modified src/task.rs
@@ -1,72 +1,79 @@
-
use std::fmt::Debug;
+
use std::marker::PhantomData;
+
use std::{fmt::Debug, future::Future};

-
#[cfg(unix)]
-
use tokio::signal::unix::signal;
-
use tokio::sync::broadcast;
+
use tokio::sync::{broadcast, mpsc::UnboundedSender};

-
/// An `Interrupt` message that is produced by either an OS signal (e.g. kill)
-
/// or the user by requesting the application to close.
-
#[derive(Debug, Clone)]
-
pub enum Interrupted<P>
-
where
-
    P: Clone + Send + Sync + Debug,
-
{
-
    OsSignal,
-
    User { payload: Option<P> },
-
}
+
use super::{Interrupted, Share};

-
/// The `Terminator` wraps a broadcast channel and can send an interrupt messages.
-
#[derive(Debug, Clone)]
-
pub struct Terminator<P>
-
where
-
    P: Clone + Send + Sync + Debug,
-
{
-
    interrupt_tx: broadcast::Sender<Interrupted<P>>,
-
}
+
pub type EmptyProcessors = Vec<EmptyProcessor>;

-
impl<P> Terminator<P>
-
where
-
    P: Clone + Send + Sync + Debug + 'static,
-
{
-
    /// Create a `Terminator` that stores the sending end of a broadcast channel.
-
    pub fn new(interrupt_tx: broadcast::Sender<Interrupted<P>>) -> Self {
-
        Self { interrupt_tx }
-
    }
+
/// A task that can be run.
+
pub trait Task: Debug + Send + Sync + 'static {
+
    type Return;

-
    /// Send interrupt message to the broadcast channel.
-
    pub fn terminate(&mut self, interrupted: Interrupted<P>) -> anyhow::Result<()> {
-
        self.interrupt_tx.send(interrupted)?;
+
    fn run(&self) -> anyhow::Result<Vec<Self::Return>>;
+
}

-
        Ok(())
-
    }
+
/// A processor that can be added to the application environment.
+
/// Processors will receive application messages and can produce new ones.
+
pub trait Process<M: Share> {
+
    fn process(&mut self, _message: M) -> impl Future<Output = anyhow::Result<Vec<M>>> + Send;
}

-
/// Receive `SIGINT` and call terminator which sends the interrupt message to its broadcast channel.
-
#[cfg(unix)]
-
async fn terminate_by_unix_signal<P>(mut terminator: Terminator<P>)
-
where
-
    P: Clone + Send + Sync + Debug + 'static,
-
{
-
    let mut interrupt_signal = signal(tokio::signal::unix::SignalKind::interrupt())
-
        .expect("failed to create interrupt signal stream");
+
/// An empty processor that does nothing.
+
#[derive(Debug, Clone)]
+
pub struct EmptyProcessor;

-
    interrupt_signal.recv().await;
+
impl<M: Share> Process<M> for EmptyProcessor {
+
    async fn process(&mut self, _message: M) -> anyhow::Result<Vec<M>> {
+
        Ok(vec![])
+
    }
+
}

-
    terminator
-
        .terminate(Interrupted::OsSignal)
-
        .expect("failed to send interrupt signal");
+
/// A worker that is spawned by the application. Invokes
+
/// all processors and sends received application messages.
+
pub struct Worker<P, M, R> {
+
    work_tx: UnboundedSender<M>,
+
    _phantom: PhantomData<(P, M, R)>,
}

-
/// Create a broadcast channel and spawn a task for retrieving the applications' kill signal.
-
pub fn create_termination<P>() -> (Terminator<P>, broadcast::Receiver<Interrupted<P>>)
+
impl<P, M, R> Worker<P, M, R>
where
-
    P: Clone + Send + Sync + Debug + 'static,
+
    P: Process<M> + Share,
+
    M: Share,
+
    R: Share,
{
-
    let (tx, rx) = broadcast::channel(1);
-
    let terminator = Terminator::new(tx);
+
    pub fn new(tx: UnboundedSender<M>) -> Self {
+
        Self {
+
            work_tx: tx,
+
            _phantom: PhantomData,
+
        }
+
    }

-
    #[cfg(unix)]
-
    tokio::spawn(terminate_by_unix_signal(terminator.clone()));
+
    pub async fn run(
+
        &self,
+
        processors: Vec<P>,
+
        mut message_rx: broadcast::Receiver<M>,
+
        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
+
    ) -> anyhow::Result<Interrupted<R>> {
+
        let result = loop {
+
            tokio::select! {
+
                Ok(message) = message_rx.recv() => {
+
                    for mut p in processors.clone() {
+
                        for m in p.process(message.clone()).await? {
+
                            if let Err(err) = self.work_tx.send(m) {
+
                                log::error!(target: "worker", "Unable to send message: {err}")
+
                            }
+
                        }
+
                    }
+
                },
+
                // Catch and handle interrupt signal to gracefully shutdown
+
                Ok(interrupted) = interrupt_rx.recv() => {
+
                    break interrupted;
+
                }
+
            }
+
        };

-
    (terminator, rx)
+
        Ok(result)
+
    }
}
modified src/ui/im.rs
@@ -10,7 +10,7 @@ use anyhow::Result;
use ratatui::style::Stylize;
use ratatui::text::{Span, Text};
use tokio::sync::broadcast;
-
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
+
use tokio::sync::mpsc::UnboundedReceiver;

use termion::event::Key;

@@ -19,11 +19,11 @@ use ratatui::{Frame, Viewport};

use crate::event::Event;
use crate::store::Update;
-
use crate::task::Interrupted;
use crate::terminal;
use crate::terminal::Terminal;
use crate::ui::theme::Theme;
use crate::ui::{Column, ToRow};
+
use crate::Interrupted;

use crate::ui::im::widget::{HeaderedTable, Widget};

@@ -42,17 +42,17 @@ pub trait Show<M> {
pub struct Frontend {}

impl Frontend {
-
    pub async fn run<S, M, P>(
+
    pub async fn run<S, M, R>(
        self,
-
        state_tx: UnboundedSender<M>,
+
        message_tx: broadcast::Sender<M>,
        mut state_rx: UnboundedReceiver<S>,
-
        mut interrupt_rx: broadcast::Receiver<Interrupted<P>>,
+
        mut interrupt_rx: broadcast::Receiver<Interrupted<R>>,
        viewport: Viewport,
-
    ) -> anyhow::Result<Interrupted<P>>
+
    ) -> anyhow::Result<Interrupted<R>>
    where
-
        S: Update<M, Return = P> + Show<M>,
+
        S: Update<M, Return = R> + Show<M>,
        M: Clone,
-
        P: Clone + Send + Sync + Debug,
+
        R: Clone + Send + Sync + Debug,
    {
        let mut ticker = tokio::time::interval(RENDERING_TICK_RATE);

@@ -60,9 +60,9 @@ impl Frontend {
        let mut events_rx = terminal::events();

        let mut state = state_rx.recv().await.unwrap();
-
        let mut ctx = Context::default().with_sender(state_tx);
+
        let mut ctx = Context::default().with_sender(message_tx);

-
        let result: anyhow::Result<Interrupted<P>> = loop {
+
        let result: anyhow::Result<Interrupted<R>> = loop {
            tokio::select! {
                // Tick to terminate the select every N milliseconds
                _ = ticker.tick() => (),
@@ -131,7 +131,7 @@ pub struct Context<M> {
    /// Current frame of the application.
    pub(crate) frame_size: Rect,
    /// The message sender used by the `Ui` to send application messages.
-
    pub(crate) sender: Option<UnboundedSender<M>>,
+
    pub(crate) sender: Option<broadcast::Sender<M>>,
}

impl<M> Default for Context<M> {
@@ -162,7 +162,7 @@ impl<M> Context<M> {
        self
    }

-
    pub fn with_sender(mut self, sender: UnboundedSender<M>) -> Self {
+
    pub fn with_sender(mut self, sender: broadcast::Sender<M>) -> Self {
        self.sender = Some(sender);
        self
    }
modified src/ui/rm.rs
@@ -9,11 +9,11 @@ use tokio::sync::mpsc::UnboundedReceiver;

use crate::event::Event;
use crate::store::Update;
-
use crate::task::Interrupted;
use crate::terminal;
use crate::terminal::Terminal;
use crate::ui::rm::widget::RenderProps;
use crate::ui::rm::widget::Widget;
+
use crate::Interrupted;

const RENDERING_TICK_RATE: Duration = Duration::from_millis(250);