mirror of
https://github.com/PluralKit/PluralKit.git
synced 2026-02-04 04:56:49 +00:00
feat(stats): add metric for basebackup age
This commit is contained in:
parent
2248403140
commit
2d40a1ee16
5 changed files with 111 additions and 3 deletions
|
|
@ -9,6 +9,7 @@ libpk = { path = "../libpk" }
|
|||
anyhow = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
fred = { workspace = true }
|
||||
lazy_static = { workspace = true }
|
||||
metrics = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -99,13 +99,25 @@ async fn main() -> anyhow::Result<()> {
|
|||
update_db_message_meta
|
||||
);
|
||||
doforever!("* * * * *", "discord stats updater", update_discord_stats);
|
||||
// on :00 and :30
|
||||
// on hh:00 and hh:30
|
||||
doforever!(
|
||||
"0,30 * * * *",
|
||||
"queue deleted image cleanup job",
|
||||
queue_deleted_image_cleanup
|
||||
);
|
||||
// non-standard cron: at hh:mm:00, hh:mm:30
|
||||
doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
|
||||
// NOTE: the cron expressions below are `* * * * *`, i.e. every minute, not every hour (hourly or even less frequent would probably suffice, basebackups are taken rarely)
|
||||
doforever!(
|
||||
"* * * * *",
|
||||
"data basebackup info updater",
|
||||
update_data_basebackup_prometheus
|
||||
);
|
||||
doforever!(
|
||||
"* * * * *",
|
||||
"messages basebackup info updater",
|
||||
update_messages_basebackup_prometheus
|
||||
);
|
||||
|
||||
set.join_next()
|
||||
.await
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
use std::time::Duration;
|
||||
use std::{collections::HashMap, time::Duration};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use fred::prelude::KeysInterface;
|
||||
|
|
@ -10,10 +10,22 @@ use metrics::gauge;
|
|||
use num_format::{Locale, ToFormattedString};
|
||||
use reqwest::ClientBuilder;
|
||||
use sqlx::Executor;
|
||||
use tokio::{process::Command, sync::Mutex};
|
||||
|
||||
use crate::AppCtx;
|
||||
|
||||
pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
let data_ts = *BASEBACKUP_TS.lock().await.get("data").unwrap_or(&0) as f64;
|
||||
let messages_ts = *BASEBACKUP_TS.lock().await.get("messages").unwrap_or(&0) as f64;
|
||||
|
||||
let now_ts = chrono::Utc::now().timestamp() as f64;
|
||||
|
||||
gauge!("pluralkit_latest_backup_ts", "repo" => "data").set(data_ts);
|
||||
gauge!("pluralkit_latest_backup_ts", "repo" => "messages").set(messages_ts);
|
||||
|
||||
gauge!("pluralkit_latest_backup_age", "repo" => "data").set(now_ts - data_ts);
|
||||
gauge!("pluralkit_latest_backup_age", "repo" => "messages").set(now_ts - messages_ts);
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct Count {
|
||||
count: i64,
|
||||
|
|
@ -41,6 +53,83 @@ pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
lazy_static::lazy_static! {
    /// Most recent basebackup timestamp per backup repo.
    ///
    /// Keys are repo names (`"data"`, `"messages"`); values are Unix timestamps in seconds.
    /// Written by `update_basebackup_ts` and read by `update_prometheus`; a repo that has
    /// not been refreshed yet has no entry.
    static ref BASEBACKUP_TS: Mutex<HashMap<String, i64>> = Mutex::default();
}
|
||||
|
||||
pub async fn update_data_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> {
|
||||
update_basebackup_ts("data".to_string()).await
|
||||
}
|
||||
|
||||
pub async fn update_messages_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> {
|
||||
update_basebackup_ts("messages".to_string()).await
|
||||
}
|
||||
|
||||
async fn update_basebackup_ts(repo: String) -> anyhow::Result<()> {
|
||||
let mut env = HashMap::new();
|
||||
|
||||
for (key, value) in std::env::vars() {
|
||||
if key.starts_with("AWS") {
|
||||
env.insert(key, value);
|
||||
}
|
||||
}
|
||||
|
||||
env.insert(
|
||||
"WALG_S3_PREFIX".to_string(),
|
||||
format!("s3://pluralkit-backups/{repo}/"),
|
||||
);
|
||||
|
||||
let output = Command::new("wal-g")
|
||||
.arg("backup-list")
|
||||
.arg("--json")
|
||||
.envs(env)
|
||||
.output()
|
||||
.await?;
|
||||
|
||||
if !output.status.success() {
|
||||
// todo: we should return error here
|
||||
tracing::error!(
|
||||
status = output.status.code(),
|
||||
"failed to execute wal-g command"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct WalgBackupInfo {
|
||||
backup_name: String,
|
||||
time: String,
|
||||
ts_parsed: Option<i64>,
|
||||
}
|
||||
|
||||
let mut info =
|
||||
serde_json::from_str::<Vec<WalgBackupInfo>>(&String::from_utf8_lossy(&output.stdout))?
|
||||
.into_iter()
|
||||
.filter(|v| v.backup_name.contains("base"))
|
||||
.filter_map(|mut v| {
|
||||
chrono::DateTime::parse_from_rfc3339(&v.time)
|
||||
.ok()
|
||||
.map(|dt| {
|
||||
v.ts_parsed = Some(dt.with_timezone(&chrono::Utc).timestamp());
|
||||
v
|
||||
})
|
||||
})
|
||||
.collect::<Vec<WalgBackupInfo>>();
|
||||
|
||||
info.sort_by(|a, b| b.ts_parsed.cmp(&a.ts_parsed));
|
||||
|
||||
let Some(info) = info.first() else {
|
||||
anyhow::bail!("could not find any basebackups in repo {repo}");
|
||||
};
|
||||
|
||||
BASEBACKUP_TS
|
||||
.lock()
|
||||
.await
|
||||
.insert(repo, info.ts_parsed.unwrap());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
ctx.data
|
||||
.execute(
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue