From 0f0577c75844e3335eedafd7afda2d854446dc9f Mon Sep 17 00:00:00 2001 From: alyssa Date: Sun, 27 Jul 2025 00:18:47 +0000 Subject: [PATCH] feat(stats): add metric for basebackup age --- Cargo.lock | 1 + ci/rust-docker-target.sh | 7 ++- crates/scheduled_tasks/Cargo.toml | 1 + crates/scheduled_tasks/src/main.rs | 14 ++++- crates/scheduled_tasks/src/tasks.rs | 91 ++++++++++++++++++++++++++++- 5 files changed, 111 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96bb2e24..9309d1a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3311,6 +3311,7 @@ dependencies = [ "chrono", "croner", "fred", + "lazy_static", "libpk", "metrics", "num-format", diff --git a/ci/rust-docker-target.sh b/ci/rust-docker-target.sh index ba6df5e9..d7d5f6fc 100755 --- a/ci/rust-docker-target.sh +++ b/ci/rust-docker-target.sh @@ -42,5 +42,10 @@ build api build dispatch build gateway build avatars "COPY .docker-bin/avatar_cleanup /bin/avatar_cleanup" -build scheduled_tasks +build scheduled_tasks "$(cat < anyhow::Result<()> { update_db_message_meta ); doforever!("* * * * *", "discord stats updater", update_discord_stats); - // on :00 and :30 + // on hh:00 and hh:30 doforever!( "0,30 * * * *", "queue deleted image cleanup job", queue_deleted_image_cleanup ); + // non-standard cron: at hh:mm:00, hh:mm:30 doforever!("0,30 * * * * *", "stats api updater", update_stats_api); + // every hour (could probably even be less frequent, basebackups are taken rarely) + doforever!( + "* * * * *", + "data basebackup info updater", + update_data_basebackup_prometheus + ); + doforever!( + "* * * * *", + "messages basebackup info updater", + update_messages_basebackup_prometheus + ); set.join_next() .await diff --git a/crates/scheduled_tasks/src/tasks.rs b/crates/scheduled_tasks/src/tasks.rs index 64246fc9..84773149 100644 --- a/crates/scheduled_tasks/src/tasks.rs +++ b/crates/scheduled_tasks/src/tasks.rs @@ -1,4 +1,4 @@ -use std::time::Duration; +use std::{collections::HashMap, time::Duration}; use anyhow::anyhow; use fred::prelude::KeysInterface; @@ -10,10 +10,22 @@ use metrics::gauge; use num_format::{Locale, ToFormattedString}; use reqwest::ClientBuilder; use sqlx::Executor; +use tokio::{process::Command, sync::Mutex}; use crate::AppCtx; pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> { + let data_ts = *BASEBACKUP_TS.lock().await.get("data").unwrap_or(&0) as f64; + let messages_ts = *BASEBACKUP_TS.lock().await.get("messages").unwrap_or(&0) as f64; + + let now_ts = chrono::Utc::now().timestamp() as f64; + + gauge!("pluralkit_latest_backup_ts", "repo" => "data").set(data_ts); + gauge!("pluralkit_latest_backup_ts", "repo" => "messages").set(messages_ts); + + gauge!("pluralkit_latest_backup_age", "repo" => "data").set(now_ts - data_ts); + gauge!("pluralkit_latest_backup_age", "repo" => "messages").set(now_ts - messages_ts); + #[derive(sqlx::FromRow)] struct Count { count: i64, @@ -41,6 +53,83 @@ pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> { Ok(()) } +lazy_static::lazy_static! { + static ref BASEBACKUP_TS: Mutex> = Mutex::new(HashMap::new()); +} + +pub async fn update_data_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> { + update_basebackup_ts("data".to_string()).await +} + +pub async fn update_messages_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> { + update_basebackup_ts("messages".to_string()).await +} + +async fn update_basebackup_ts(repo: String) -> anyhow::Result<()> { + let mut env = HashMap::new(); + + for (key, value) in std::env::vars() { + if key.starts_with("AWS") { + env.insert(key, value); + } + } + + env.insert( + "WALG_S3_PREFIX".to_string(), + format!("s3://pluralkit-backups/{repo}/"), + ); + + let output = Command::new("wal-g") + .arg("backup-list") + .arg("--json") + .envs(env) + .output() + .await?; + + if !output.status.success() { + // todo: we should return error here + tracing::error!( + status = output.status.code(), + "failed to execute wal-g command" + ); + return Ok(()); + } + + #[derive(serde::Deserialize)] + struct WalgBackupInfo { + backup_name: String, + time: String, + ts_parsed: Option, + } + + let mut info = + serde_json::from_str::>(&String::from_utf8_lossy(&output.stdout))? + .into_iter() + .filter(|v| v.backup_name.contains("base")) + .filter_map(|mut v| { + chrono::DateTime::parse_from_rfc3339(&v.time) + .ok() + .map(|dt| { + v.ts_parsed = Some(dt.with_timezone(&chrono::Utc).timestamp()); + v + }) + }) + .collect::>(); + + info.sort_by(|a, b| b.ts_parsed.cmp(&a.ts_parsed)); + + let Some(info) = info.first() else { + anyhow::bail!("could not find any basebackups in repo {repo}"); + }; + + BASEBACKUP_TS + .lock() + .await + .insert(repo, info.ts_parsed.unwrap()); + + Ok(()) +} + pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> { ctx.data .execute(