 impl<D, S, G> Service<D, S, G>
 where
     D: Store,
+    S: WriteStorage + 'static,
+    G: crypto::signature::Signer<crypto::Signature>,
+{
+    /// Initialize service with current time. Call this once.
+    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
+        debug!(target: "service", "Init @{}", time.as_millis());
+        assert_ne!(time, LocalTime::default());
+
+        let nid = self.node_id();
+
+        self.clock = time;
+        self.started_at = Some(time);
+        self.last_online_at = match self.db.gossip().last() {
+            Ok(Some(last)) => Some(last.to_local_time()),
+            Ok(None) => None,
+            Err(e) => {
+                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
+                None
+            }
+        };
+
+        // Populate refs database. This is only useful as part of the upgrade process for nodes
+        // that have been online since before the refs database was created.
+        match self.db.refs().count() {
+            Ok(0) => {
+                info!(target: "service", "Empty refs database, populating from storage..");
+                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
+                    warn!(target: "service", "Failed to populate refs database: {e}");
+                }
+            }
+            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
+            Err(e) => {
+                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
+            }
+        }
+
+        let announced = self
+            .db
+            .seeds()
+            .seeded_by(&nid)?
+            .collect::<Result<HashMap<_, _>, _>>()?;
+        let mut inventory = BTreeSet::new();
+        let mut private = BTreeSet::new();
+
+        for repo in self.storage.repositories()? {
+            let rid = repo.rid;
+
+            self.upgrade_sigrefs(&repo)?;
+
+            // If we're not seeding this repo, just skip it.
+            if !self.policies.is_seeding(&rid)? {
+                debug!(target: "service", "Local repository {rid} is not seeded");
+                continue;
+            }
+            // Add public repositories to inventory.
+            if repo.doc.is_public() {
+                inventory.insert(rid);
+            } else {
+                private.insert(rid);
+            }
+            // If we have no owned refs for this repo, then there's nothing to announce.
+            let Some(updated_at) = repo.synced_at else {
+                continue;
+            };
+            // Skip this repo if the sync status matches what we have in storage.
+            if let Some(announced) = announced.get(&rid) {
+                if updated_at.oid == announced.oid {
+                    continue;
+                }
+            }
+            // Make sure our local node's sync status is up to date with storage.
+            if self.db.seeds_mut().synced(
+                &rid,
+                &nid,
+                updated_at.oid,
+                updated_at.timestamp.into(),
+            )? {
+                debug!(target: "service", "Saved local sync status for {rid}..");
+            }
+            // If we got here, it likely means a repo was updated while the node was stopped.
+            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
+            // the historical gossip messages when a node connects and subscribes to this repo.
+            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
+                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
+                self.db.gossip_mut().announced(&nid, &ann)?;
+            }
+        }
+
+        // Ensure that our inventory is recorded in our routing table, and we are seeding
+        // all of it. It can happen that inventory is not properly seeded if for eg. the
+        // user creates a new repository while the node is stopped.
+        self.db
+            .routing_mut()
+            .add_inventory(inventory.iter(), nid, time.into())?;
+        self.inventory = gossip::inventory(self.timestamp(), inventory);
+
+        // Ensure that private repositories are not in our inventory. It's possible that
+        // a repository was public and then it was made private.
+        self.db
+            .routing_mut()
+            .remove_inventories(private.iter(), &nid)?;
+
+        // Setup subscription filter for seeded repos.
+        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
+        // Connect to configured peers.
+        let addrs = self.config.connect.clone();
+        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
+            if let Err(e) = self.connect(id, addr) {
+                debug!(target: "service", "Service::initialization connection error: {e}");
+            }
+        }
+        // Try to establish some connections.
+        self.maintain_connections();
+        // Start periodic tasks.
+        self.outbox.wakeup(IDLE_INTERVAL);
+        self.outbox.wakeup(GOSSIP_INTERVAL);
+
+        Ok(())
+    }
+
+    fn upgrade_sigrefs(&mut self, info: &radicle::storage::RepositoryInfo) -> Result<(), Error> {
+        match &info.refs {
+            Ok(_) => return Ok(()),
+            Err(err) => {
+                log::info!("Migrating `rad/sigrefs` due to: {err}");
+            }
+        }
+
+        let repo = self.storage.repository_mut(info.rid)?;
+        repo.sign_refs(&self.signer)?;
+        Ok(())
+    }
+}
+
+impl<D, S, G> Service<D, S, G>
+where
+    D: Store,
     S: ReadStorage + 'static,
     G: crypto::signature::Signer<crypto::Signature>,
 {
         Ok(Lookup { local, remote })
     }
 
