Radish alpha
h
Radicle Heartwood Protocol & Stack
Radicle
Git (anonymous pull)
Log in to clone via SSH
node: Improve logging output
Alexis Sellier committed 3 years ago
commit 026dfbde5d8f48394fa7648e8b3a3b361f37dd69
parent 5e955d39d032920758bf921614c867fed746486c
5 files changed +73 -58
modified radicle-node/src/client.rs
@@ -96,16 +96,16 @@ impl<G: Signer + EcSign> Runtime<G> {
        let routing_db = node_dir.join(ROUTING_DB_FILE);
        let tracking_db = node_dir.join(TRACKING_DB_FILE);

-
        log::info!("Opening address book {}..", address_db.display());
+
        log::info!(target: "node", "Opening address book {}..", address_db.display());
        let addresses = address::Book::open(address_db)?;

-
        log::info!("Opening routing table {}..", routing_db.display());
+
        log::info!(target: "node", "Opening routing table {}..", routing_db.display());
        let routing = routing::Table::open(routing_db)?;

-
        log::info!("Opening tracking policy table {}..", tracking_db.display());
+
        log::info!(target: "node", "Opening tracking policy table {}..", tracking_db.display());
        let tracking = tracking::Config::open(tracking_db)?;

-
        log::info!("Initializing service ({:?})..", network);
+
        log::info!(target: "node", "Initializing service ({:?})..", network);
        let service = service::Service::new(
            config,
            clock,
@@ -133,12 +133,12 @@ impl<G: Signer + EcSign> Runtime<G> {
            local_addrs.push(local_addr);
            wire.listen(listener);

-
            log::info!("Listening on {local_addr}..");
+
            log::info!(target: "node", "Listening on {local_addr}..");
        }
        let reactor = Reactor::named(wire, popol::Poller::new(), id.to_human())?;
        let handle = Handle::new(home.clone(), reactor.controller());

-
        log::info!("Binding control socket {}..", node_sock.display());
+
        log::info!(target: "node", "Binding control socket {}..", node_sock.display());

        let listener = match UnixListener::bind(&node_sock) {
            Ok(sock) => sock,
@@ -175,7 +175,7 @@ impl<G: Signer + EcSign> Runtime<G> {
    }

    pub fn run(self) -> Result<(), Error> {
-
        log::info!("Running node {}..", self.id);
+
        log::info!(target: "node", "Running node {}..", self.id);

        self.pool.run().unwrap();
        self.reactor.join().unwrap();
@@ -183,7 +183,7 @@ impl<G: Signer + EcSign> Runtime<G> {

        fs::remove_file(self.home.socket()).ok();

-
        log::debug!("Node shutdown completed for {}", self.id);
+
        log::debug!(target: "node", "Node shutdown completed for {}", self.id);

        Ok(())
    }
modified radicle-node/src/control.rs
@@ -27,14 +27,16 @@ pub fn listen<H: Handle<Error = client::handle::Error>>(
    listener: UnixListener,
    mut handle: H,
) -> Result<(), Error> {
+
    log::debug!(target: "control", "Control thread listening on socket..");
+

    for incoming in listener.incoming() {
        match incoming {
            Ok(mut stream) => {
                if let Err(e) = drain(&stream, &mut handle) {
-
                    log::debug!("Received {} on control socket", e);
+
                    log::debug!(target: "control", "Received {} on control socket", e);

                    if let DrainError::Shutdown = e {
-
                        log::debug!("Shutdown requested..");
+
                        log::debug!(target: "control", "Shutdown requested..");
                        // Channel might already be disconnected if shutdown
                        // came from somewhere else. Ignore errors.
                        handle.shutdown().ok();
@@ -46,10 +48,10 @@ pub fn listen<H: Handle<Error = client::handle::Error>>(
                    stream.shutdown(net::Shutdown::Both).ok();
                }
            }
-
            Err(e) => log::error!("Failed to accept incoming connection: {}", e),
+
            Err(e) => log::error!(target: "control", "Failed to accept incoming connection: {}", e),
        }
    }
-
    log::debug!("Exiting control loop..");
+
    log::debug!(target: "control", "Exiting control loop..");

    Ok(())
}
modified radicle-node/src/logger.rs
@@ -16,16 +16,28 @@ impl Log for Logger {

    fn log(&self, record: &Record) {
        if self.enabled(record.metadata()) {
-
            let module = record.module_path().unwrap_or_default();
+
            let target = record.target();

            if record.level() == Level::Error {
-
                write(record, module, io::stderr());
+
                write(record, target, io::stderr());
            } else {
-
                write(record, module, io::stdout());
+
                write(record, target, io::stdout());
            }

-
            fn write(record: &log::Record, module: &str, mut stream: impl io::Write) {
-
                let message = format!("{} {} {}", record.level(), module.bold(), record.args());
+
            fn write(record: &log::Record, target: &str, mut stream: impl io::Write) {
+
                let message = format!(
+
                    "{:<5} {:<8} {}",
+
                    record.level(),
+
                    target.cyan(),
+
                    record.args()
+
                );
+

+
                let message = format!(
+
                    "{} {}",
+
                    Local::now().to_rfc3339_opts(SecondsFormat::Millis, true),
+
                    message,
+
                );
+

                let message = match record.level() {
                    Level::Error => message.red(),
                    Level::Warn => message.yellow(),
@@ -33,16 +45,7 @@ impl Log for Logger {
                    Level::Debug => message.dimmed(),
                    Level::Trace => message.white().dimmed(),
                };
-

-
                writeln!(
-
                    stream,
-
                    "{} {}",
-
                    Local::now()
-
                        .to_rfc3339_opts(SecondsFormat::Millis, true)
-
                        .white(),
-
                    message,
-
                )
-
                .expect("write shouldn't fail");
+
                writeln!(stream, "{}", message).expect("write shouldn't fail");
            }
        }
    }
modified radicle-node/src/main.rs
@@ -73,7 +73,7 @@ impl Options {
    }
}

-
fn main() -> anyhow::Result<()> {
+
fn execute() -> anyhow::Result<()> {
    logger::init(log::Level::Debug)?;

    let options = Options::from_env()?;
@@ -96,3 +96,10 @@ fn main() -> anyhow::Result<()> {

    Ok(())
}
+

+
fn main() {
+
    if let Err(err) = execute() {
+
        log::error!(target: "node", "Fatal: {}", err);
+
        process::exit(1);
+
    }
+
}
modified radicle-node/src/service.rs
@@ -326,7 +326,7 @@ where
    }

    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
-
        debug!("Init @{}", time.as_secs());
+
        debug!(target: "service", "Init @{}", time.as_secs());

        self.start_time = time;

@@ -354,7 +354,7 @@ where
        trace!("Wake +{}", now - self.start_time);

        if now - self.last_idle >= IDLE_INTERVAL {
-
            debug!("Running 'idle' task...");
+
            debug!(target: "service", "Running 'idle' task...");

            self.keep_alive(&now);
            self.disconnect_unresponsive_peers(&now);
@@ -363,7 +363,7 @@ where
            self.last_idle = now;
        }
        if now - self.last_sync >= SYNC_INTERVAL {
-
            debug!("Running 'sync' task...");
+
            debug!(target: "service", "Running 'sync' task...");

            // TODO: What do we do here?
            self.reactor.wakeup(SYNC_INTERVAL);
@@ -379,7 +379,7 @@ where
            self.last_announce = now;
        }
        if now - self.last_prune >= PRUNE_INTERVAL {
-
            debug!("Running 'prune' task...");
+
            debug!(target: "service", "Running 'prune' task...");

            if let Err(err) = self.prune_routing_entries(&now) {
                error!("Error pruning routing entries: {}", err);
@@ -390,7 +390,7 @@ where
    }

    pub fn command(&mut self, cmd: Command) {
-
        debug!("Received command {:?}", cmd);
+
        debug!(target: "service", "Received command {:?}", cmd);

        match cmd {
            Command::Connect(id, addr) => self.reactor.connect(id, addr),
@@ -410,7 +410,7 @@ where
                        .filter(|node| *node != self.node_id())
                        .collect(),
                    Err(err) => {
-
                        log::error!("Error reading routing table for {id}: {err}");
+
                        error!(target: "service", "Error reading routing table for {id}: {err}");
                        resp.send(FetchLookup::NotFound).ok();

                        return;
@@ -418,12 +418,12 @@ where
                };

                let Some(seeds) = NonEmpty::from_vec(seeds) else {
-
                    log::warn!("No seeds found to fetch from, for {}", id);
+
                    warn!(target: "service", "No seeds found to fetch from, for {}", id);
                    resp.send(FetchLookup::NotFound).ok();

                    return;
                };
-
                log::debug!("Found {} seed(s) to fetch from, for {}", seeds.len(), id);
+
                debug!(target: "service", "Found {} seed(s) to fetch from, for {}", seeds.len(), id);

                let (results_send, results) = chan::bounded(seeds.len());
                resp.send(FetchLookup::Found {
@@ -484,13 +484,13 @@ where
        let seed = session.id;

        if let Some(fetch) = session.fetch(rid) {
-
            debug!("Fetch initiated for {rid} with {seed}..");
+
            debug!(target: "service", "Fetch initiated for {rid} with {seed}..");

            reactor.write(session.id, fetch);
        } else {
            // TODO: If we can't fetch, it's because we're already fetching from
            // this peer. So we need to queue the request, or find another peer.
-
            log::error!(
+
            error!(target: "service",
                "Unable to fetch {rid} from peer {seed} that is already being fetched from"
            );
        }
@@ -538,7 +538,7 @@ where
    }

    pub fn attempted(&mut self, id: NodeId, addr: &Address) {
-
        debug!("Attempted connection to {id} ({addr})");
+
        debug!(target: "service", "Attempted connection to {id} ({addr})");

        let persistent = self.config.is_persistent(&id);
        self.sessions
@@ -548,7 +548,7 @@ where
    }

    pub fn connected(&mut self, remote: NodeId, link: Link) {
-
        info!("Connected to {} ({:?})", remote, link);
+
        info!(target: "service", "Connected to {} ({:?})", remote, link);

        // For outbound connections, we are the first to say "Hello".
        // For inbound connections, we wait for the remote to say "Hello" first.
@@ -584,7 +584,7 @@ where
    pub fn disconnected(&mut self, remote: NodeId, reason: &DisconnectReason) {
        let since = self.local_time();

-
        debug!("Disconnected from {} ({})", remote, reason);
+
        debug!(target: "service", "Disconnected from {} ({})", remote, reason);

        if let Some(session) = self.sessions.get_mut(&remote) {
            session.to_disconnected(since);
@@ -600,7 +600,7 @@ where
                    }
                    // TODO: Eventually we want a delay before attempting a reconnection,
                    // with exponential back-off.
-
                    debug!(
+
                    debug!(target: "service",
                        "Reconnecting to {} (attempts={})...",
                        remote,
                        session.attempts()
@@ -673,7 +673,7 @@ where
                // Discard inventory messages we've already seen, otherwise update
                // out last seen time.
                if !peer.inventory_announced(timestamp) {
-
                    debug!("Ignoring stale inventory announcement from {announcer}");
+
                    debug!(target: "service", "Ignoring stale inventory announcement from {announcer}");
                    return Ok(false);
                }

@@ -708,7 +708,7 @@ where
                    // Discard inventory messages we've already seen, otherwise update
                    // out last seen time.
                    if !peer.refs_announced(message.id, timestamp) {
-
                        debug!("Ignoring stale refs announcement from {announcer}");
+
                        debug!(target: "service", "Ignoring stale refs announcement from {announcer}");
                        return Ok(false);
                    }
                    // TODO: Check refs to see if we should try to fetch or not.
@@ -722,7 +722,7 @@ where

                    return Ok(true);
                } else {
-
                    log::debug!(
+
                    debug!(target: "service",
                        "Ignoring refs announcement from {announcer}: repository {} isn't tracked",
                        message.id
                    );
@@ -739,19 +739,19 @@ where
                // Discard node messages we've already seen, otherwise update
                // our last seen time.
                if !peer.node_announced(timestamp) {
-
                    debug!("Ignoring stale node announcement from {announcer}");
+
                    debug!(target: "service", "Ignoring stale node announcement from {announcer}");
                    return Ok(false);
                }

                if !ann.validate() {
-
                    warn!("Dropping node announcement from {announcer}: invalid proof-of-work");
+
                    warn!(target: "service", "Dropping node announcement from {announcer}: invalid proof-of-work");
                    return Ok(false);
                }

                let alias = match str::from_utf8(alias) {
                    Ok(s) => s,
                    Err(e) => {
-
                        warn!("Dropping node announcement from {announcer}: invalid alias: {e}");
+
                        warn!(target: "service", "Dropping node announcement from {announcer}: invalid alias: {e}");
                        return Ok(false);
                    }
                };
@@ -774,7 +774,7 @@ where
                    Ok(updated) => {
                        // Only relay if we received new information.
                        if updated {
-
                            debug!(
+
                            debug!(target: "service",
                                "Address store entry for node {announcer} updated at {timestamp}"
                            );
                            return Ok(relay);
@@ -800,7 +800,7 @@ where
        };
        peer.last_active = self.clock;

-
        debug!("Received message {:?} from {}", &message, peer.id);
+
        debug!(target: "service", "Received message {:?} from {}", &message, peer.id);

        match (&mut peer.state, message) {
            (
@@ -812,7 +812,7 @@ where
            ) => {
                // This should never happen if the service is properly configured, since all
                // incoming data is sent directly to the Git worker.
-
                log::error!("Received gossip message from {remote} during git fetch");
+
                log::error!(target: "service", "Received gossip message from {remote} during git fetch");

                return Err(session::Error::Misbehavior);
            }
@@ -820,6 +820,7 @@ where
                // Already initialized!
                if *initialized {
                    debug!(
+
                        target: "service",
                        "Disconnecting peer {} for initializing already initialized session",
                        peer.id
                    );
@@ -895,7 +896,7 @@ where
                }
            }
            (session::State::Connected { protocol, .. }, Message::Fetch { rid }) => {
-
                debug!("Fetch requested for {rid} from {remote}..");
+
                debug!(target: "service", "Fetch requested for {rid} from {remote}..");

                // TODO: Check that we have the repo first?

@@ -919,7 +920,7 @@ where
                    );
                    return Err(session::Error::Misbehavior);
                }
-
                debug!("Fetch accepted for {rid} from {remote}..");
+
                debug!(target: "service", "Fetch accepted for {rid} from {remote}..");

                *protocol = Protocol::Fetch;
                // Instruct the transport to handover the socket to the worker.
@@ -930,7 +931,7 @@ where
                error!("Received {:?} from connecting peer {}", msg, peer.id);
            }
            (session::State::Disconnected { .. }, msg) => {
-
                debug!("Ignoring {:?} from disconnected peer {}", msg, peer.id);
+
                debug!(target: "service", "Ignoring {:?} from disconnected peer {}", msg, peer.id);
            }
        }
        Ok(())
@@ -947,7 +948,7 @@ where
        for proj_id in inventory {
            included.insert(proj_id);
            if self.routing.insert(*proj_id, from, *timestamp)? {
-
                log::info!("Routing table updated for {proj_id} with seed {from}");
+
                info!(target: "service", "Routing table updated for {proj_id} with seed {from}");

                if self
                    .tracking
@@ -978,7 +979,8 @@ where
        let timestamp = self.clock.as_secs();

        if remote.refs.len() > Refs::max() {
-
            log::error!(
+
            error!(
+
                target: "service",
                "refs announcement limit ({}) exceeded, other nodes will see only some of your project references",
                Refs::max(),
            );
@@ -1079,7 +1081,7 @@ where
    fn maintain_connections(&mut self) {
        let addrs = self.choose_addresses();
        if addrs.is_empty() {
-
            debug!("No eligible peers available to connect to");
+
            debug!(target: "service", "No eligible peers available to connect to");
        }
        for (id, addr) in addrs {
            self.reactor.connect(id, addr.clone());
@@ -1393,7 +1395,8 @@ mod gossip {
        type Inventory = BoundedVec<Id, INVENTORY_LIMIT>;

        if inventory.len() > Inventory::max() {
-
            log::error!(
+
            error!(
+
                target: "service",
                "inventory announcement limit ({}) exceeded, other nodes will see only some of your projects",
                inventory.len()
            );