Merge remote-tracking branch 'upstream/main' into rust-command-parser

This commit is contained in:
dusk 2025-01-21 00:46:50 +09:00
commit 07b616e422
No known key found for this signature in database
30 changed files with 1253 additions and 282 deletions

View file

@ -31,29 +31,16 @@ pub async fn discord_state(State(ctx): State<ApiContext>) -> Json<Value> {
}
pub async fn meta(State(ctx): State<ApiContext>) -> Json<Value> {
let cluster_stats = ctx
.redis
.hgetall::<HashMap<String, String>, &str>("pluralkit:cluster_stats")
.await
.unwrap()
.values()
.map(|v| serde_json::from_str(v).unwrap())
.collect::<Vec<ClusterStats>>();
let stats = serde_json::from_str::<Value>(
ctx.redis
.get::<String, &'static str>("statsapi")
.await
.unwrap()
.as_str(),
)
.unwrap();
let db_stats = libpk::db::repository::get_stats(&ctx.db).await.unwrap();
let guild_count: i32 = cluster_stats.iter().map(|v| v.guild_count).sum();
let channel_count: i32 = cluster_stats.iter().map(|v| v.channel_count).sum();
Json(json!({
"system_count": db_stats.system_count,
"member_count": db_stats.member_count,
"group_count": db_stats.group_count,
"switch_count": db_stats.switch_count,
"message_count": db_stats.message_count,
"guild_count": guild_count,
"channel_count": channel_count,
}))
Json(stats)
}
use std::time::Duration;

View file

@ -80,7 +80,7 @@ async fn dispatch(
) -> String {
// todo: fix
if req.auth != std::env::var("HTTP_AUTH_TOKEN").unwrap() {
return "".to_string();
panic!("bad auth");
}
let uri = match req.url.parse::<Uri>() {

View file

@ -85,6 +85,8 @@ impl ShardStateManager {
.recent()
.first()
.map_or_else(|| 0, |d| d.as_millis()) as i32;
gauge!("pluralkit_gateway_shard_latency", "shard_id" => shard_id.to_string())
.set(info.latency);
self.save_shard(shard_id, info).await?;
Ok(())
}

View file

@ -12,6 +12,7 @@ fred = { workspace = true }
metrics = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

View file

@ -74,7 +74,6 @@ async fn real_main() -> anyhow::Result<()> {
"message stats updater",
update_db_message_meta
);
// every minute
doforever!("* * * * *", "discord stats updater", update_discord_stats);
// on :00 and :30
doforever!(
@ -82,6 +81,7 @@ async fn real_main() -> anyhow::Result<()> {
"queue deleted image cleanup job",
queue_deleted_image_cleanup
);
doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
set.join_next()
.await

View file

@ -10,6 +10,7 @@ use metrics::gauge;
use num_format::{Locale, ToFormattedString};
use reqwest::ClientBuilder;
use sqlx::Executor;
use tracing::error;
use crate::AppCtx;
@ -34,9 +35,9 @@ pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> {
r#"
update info set
system_count = (select count(*) from systems),
member_count = (select count(*) from systems),
group_count = (select count(*) from systems),
switch_count = (select count(*) from systems)
member_count = (select count(*) from members),
group_count = (select count(*) from groups),
switch_count = (select count(*) from switches)
"#,
)
.await?;
@ -149,3 +150,128 @@ select id, now() from images where
.await?;
Ok(())
}
pub async fn update_stats_api(ctx: AppCtx) -> anyhow::Result<()> {
let client = ClientBuilder::new()
.connect_timeout(Duration::from_secs(3))
.timeout(Duration::from_secs(3))
.build()
.expect("error making client");
#[derive(serde::Deserialize, Debug)]
struct PrometheusResult {
data: PrometheusResultData,
}
#[derive(serde::Deserialize, Debug)]
struct PrometheusResultData {
result: Vec<PrometheusData>,
}
#[derive(serde::Deserialize, Debug)]
struct PrometheusData {
value: Vec<serde_json::Value>,
}
macro_rules! prom_instant_query {
($t:ty, $q:expr) => {{
let resp = client
.get(format!(
"http://vm.svc.pluralkit.net/select/0/prometheus/api/v1/query?query={}",
$q
))
.send()
.await?;
let data = resp.json::<PrometheusResult>().await?;
let error_handler = || {
error!("missing data at {}", $q);
};
data.data
.result
.get(0)
.ok_or_else(error_handler)
.unwrap()
.value
.clone()
.get(1)
.ok_or_else(error_handler)
.unwrap()
.as_str()
.ok_or_else(error_handler)
.unwrap()
.parse::<$t>()?
}};
($t:ty, $q:expr, $wrap:expr) => {{
let val = prom_instant_query!($t, $q);
let val = (val * $wrap).round() / $wrap;
format!("{:.2}", val).parse::<f64>().unwrap()
}};
}
#[derive(serde::Serialize, sqlx::FromRow)]
struct DbStats {
systems: i64,
members: i64,
groups: i64,
switches: i64,
messages: i64,
messages_24h: i64,
guilds: i64,
channels: i64,
}
let db_stats: DbStats = sqlx::query_as(r#"
select
t1.value as systems,
t2.value as members,
t3.value as groups,
t4.value as switches,
t5.value as messages,
(t5.value - t6.value) as messages_24h,
t7.value as guilds,
t8.value as channels
from
(select value from systems order by timestamp desc limit 1) as t1,
(select value from members order by timestamp desc limit 1) as t2,
(select value from groups order by timestamp desc limit 1) as t3,
(select value from switches order by timestamp desc limit 1) as t4,
(select value from messages order by timestamp desc limit 1) as t5,
(select value from messages where timestamp > now() - interval '1 day' order by timestamp asc limit 1) as t6,
(select value from guilds order by timestamp desc limit 1) as t7,
(select value from channels order by timestamp desc limit 1) as t8
"#).fetch_one(&ctx.stats).await?;
let data = serde_json::json!({
"db": db_stats,
"prom": {
"messages_1m": prom_instant_query!(f32, "sum(bot__messages_processed_rate1m)", 10.0),
"messages_15m": prom_instant_query!(f32, "sum(bot__messages_processed_rate15m)", 10.0),
"proxy_1m": prom_instant_query!(f32, "sum(bot__messages_proxied_rate1m)", 10.0),
"proxy_15m": prom_instant_query!(f32, "sum(bot__messages_proxied_rate15m)", 10.0),
"commands_1m": prom_instant_query!(f32, "sum(bot__commands_run_rate1m)", 10.0),
"commands_15m": prom_instant_query!(f32, "sum(bot__commands_run_rate15m)", 10.0),
"cpu_total_cores": prom_instant_query!(usize, "sum(host_physical_cpus)"),
"cpu_total_threads": prom_instant_query!(usize, "sum(host_logical_cpus)"),
"cpu_used": prom_instant_query!(f32, "100 * ((sum(host_logical_cpus) - sum(rate(host_cpu_seconds_total{mode=\"idle\"}[1m]))) / sum(host_logical_cpus)) * sum(host_logical_cpus)", 10.0),
"memory_total": prom_instant_query!(i64, "sum(host_memory_total_bytes)").to_string(),
"memory_used": prom_instant_query!(i64, "sum(host_memory_total_bytes) - sum(host_memory_available_bytes)").to_string(),
"nirn_proxy_rps": prom_instant_query!(f32, "sum(rate(nirn_proxy_requests_count))", 10.0),
"nirn_proxy_latency_p90": prom_instant_query!(f32, "histogram_quantile(0.9, sum(rate(nirn_proxy_requests_bucket[5m])) by (le))", 1000.0),
"nirn_proxy_latency_p99": prom_instant_query!(f32, "histogram_quantile(0.99, sum(rate(nirn_proxy_requests_bucket[5m])) by (le))", 1000.0),
"shard_latency_average": prom_instant_query!(f32, "avg(pluralkit_gateway_shard_latency)", 10.0),
}
});
ctx.redis
.set::<(), &str, String>(
"statsapi",
serde_json::to_string(&data).expect("should not fail"),
Some(fred::types::Expiration::EX(60)),
None,
false,
)
.await?;
Ok(())
}