-    /// Initialize service with current time. Call this once.
-    pub fn initialize(&mut self, time: LocalTime) -> Result<(), Error> {
-        debug!(target: "service", "Init @{}", time.as_millis());
-        assert_ne!(time, LocalTime::default());
-
-        let nid = self.node_id();
-
-        self.clock = time;
-        self.started_at = Some(time);
-        self.last_online_at = match self.db.gossip().last() {
-            Ok(Some(last)) => Some(last.to_local_time()),
-            Ok(None) => None,
-            Err(e) => {
-                warn!(target: "service", "Failed to get the latest gossip message from db: {e}");
-                None
-            }
-        };
-
-        // Populate refs database. This is only useful as part of the upgrade process for nodes
-        // that have been online since before the refs database was created.
-        match self.db.refs().count() {
-            Ok(0) => {
-                info!(target: "service", "Empty refs database, populating from storage..");
-                if let Err(e) = self.db.refs_mut().populate(&self.storage) {
-                    warn!(target: "service", "Failed to populate refs database: {e}");
-                }
-            }
-            Ok(n) => debug!(target: "service", "Refs database has {n} cached references"),
-            Err(e) => {
-                warn!(target: "service", "Failed to retrieve count of refs from database: {e}")
-            }
-        }
-
-        let announced = self
-            .db
-            .seeds()
-            .seeded_by(&nid)?
-            .collect::<Result<HashMap<_, _>, _>>()?;
-        let mut inventory = BTreeSet::new();
-        let mut private = BTreeSet::new();
-
-        for repo in self.storage.repositories()? {
-            let rid = repo.rid;
-
-            // If we're not seeding this repo, just skip it.
-            if !self.policies.is_seeding(&rid)? {
-                debug!(target: "service", "Local repository {rid} is not seeded");
-                continue;
-            }
-            // Add public repositories to inventory.
-            if repo.doc.is_public() {
-                inventory.insert(rid);
-            } else {
-                private.insert(rid);
-            }
-            // If we have no owned refs for this repo, then there's nothing to announce.
-            let Some(updated_at) = repo.synced_at else {
-                continue;
-            };
-            // Skip this repo if the sync status matches what we have in storage.
-            if let Some(announced) = announced.get(&rid) {
-                if updated_at.oid == announced.oid {
-                    continue;
-                }
-            }
-            // Make sure our local node's sync status is up to date with storage.
-            if self.db.seeds_mut().synced(
-                &rid,
-                &nid,
-                updated_at.oid,
-                updated_at.timestamp.into(),
-            )? {
-                debug!(target: "service", "Saved local sync status for {rid}..");
-            }
-            // If we got here, it likely means a repo was updated while the node was stopped.
-            // Therefore, we pre-load a refs announcement for this repo, so that it is included in
-            // the historical gossip messages when a node connects and subscribes to this repo.
-            if let Ok((ann, _)) = self.refs_announcement_for(rid, [nid]) {
-                debug!(target: "service", "Adding refs announcement for {rid} to historical gossip messages..");
-                self.db.gossip_mut().announced(&nid, &ann)?;
-            }
-        }
-
-        // Ensure that our inventory is recorded in our routing table, and we are seeding
-        // all of it. It can happen that inventory is not properly seeded if for eg. the
-        // user creates a new repository while the node is stopped.
-        self.db
-            .routing_mut()
-            .add_inventory(inventory.iter(), nid, time.into())?;
-        self.inventory = gossip::inventory(self.timestamp(), inventory);
-
-        // Ensure that private repositories are not in our inventory. It's possible that
-        // a repository was public and then it was made private.
-        self.db
-            .routing_mut()
-            .remove_inventories(private.iter(), &nid)?;
-
-        // Setup subscription filter for seeded repos.
-        self.filter = Filter::allowed_by(self.policies.seed_policies()?);
-        // Connect to configured peers.
-        let addrs = self.config.connect.clone();
-        for (id, addr) in addrs.into_iter().map(|ca| ca.into()) {
-            if let Err(e) = self.connect(id, addr) {
-                debug!(target: "service", "Service::initialization connection error: {e}");
-            }
-        }
-        // Try to establish some connections.
-        self.maintain_connections();
-        // Start periodic tasks.
-        self.outbox.wakeup(IDLE_INTERVAL);
-        self.outbox.wakeup(GOSSIP_INTERVAL);
-
-        Ok(())
-    }
-
     pub fn tick(&mut self, now: LocalTime, metrics: &Metrics) {
         trace!(
             target: "service",