mirror of
https://github.com/PluralKit/PluralKit.git
synced 2026-02-12 16:50:10 +00:00
feat(stats): add metric for basebackup age
This commit is contained in:
parent
aa103f85e7
commit
957dfb4074
5 changed files with 111 additions and 3 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -3311,6 +3311,7 @@ dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"croner",
|
"croner",
|
||||||
"fred",
|
"fred",
|
||||||
|
"lazy_static",
|
||||||
"libpk",
|
"libpk",
|
||||||
"metrics",
|
"metrics",
|
||||||
"num-format",
|
"num-format",
|
||||||
|
|
|
||||||
|
|
@ -42,5 +42,10 @@ build api
|
||||||
build dispatch
|
build dispatch
|
||||||
build gateway
|
build gateway
|
||||||
build avatars "COPY .docker-bin/avatar_cleanup /bin/avatar_cleanup"
|
build avatars "COPY .docker-bin/avatar_cleanup /bin/avatar_cleanup"
|
||||||
build scheduled_tasks
|
build scheduled_tasks "$(cat <<EOF
|
||||||
|
RUN wget https://github.com/wal-g/wal-g/releases/download/v3.0.7/wal-g-pg-ubuntu-22.04-amd64 -O /usr/local/bin/wal-g
|
||||||
|
RUN chmod +x /usr/local/bin/wal-g
|
||||||
|
RUN apk add gcompat
|
||||||
|
EOF
|
||||||
|
)"
|
||||||
build gdpr_worker
|
build gdpr_worker
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ libpk = { path = "../libpk" }
|
||||||
anyhow = { workspace = true }
|
anyhow = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
fred = { workspace = true }
|
fred = { workspace = true }
|
||||||
|
lazy_static = { workspace = true }
|
||||||
metrics = { workspace = true }
|
metrics = { workspace = true }
|
||||||
reqwest = { workspace = true }
|
reqwest = { workspace = true }
|
||||||
serde = { workspace = true }
|
serde = { workspace = true }
|
||||||
|
|
|
||||||
|
|
@ -99,13 +99,25 @@ async fn main() -> anyhow::Result<()> {
|
||||||
update_db_message_meta
|
update_db_message_meta
|
||||||
);
|
);
|
||||||
doforever!("* * * * *", "discord stats updater", update_discord_stats);
|
doforever!("* * * * *", "discord stats updater", update_discord_stats);
|
||||||
// on :00 and :30
|
// on hh:00 and hh:30
|
||||||
doforever!(
|
doforever!(
|
||||||
"0,30 * * * *",
|
"0,30 * * * *",
|
||||||
"queue deleted image cleanup job",
|
"queue deleted image cleanup job",
|
||||||
queue_deleted_image_cleanup
|
queue_deleted_image_cleanup
|
||||||
);
|
);
|
||||||
|
// non-standard cron: at hh:mm:00, hh:mm:30
|
||||||
doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
|
doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
|
||||||
|
// every hour (could probably even be less frequent, basebackups are taken rarely)
|
||||||
|
doforever!(
|
||||||
|
"* * * * *",
|
||||||
|
"data basebackup info updater",
|
||||||
|
update_data_basebackup_prometheus
|
||||||
|
);
|
||||||
|
doforever!(
|
||||||
|
"* * * * *",
|
||||||
|
"messages basebackup info updater",
|
||||||
|
update_messages_basebackup_prometheus
|
||||||
|
);
|
||||||
|
|
||||||
set.join_next()
|
set.join_next()
|
||||||
.await
|
.await
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
use std::time::Duration;
|
use std::{collections::HashMap, time::Duration};
|
||||||
|
|
||||||
use anyhow::anyhow;
|
use anyhow::anyhow;
|
||||||
use fred::prelude::KeysInterface;
|
use fred::prelude::KeysInterface;
|
||||||
|
|
@ -10,10 +10,22 @@ use metrics::gauge;
|
||||||
use num_format::{Locale, ToFormattedString};
|
use num_format::{Locale, ToFormattedString};
|
||||||
use reqwest::ClientBuilder;
|
use reqwest::ClientBuilder;
|
||||||
use sqlx::Executor;
|
use sqlx::Executor;
|
||||||
|
use tokio::{process::Command, sync::Mutex};
|
||||||
|
|
||||||
use crate::AppCtx;
|
use crate::AppCtx;
|
||||||
|
|
||||||
pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
|
pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
|
||||||
|
let data_ts = *BASEBACKUP_TS.lock().await.get("data").unwrap_or(&0) as f64;
|
||||||
|
let messages_ts = *BASEBACKUP_TS.lock().await.get("messages").unwrap_or(&0) as f64;
|
||||||
|
|
||||||
|
let now_ts = chrono::Utc::now().timestamp() as f64;
|
||||||
|
|
||||||
|
gauge!("pluralkit_latest_backup_ts", "repo" => "data").set(data_ts);
|
||||||
|
gauge!("pluralkit_latest_backup_ts", "repo" => "messages").set(messages_ts);
|
||||||
|
|
||||||
|
gauge!("pluralkit_latest_backup_age", "repo" => "data").set(now_ts - data_ts);
|
||||||
|
gauge!("pluralkit_latest_backup_age", "repo" => "messages").set(now_ts - messages_ts);
|
||||||
|
|
||||||
#[derive(sqlx::FromRow)]
|
#[derive(sqlx::FromRow)]
|
||||||
struct Count {
|
struct Count {
|
||||||
count: i64,
|
count: i64,
|
||||||
|
|
@ -41,6 +53,83 @@ pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lazy_static::lazy_static! {
|
||||||
|
static ref BASEBACKUP_TS: Mutex<HashMap<String, i64>> = Mutex::new(HashMap::new());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_data_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> {
|
||||||
|
update_basebackup_ts("data".to_string()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn update_messages_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> {
|
||||||
|
update_basebackup_ts("messages".to_string()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn update_basebackup_ts(repo: String) -> anyhow::Result<()> {
|
||||||
|
let mut env = HashMap::new();
|
||||||
|
|
||||||
|
for (key, value) in std::env::vars() {
|
||||||
|
if key.starts_with("AWS") {
|
||||||
|
env.insert(key, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
env.insert(
|
||||||
|
"WALG_S3_PREFIX".to_string(),
|
||||||
|
format!("s3://pluralkit-backups/{repo}/"),
|
||||||
|
);
|
||||||
|
|
||||||
|
let output = Command::new("wal-g")
|
||||||
|
.arg("backup-list")
|
||||||
|
.arg("--json")
|
||||||
|
.envs(env)
|
||||||
|
.output()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
if !output.status.success() {
|
||||||
|
// todo: we should return error here
|
||||||
|
tracing::error!(
|
||||||
|
status = output.status.code(),
|
||||||
|
"failed to execute wal-g command"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize)]
|
||||||
|
struct WalgBackupInfo {
|
||||||
|
backup_name: String,
|
||||||
|
time: String,
|
||||||
|
ts_parsed: Option<i64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut info =
|
||||||
|
serde_json::from_str::<Vec<WalgBackupInfo>>(&String::from_utf8_lossy(&output.stdout))?
|
||||||
|
.into_iter()
|
||||||
|
.filter(|v| v.backup_name.contains("base"))
|
||||||
|
.filter_map(|mut v| {
|
||||||
|
chrono::DateTime::parse_from_rfc3339(&v.time)
|
||||||
|
.ok()
|
||||||
|
.map(|dt| {
|
||||||
|
v.ts_parsed = Some(dt.with_timezone(&chrono::Utc).timestamp());
|
||||||
|
v
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Vec<WalgBackupInfo>>();
|
||||||
|
|
||||||
|
info.sort_by(|a, b| b.ts_parsed.cmp(&a.ts_parsed));
|
||||||
|
|
||||||
|
let Some(info) = info.first() else {
|
||||||
|
anyhow::bail!("could not find any basebackups in repo {repo}");
|
||||||
|
};
|
||||||
|
|
||||||
|
BASEBACKUP_TS
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.insert(repo, info.ts_parsed.unwrap());
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> {
|
pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> {
|
||||||
ctx.data
|
ctx.data
|
||||||
.execute(
|
.execute(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue