Compare commits

...

5 commits

17 changed files with 331 additions and 90 deletions

Cargo.lock (generated), 12 changed lines
View file

@@ -88,6 +88,7 @@ dependencies = [
  "lazy_static",
  "libpk",
  "metrics",
+ "pk_macros",
  "pluralkit_models",
  "reqwest 0.12.15",
  "reverse-proxy-service",
@@ -95,6 +96,7 @@ dependencies = [
  "serde_json",
  "serde_urlencoded",
  "sqlx",
+ "subtle",
  "tokio",
  "tower 0.4.13",
  "tower-http",
@@ -2529,6 +2531,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
 name = "pk_macros"
 version = "0.1.0"
 dependencies = [
+ "prettyplease",
  "proc-macro2",
  "quote",
  "syn",
@@ -2610,9 +2613,9 @@ dependencies = [

 [[package]]
 name = "prettyplease"
-version = "0.2.31"
+version = "0.2.36"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5316f57387668042f561aae71480de936257848f9c43ce528e311d89a07cadeb"
+checksum = "ff24dfcda44452b9816fff4cd4227e1bb73ff5a2f1bc1105aa92fb8565ce44d2"
 dependencies = [
  "proc-macro2",
  "syn",
@@ -3311,6 +3314,7 @@ dependencies = [
  "chrono",
  "croner",
  "fred",
+ "lazy_static",
  "libpk",
  "metrics",
  "num-format",
@@ -3963,9 +3967,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"

 [[package]]
 name = "syn"
-version = "2.0.100"
+version = "2.0.104"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b09a44accad81e1ba1cd74a32461ba89dee89095ba17b32f5d03683b1b1fc2a0"
+checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40"
 dependencies = [
  "proc-macro2",
  "quote",

View file

@@ -42,5 +42,10 @@ build api
 build dispatch
 build gateway
 build avatars "COPY .docker-bin/avatar_cleanup /bin/avatar_cleanup"
-build scheduled_tasks
+build scheduled_tasks "$(cat <<EOF
+RUN wget https://github.com/wal-g/wal-g/releases/download/v3.0.7/wal-g-pg-ubuntu-22.04-amd64 -O /usr/local/bin/wal-g
+RUN chmod +x /usr/local/bin/wal-g
+RUN apk add gcompat
+EOF
+)"
 build gdpr_worker

View file

@@ -5,6 +5,7 @@ edition = "2021"

 [dependencies]
 pluralkit_models = { path = "../models" }
+pk_macros = { path = "../macros" }
 libpk = { path = "../libpk" }

 anyhow = { workspace = true }
@@ -26,3 +27,4 @@ reverse-proxy-service = { version = "0.2.1", features = ["axum"] }
 serde_urlencoded = "0.7.1"
 tower = "0.4.13"
 tower-http = { version = "0.5.2", features = ["catch-panic"] }
+subtle = "2.6.1"

View file

@@ -7,11 +7,16 @@ pub const INTERNAL_APPID_HEADER: &'static str = "x-pluralkit-appid";

 pub struct AuthState {
     system_id: Option<i32>,
     app_id: Option<i32>,
+    internal: bool,
 }

 impl AuthState {
-    pub fn new(system_id: Option<i32>, app_id: Option<i32>) -> Self {
-        Self { system_id, app_id }
+    pub fn new(system_id: Option<i32>, app_id: Option<i32>, internal: bool) -> Self {
+        Self {
+            system_id,
+            app_id,
+            internal,
+        }
     }

     pub fn system_id(&self) -> Option<i32> {
@@ -22,6 +27,10 @@ impl AuthState {
         self.app_id
     }

+    pub fn internal(&self) -> bool {
+        self.internal
+    }
+
     pub fn access_level_for(&self, a: &impl Authable) -> PrivacyLevel {
         if self
             .system_id

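Reviewer note: with `internal` threaded through `AuthState`, handlers can gate internal-only routes on `auth.internal()`. A minimal sketch of such a guard, reusing the error types from this PR's `error.rs`; the endpoint itself is hypothetical and not part of this diff:

    use axum::{Extension, Json};
    use serde_json::{json, Value};

    use crate::auth::AuthState;
    use crate::error::{PKError, GENERIC_BAD_REQUEST};

    // Hypothetical internal-only endpoint: reject callers whose request did
    // not carry the internal auth token validated by the middleware below.
    pub async fn internal_only(Extension(auth): Extension<AuthState>) -> Result<Json<Value>, PKError> {
        if !auth.internal() {
            return Err(GENERIC_BAD_REQUEST);
        }
        Ok(Json(json!({ "ok": true })))
    }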
View file

@@ -2,6 +2,7 @@ use crate::ApiContext;
 use axum::{extract::State, response::Json};
 use fred::interfaces::*;
 use libpk::state::ShardState;
+use pk_macros::api_endpoint;
 use serde::Deserialize;
 use serde_json::{json, Value};
 use std::collections::HashMap;
@@ -13,34 +14,33 @@ struct ClusterStats {
     pub channel_count: i32,
 }

+#[api_endpoint]
 pub async fn discord_state(State(ctx): State<ApiContext>) -> Json<Value> {
     let mut shard_status = ctx
         .redis
         .hgetall::<HashMap<String, String>, &str>("pluralkit:shardstatus")
-        .await
-        .unwrap()
+        .await?
         .values()
         .map(|v| serde_json::from_str(v).expect("could not deserialize shard"))
         .collect::<Vec<ShardState>>();

     shard_status.sort_by(|a, b| b.shard_id.cmp(&a.shard_id));

-    Json(json!({
+    Ok(Json(json!({
         "shards": shard_status,
-    }))
+    })))
 }

+#[api_endpoint]
 pub async fn meta(State(ctx): State<ApiContext>) -> Json<Value> {
     let stats = serde_json::from_str::<Value>(
         ctx.redis
             .get::<String, &'static str>("statsapi")
-            .await
-            .unwrap()
+            .await?
             .as_str(),
-    )
-    .unwrap();
+    )?;

-    Json(stats)
+    Ok(Json(stats))
 }

 use std::time::Duration;

View file

@@ -1,22 +1,18 @@
-use axum::{
-    extract::State,
-    http::StatusCode,
-    response::{IntoResponse, Response},
-    Extension, Json,
-};
-use serde_json::json;
+use axum::{extract::State, response::IntoResponse, Extension, Json};
+use pk_macros::api_endpoint;
+use serde_json::{json, Value};
 use sqlx::Postgres;
-use tracing::error;

 use pluralkit_models::{PKSystem, PKSystemConfig, PrivacyLevel};

-use crate::{auth::AuthState, util::json_err, ApiContext};
+use crate::{auth::AuthState, error::fail, ApiContext};

+#[api_endpoint]
 pub async fn get_system_settings(
     Extension(auth): Extension<AuthState>,
     Extension(system): Extension<PKSystem>,
     State(ctx): State<ApiContext>,
-) -> Response {
+) -> Json<Value> {
     let access_level = auth.access_level_for(&system);

     let mut config = match sqlx::query_as::<Postgres, PKSystemConfig>(
@@ -27,23 +23,11 @@ pub async fn get_system_settings(
     .await
     {
         Ok(Some(config)) => config,
-        Ok(None) => {
-            error!(
-                system = system.id,
-                "failed to find system config for existing system"
-            );
-            return json_err(
-                StatusCode::INTERNAL_SERVER_ERROR,
-                r#"{"message": "500: Internal Server Error", "code": 0}"#.to_string(),
-            );
-        }
-        Err(err) => {
-            error!(?err, "failed to query system config");
-            return json_err(
-                StatusCode::INTERNAL_SERVER_ERROR,
-                r#"{"message": "500: Internal Server Error", "code": 0}"#.to_string(),
-            );
-        }
+        Ok(None) => fail!(
+            system = system.id,
+            "failed to find system config for existing system"
+        ),
+        Err(err) => fail!(?err, "failed to query system config"),
     };

     // fix this
@@ -51,7 +35,7 @@ pub async fn get_system_settings(
         config.name_format = Some("{name} {tag}".to_string());
     }

-    Json(&match access_level {
+    Ok(Json(match access_level {
         PrivacyLevel::Private => config.to_json(),
         PrivacyLevel::Public => json!({
             "pings_enabled": config.pings_enabled,
@@ -64,6 +48,5 @@ pub async fn get_system_settings(
             "proxy_switch": config.proxy_switch,
             "name_format": config.name_format,
         }),
-    })
-    .into_response()
+    }))
 }

View file

@@ -1,13 +1,17 @@
-use axum::http::StatusCode;
+use axum::{
+    http::StatusCode,
+    response::{IntoResponse, Response},
+};
 use std::fmt;

-// todo
+// todo: model parse errors
 #[allow(dead_code)]
 #[derive(Debug)]
 pub struct PKError {
     pub response_code: StatusCode,
     pub json_code: i32,
     pub message: &'static str,
+    pub inner: Option<anyhow::Error>,
 }

 impl fmt::Display for PKError {
@@ -16,17 +20,67 @@ impl fmt::Display for PKError {
     }
 }

-impl std::error::Error for PKError {}
+impl Clone for PKError {
+    fn clone(&self) -> PKError {
+        if self.inner.is_some() {
+            panic!("cannot clone PKError with inner error");
+        }
+        PKError {
+            response_code: self.response_code,
+            json_code: self.json_code,
+            message: self.message,
+            inner: None,
+        }
+    }
+}
+
+impl<E> From<E> for PKError
+where
+    E: std::fmt::Display + Into<anyhow::Error>,
+{
+    fn from(err: E) -> Self {
+        let mut res = GENERIC_SERVER_ERROR.clone();
+        res.inner = Some(err.into());
+        res
+    }
+}
+
+impl IntoResponse for PKError {
+    fn into_response(self) -> Response {
+        if let Some(inner) = self.inner {
+            tracing::error!(?inner, "error returned from handler");
+        }
+        crate::util::json_err(
+            self.response_code,
+            serde_json::to_string(&serde_json::json!({
+                "message": self.message,
+                "code": self.json_code,
+            }))
+            .unwrap(),
+        )
+    }
+}
+
+macro_rules! fail {
+    ($($stuff:tt)+) => {{
+        tracing::error!($($stuff)+);
+        return Err(crate::error::GENERIC_SERVER_ERROR);
+    }};
+}
+pub(crate) use fail;

+#[allow(unused_macros)]
 macro_rules! define_error {
     ( $name:ident, $response_code:expr, $json_code:expr, $message:expr ) => {
-        const $name: PKError = PKError {
+        #[allow(dead_code)]
+        pub const $name: PKError = PKError {
             response_code: $response_code,
             json_code: $json_code,
             message: $message,
+            inner: None,
         };
     };
 }

-// define_error! { GENERIC_BAD_REQUEST, StatusCode::BAD_REQUEST, 0, "400: Bad Request" }
+define_error! { GENERIC_BAD_REQUEST, StatusCode::BAD_REQUEST, 0, "400: Bad Request" }
+define_error! { GENERIC_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR, 0, "500: Internal Server Error" }

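Reviewer note: dropping `impl std::error::Error for PKError {}` is likely forced by the new blanket `From` impl: if `PKError` implemented `Error`, it would itself satisfy `Display + Into<anyhow::Error>` and collide with core's reflexive `impl From<T> for T`. The panicking `Clone` is safe in practice, since only the `define_error!` constants (which have `inner: None`) are ever cloned. The resulting handler ergonomics, sketched with an illustrative function that is not from this diff:

    use crate::error::{fail, PKError};

    // Any error that is Display + Into<anyhow::Error> is lifted by `?` into
    // GENERIC_SERVER_ERROR with the source stashed in `inner`, which
    // IntoResponse logs before emitting the generic 500 body.
    async fn example(raw: &str) -> Result<String, PKError> {
        let parsed: i64 = raw.parse()?; // ParseIntError -> PKError via the blanket From
        if parsed < 0 {
            fail!(parsed, "negative value"); // logs, then returns GENERIC_SERVER_ERROR
        }
        Ok(parsed.to_string())
    }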
View file

@@ -4,8 +4,8 @@ use auth::{AuthState, INTERNAL_APPID_HEADER, INTERNAL_SYSTEMID_HEADER};
 use axum::{
     body::Body,
     extract::{Request as ExtractRequest, State},
-    http::{Response, StatusCode, Uri},
-    response::IntoResponse,
+    http::Uri,
+    response::{IntoResponse, Response},
     routing::{delete, get, patch, post},
     Extension, Router,
 };
@@ -13,7 +13,9 @@ use hyper_util::{
     client::legacy::{connect::HttpConnector, Client},
     rt::TokioExecutor,
 };
-use tracing::{error, info};
+use tracing::info;

+use pk_macros::api_endpoint;
+
 mod auth;
 mod endpoints;
@@ -30,11 +32,12 @@ pub struct ApiContext {
     rproxy_client: Client<HttpConnector, Body>,
 }

+#[api_endpoint]
 async fn rproxy(
     Extension(auth): Extension<AuthState>,
     State(ctx): State<ApiContext>,
     mut req: ExtractRequest<Body>,
-) -> Result<Response<Body>, StatusCode> {
+) -> Response {
     let path = req.uri().path();
     let path_query = req
         .uri()
@@ -59,15 +62,7 @@ async fn rproxy(
         headers.append(INTERNAL_APPID_HEADER, aid.into());
     }

-    Ok(ctx
-        .rproxy_client
-        .request(req)
-        .await
-        .map_err(|error| {
-            error!(?error, "failed to serve reverse proxy to dotnet-api");
-            StatusCode::BAD_GATEWAY
-        })?
-        .into_response())
+    Ok(ctx.rproxy_client.request(req).await?.into_response())
 }

 // this function is manually formatted for easier legibility of route_services

View file

@@ -5,6 +5,8 @@ use axum::{
     response::Response,
 };

+use subtle::ConstantTimeEq;
+
 use tracing::error;

 use crate::auth::AuthState;
@@ -48,15 +50,31 @@ pub async fn auth(State(ctx): State<ApiContext>, mut req: Request, next: Next) -
             .expect("missing api config")
             .temp_token2
             .as_ref()
-        // this is NOT how you validate tokens
-        // but this is low abuse risk so we're keeping it for now
-        && app_auth_header == config_token2
+        && app_auth_header
+            .as_bytes()
+            .ct_eq(config_token2.as_bytes())
+            .into()
     {
         authed_app_id = Some(1);
     }

+    // todo: fix syntax
+    let internal = if req.headers().get("x-pluralkit-client-ip").is_none()
+        && let Some(auth_header) = req
+            .headers()
+            .get("x-pluralkit-internalauth")
+            .map(|h| h.to_str().ok())
+            .flatten()
+        && let Some(real_token) = libpk::config.internal_auth.clone()
+        && auth_header.as_bytes().ct_eq(real_token.as_bytes()).into()
+    {
+        true
+    } else {
+        false
+    };
+
     req.extensions_mut()
-        .insert(AuthState::new(authed_system_id, authed_app_id));
+        .insert(AuthState::new(authed_system_id, authed_app_id, internal));

     next.run(req).await
 }

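Reviewer note: the point of `subtle` here is that `==` on strings short-circuits at the first mismatching byte, so response timing leaks how long a correct prefix of a guessed token is; `ct_eq` touches every byte and only collapses its `subtle::Choice` result to `bool` at the end. A contrast sketch (assumes `subtle` 2.x, as added to Cargo.toml above):

    use subtle::ConstantTimeEq;

    // Timing-dependent: returns at the first differing byte.
    fn leaky_eq(a: &str, b: &str) -> bool {
        a == b
    }

    // Constant-time over the compared bytes; only the length is leaked,
    // since slices of different lengths compare unequal immediately.
    fn constant_time_eq(a: &str, b: &str) -> bool {
        a.as_bytes().ct_eq(b.as_bytes()).into()
    }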
View file

@@ -45,21 +45,6 @@ pub fn ratelimiter<F, T>(f: F) -> FromFnLayer<F, Option<RedisPool>, T> {

         tokio::spawn(async move { handle });

-        let rscript = r.clone();
-        tokio::spawn(async move {
-            if let Ok(()) = rscript.wait_for_connect().await {
-                match rscript
-                    .script_load::<String, String>(LUA_SCRIPT.to_string())
-                    .await
-                {
-                    Ok(_) => info!("connected to redis for request rate limiting"),
-                    Err(error) => error!(?error, "could not load redis script"),
-                }
-            } else {
-                error!("could not wait for connection to load redis script!");
-            }
-        });
-
         r
     });
@@ -152,12 +137,34 @@ pub async fn do_request_ratelimited(
     let period = 1; // seconds
     let cost = 1; // todo: update this for group member endpoints

+    let script_exists: Vec<usize> =
+        match redis.script_exists(vec![LUA_SCRIPT_SHA.to_string()]).await {
+            Ok(exists) => exists,
+            Err(error) => {
+                error!(?error, "failed to check ratelimit script");
+                return json_err(
+                    StatusCode::INTERNAL_SERVER_ERROR,
+                    r#"{"message": "500: internal server error", "code": 0}"#.to_string(),
+                );
+            }
+        };
+
+    if script_exists[0] != 1 {
+        match redis
+            .script_load::<String, String>(LUA_SCRIPT.to_string())
+            .await
+        {
+            Ok(_) => info!("successfully loaded ratelimit script to redis"),
+            Err(error) => {
+                error!(?error, "could not load redis script")
+            }
+        }
+    }
+
     // local rate_limit_key = KEYS[1]
     // local rate = ARGV[1]
     // local period = ARGV[2]
     // return {remaining, tostring(retry_after), reset_after}
-    // todo: check if error is script not found and reload script
     let resp = redis
         .evalsha::<(i32, String, u64), String, Vec<String>, Vec<i32>>(
             LUA_SCRIPT_SHA.to_string(),
@@ -219,7 +226,7 @@ pub async fn do_request_ratelimited(
             return response;
         }
         Err(error) => {
-            tracing::error!(?error, "error getting ratelimit info");
+            error!(?error, "error getting ratelimit info");
             return json_err(
                 StatusCode::INTERNAL_SERVER_ERROR,
                 r#"{"message": "500: internal server error", "code": 0}"#.to_string(),

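Reviewer note: Redis caches Lua scripts keyed by the SHA-1 of their source, and EVALSHA fails with NOSCRIPT when that cache is cold (fresh instance, SCRIPT FLUSH, failover). The removed code loaded the script once when the pool connected; the new code checks SCRIPT EXISTS per request and reloads on demand, trading an extra round trip for resilience. All of this relies on `LUA_SCRIPT_SHA` being the hex SHA-1 digest of `LUA_SCRIPT`; a test sketch pinning that invariant, assuming the `sha1` and `hex` crates (neither is part of this diff):

    #[cfg(test)]
    mod tests {
        use sha1::{Digest, Sha1};

        use super::{LUA_SCRIPT, LUA_SCRIPT_SHA};

        // EVALSHA addresses the script by the SHA-1 of its exact source,
        // so the hardcoded constant must track any edit to the script.
        #[test]
        fn script_sha_matches_source() {
            let digest = Sha1::digest(LUA_SCRIPT.as_bytes());
            assert_eq!(hex::encode(digest), LUA_SCRIPT_SHA);
        }
    }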
View file

@@ -128,6 +128,9 @@ pub struct PKConfig {

     #[serde(default)]
     pub sentry_url: Option<String>,
+
+    #[serde(default)]
+    pub internal_auth: Option<String>,
 }

 impl PKConfig {
impl PKConfig { impl PKConfig {

View file

@@ -10,4 +10,5 @@ proc-macro = true
 quote = "1.0"
 proc-macro2 = "1.0"
 syn = "2.0"
+prettyplease = "0.2.36"

crates/macros/src/api.rs (new file), 52 added lines
View file

@@ -0,0 +1,52 @@
use quote::quote;
use syn::{parse_macro_input, FnArg, ItemFn, Pat};

fn pretty_print(ts: &proc_macro2::TokenStream) -> String {
    let file = syn::parse_file(&ts.to_string()).unwrap();
    prettyplease::unparse(&file)
}

pub fn macro_impl(
    _args: proc_macro::TokenStream,
    input: proc_macro::TokenStream,
) -> proc_macro::TokenStream {
    let input = parse_macro_input!(input as ItemFn);

    let fn_name = &input.sig.ident;
    let fn_params = &input.sig.inputs;
    let fn_body = &input.block;

    let syn::ReturnType::Type(_, fn_return_type) = &input.sig.output else {
        panic!("handler return type must not be nothing");
    };

    let pms: Vec<proc_macro2::TokenStream> = fn_params
        .iter()
        .map(|v| {
            let FnArg::Typed(pat) = v else {
                panic!("must not have self param in handler");
            };
            let mut pat = pat.pat.clone();
            if let Pat::Ident(ident) = *pat {
                let mut ident = ident.clone();
                ident.mutability = None;
                pat = Box::new(Pat::Ident(ident));
            }
            quote! { #pat }
        })
        .collect();

    let res = quote! {
        #[allow(unused_mut)]
        pub async fn #fn_name(#fn_params) -> axum::response::Response {
            async fn inner(#fn_params) -> Result<#fn_return_type, crate::error::PKError> {
                #fn_body
            }

            match inner(#(#pms),*).await {
                Ok(res) => res.into_response(),
                Err(err) => err.into_response(),
            }
        }
    };

    res.into()
}

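Reviewer note: the attribute moves the handler body into an `inner` fn returning `Result<_, PKError>`, which is what makes `?` usable in the endpoints above; both arms are then converted with `into_response()`. Stripping `mutability` from ident patterns lets the wrapper forward them by name in `inner(#(#pms),*)`, and for destructuring params like `State(ctx)` the pattern tokens double as a constructor expression. `pretty_print` looks like a debugging helper for dumping generated code, hence the new `prettyplease` dependency. Roughly, for the `meta` handler shown earlier, the expansion behaves like this (an illustrative approximation, not the literal output):

    #[allow(unused_mut)]
    pub async fn meta(State(ctx): State<ApiContext>) -> axum::response::Response {
        async fn inner(State(ctx): State<ApiContext>) -> Result<Json<Value>, crate::error::PKError> {
            // original handler body; `?` now converts errors into PKError
            let stats = serde_json::from_str::<Value>(
                ctx.redis.get::<String, &'static str>("statsapi").await?.as_str(),
            )?;
            Ok(Json(stats))
        }

        match inner(State(ctx)).await {
            Ok(res) => res.into_response(),
            Err(err) => err.into_response(),
        }
    }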
View file

@@ -1,8 +1,14 @@
 use proc_macro::TokenStream;

+mod api;
 mod entrypoint;
 mod model;

+#[proc_macro_attribute]
+pub fn api_endpoint(args: TokenStream, input: TokenStream) -> TokenStream {
+    api::macro_impl(args, input)
+}
+
 #[proc_macro_attribute]
 pub fn main(args: TokenStream, input: TokenStream) -> TokenStream {
     entrypoint::macro_impl(args, input)

View file

@@ -9,6 +9,7 @@ libpk = { path = "../libpk" }
 anyhow = { workspace = true }
 chrono = { workspace = true }
 fred = { workspace = true }
+lazy_static = { workspace = true }
 metrics = { workspace = true }
 reqwest = { workspace = true }
 serde = { workspace = true }

View file

@@ -99,13 +99,25 @@ async fn main() -> anyhow::Result<()> {
         update_db_message_meta
     );
     doforever!("* * * * *", "discord stats updater", update_discord_stats);
-    // on :00 and :30
+    // on hh:00 and hh:30
     doforever!(
         "0,30 * * * *",
         "queue deleted image cleanup job",
         queue_deleted_image_cleanup
     );
+    // non-standard cron: at hh:mm:00, hh:mm:30
     doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
+    // every hour (could probably even be less frequent, basebackups are taken rarely)
+    doforever!(
+        "* * * * *",
+        "data basebackup info updater",
+        update_data_basebackup_prometheus
+    );
+    doforever!(
+        "* * * * *",
+        "messages basebackup info updater",
+        update_messages_basebackup_prometheus
+    );

     set.join_next()
         .await

View file

@@ -1,4 +1,4 @@
-use std::time::Duration;
+use std::{collections::HashMap, time::Duration};

 use anyhow::anyhow;
 use fred::prelude::KeysInterface;
@@ -10,10 +10,22 @@ use metrics::gauge;
 use num_format::{Locale, ToFormattedString};
 use reqwest::ClientBuilder;
 use sqlx::Executor;
+use tokio::{process::Command, sync::Mutex};

 use crate::AppCtx;

 pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
+    let data_ts = *BASEBACKUP_TS.lock().await.get("data").unwrap_or(&0) as f64;
+    let messages_ts = *BASEBACKUP_TS.lock().await.get("messages").unwrap_or(&0) as f64;
+    let now_ts = chrono::Utc::now().timestamp() as f64;
+
+    gauge!("pluralkit_latest_backup_ts", "repo" => "data").set(data_ts);
+    gauge!("pluralkit_latest_backup_ts", "repo" => "messages").set(messages_ts);
+    gauge!("pluralkit_latest_backup_age", "repo" => "data").set(now_ts - data_ts);
+    gauge!("pluralkit_latest_backup_age", "repo" => "messages").set(now_ts - messages_ts);
+
     #[derive(sqlx::FromRow)]
     struct Count {
         count: i64,
@@ -41,6 +53,83 @@ pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
     Ok(())
 }

+lazy_static::lazy_static! {
+    static ref BASEBACKUP_TS: Mutex<HashMap<String, i64>> = Mutex::new(HashMap::new());
+}
+
+pub async fn update_data_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> {
+    update_basebackup_ts("data".to_string()).await
+}
+
+pub async fn update_messages_basebackup_prometheus(_: AppCtx) -> anyhow::Result<()> {
+    update_basebackup_ts("messages".to_string()).await
+}
+
+async fn update_basebackup_ts(repo: String) -> anyhow::Result<()> {
+    let mut env = HashMap::new();
+    for (key, value) in std::env::vars() {
+        if key.starts_with("AWS") {
+            env.insert(key, value);
+        }
+    }
+    env.insert(
+        "WALG_S3_PREFIX".to_string(),
+        format!("s3://pluralkit-backups/{repo}/"),
+    );
+
+    let output = Command::new("wal-g")
+        .arg("backup-list")
+        .arg("--json")
+        .envs(env)
+        .output()
+        .await?;
+
+    if !output.status.success() {
+        // todo: we should return error here
+        tracing::error!(
+            status = output.status.code(),
+            "failed to execute wal-g command"
+        );
+        return Ok(());
+    }
+
+    #[derive(serde::Deserialize)]
+    struct WalgBackupInfo {
+        backup_name: String,
+        time: String,
+        ts_parsed: Option<i64>,
+    }
+
+    let mut info =
+        serde_json::from_str::<Vec<WalgBackupInfo>>(&String::from_utf8_lossy(&output.stdout))?
+            .into_iter()
+            .filter(|v| v.backup_name.contains("base"))
+            .filter_map(|mut v| {
+                chrono::DateTime::parse_from_rfc3339(&v.time)
+                    .ok()
+                    .map(|dt| {
+                        v.ts_parsed = Some(dt.with_timezone(&chrono::Utc).timestamp());
+                        v
+                    })
+            })
+            .collect::<Vec<WalgBackupInfo>>();
+
+    info.sort_by(|a, b| b.ts_parsed.cmp(&a.ts_parsed));
+
+    let Some(info) = info.first() else {
+        anyhow::bail!("could not find any basebackups in repo {repo}");
+    };
+
+    BASEBACKUP_TS
+        .lock()
+        .await
+        .insert(repo, info.ts_parsed.unwrap());
+
+    Ok(())
+}
+
 pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> {
     ctx.data
         .execute(
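Reviewer note: `update_basebackup_ts` shells out to `wal-g backup-list --json`, keeps only entries whose `backup_name` contains "base" (full basebackups), parses the RFC 3339 `time` field, and records the newest timestamp per repo for the gauges added above. A sketch of the JSON shape the `WalgBackupInfo` struct expects; the sample values are invented, and `ts_parsed` is absent from the JSON so serde defaults the `Option` to `None`:

    #[cfg(test)]
    mod tests {
        #[derive(serde::Deserialize)]
        struct BackupInfo {
            backup_name: String,
            time: String,
        }

        #[test]
        fn parses_backup_list_shape() {
            // Invented sample mirroring wal-g's backup-list --json output.
            let raw = r#"[
                {"backup_name": "base_000000010000000000000042", "time": "2025-01-01T00:00:00Z"},
                {"backup_name": "base_0000000100000000000000A7", "time": "2025-03-15T12:30:00Z"}
            ]"#;
            let backups: Vec<BackupInfo> = serde_json::from_str(raw).unwrap();
            assert_eq!(backups.len(), 2);
            // parse_from_rfc3339 accepts the trailing "Z" UTC designator.
            assert!(chrono::DateTime::parse_from_rfc3339(&backups[1].time).is_ok());
        }
    }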