Radish alpha
r
rad:z39mP9rQAaGmERfUMPULfPUi473tY
Radicle terminal user interface
Radicle
Git
radicle-tui src lib.rs
pub mod event;
pub mod store;
pub mod task;
pub mod terminal;
pub mod ui;

use std::fmt::Debug;

use anyhow::Result;

use serde::ser::{Serialize, SerializeStruct, Serializer};

#[cfg(unix)]
use tokio::signal::unix::signal;
use tokio::sync::broadcast;
use tokio::sync::mpsc::unbounded_channel;

use ratatui::Viewport;

use store::Update;
use terminal::StdinReader;
use ui::{Frontend, Show};

use crate::task::Process;

/// An optional return value.
#[derive(Clone, Debug)]
pub struct Exit<T> {
    pub value: Option<T>,
}

/// The output that is returned by all selection interfaces.
#[derive(Clone, Default, Debug, Eq, PartialEq)]
pub struct Selection<O>
where
    O: Serialize,
{
    pub operation: Option<O>,
    pub args: Vec<String>,
}

impl<O> Selection<O>
where
    O: Serialize,
{
    pub fn with_operation(mut self, operation: O) -> Self {
        self.operation = Some(operation);
        self
    }

    pub fn with_args(mut self, arg: String) -> Self {
        self.args.push(arg);
        self
    }
}

impl<O> Serialize for Selection<O>
where
    O: Serialize,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut state = serializer.serialize_struct("", 3)?;
        state.serialize_field("operation", &self.operation)?;
        state.serialize_field("args", &self.args)?;
        state.end()
    }
}

/// Implementors of `Share` can be used inside the multi-threaded
/// application environment.
pub trait Share: Clone + Debug + Send + Sync + 'static {}

/// Blanket implementation for all types that implement the required
/// traits.
impl<T: Clone + Debug + Send + Sync + 'static> Share for T {}

/// A multi-producer, multi-consumer message channel.
pub struct Channel<M> {
    pub tx: broadcast::Sender<M>,
    pub rx: broadcast::Receiver<M>,
}

impl<M: Clone> Default for Channel<M> {
    fn default() -> Self {
        let (tx, rx) = broadcast::channel(1000);
        Self { tx, rx }
    }
}

/// Initialize a `Store` with the `State` given and a `Frontend` with the `App` given,
/// and run their main loops concurrently. Connect them to the `Channel` and also to
/// an interrupt broadcast channel also initialized in this function.
/// Additionally, a list of processors can be passed. Processors will also receive all
/// applications messages and can emit new ones. They will be executed by an internal worker.
pub async fn im<S, T, M, R>(
    state: S,
    viewport: Viewport,
    channel: Channel<M>,
    processors: Vec<T>,
) -> Result<Option<R>>
where
    S: Update<M, Return = R> + Show<M> + Share,
    T: Process<M> + Share,
    M: Share,
    R: Share,
{
    let (terminator, mut interrupt_rx) = create_termination();
    let (state_tx, state_rx) = unbounded_channel();
    let (event_tx, event_rx) = unbounded_channel();
    let (work_tx, work_rx) = unbounded_channel();

    let store = store::Store::<S, M, R>::new(state_tx.clone());
    let worker = task::Worker::<T, M, R>::new(work_tx.clone());
    let frontend = Frontend::default();
    let stdin_reader = StdinReader::default();

    // TODO(erikli): Handle errors
    let _ = tokio::try_join!(
        worker.run(
            processors,
            channel.rx.resubscribe(),
            interrupt_rx.resubscribe()
        ),
        store.run(
            state,
            terminator,
            channel.rx.resubscribe(),
            work_rx,
            interrupt_rx.resubscribe(),
        ),
        frontend.run(
            channel.tx,
            state_rx,
            event_rx,
            interrupt_rx.resubscribe(),
            viewport
        ),
        stdin_reader.run(event_tx, interrupt_rx.resubscribe()),
    )?;

    if let Ok(reason) = interrupt_rx.recv().await {
        match reason {
            Interrupted::User { payload } => Ok(payload),
            Interrupted::OsSignal => anyhow::bail!("exited because of an os sig int"),
        }
    } else {
        anyhow::bail!("exited because of an unexpected error");
    }
}

/// An `Interrupt` message that is produced by either an OS signal (e.g. kill)
/// or the user by requesting the application to close.
#[derive(Debug, Clone)]
pub enum Interrupted<P>
where
    P: Share,
{
    OsSignal,
    User { payload: Option<P> },
}

/// The `Terminator` wraps a broadcast channel and can send an interrupt messages.
#[derive(Debug, Clone)]
pub struct Terminator<P>
where
    P: Share,
{
    interrupt_tx: broadcast::Sender<Interrupted<P>>,
}

impl<P> Terminator<P>
where
    P: Share,
{
    /// Create a `Terminator` that stores the sending end of a broadcast channel.
    pub fn new(interrupt_tx: broadcast::Sender<Interrupted<P>>) -> Self {
        Self { interrupt_tx }
    }

    /// Send interrupt message to the broadcast channel.
    pub fn terminate(&mut self, interrupted: Interrupted<P>) -> anyhow::Result<()> {
        self.interrupt_tx.send(interrupted)?;

        Ok(())
    }
}

/// Receive `SIGINT` and call terminator which sends the interrupt message to its broadcast channel.
#[cfg(unix)]
async fn terminate_by_unix_signal<P>(mut terminator: Terminator<P>)
where
    P: Share,
{
    let mut interrupt_signal = signal(tokio::signal::unix::SignalKind::interrupt())
        .expect("failed to create interrupt signal stream");

    interrupt_signal.recv().await;

    terminator
        .terminate(Interrupted::OsSignal)
        .expect("failed to send interrupt signal");
}

/// Create a broadcast channel and spawn a task for retrieving the applications' kill signal.
pub fn create_termination<P>() -> (Terminator<P>, broadcast::Receiver<Interrupted<P>>)
where
    P: Share,
{
    let (tx, rx) = broadcast::channel(1);
    let terminator = Terminator::new(tx);

    #[cfg(unix)]
    tokio::spawn(terminate_by_unix_signal(terminator.clone()));

    (terminator, rx)
}