feat: new stats embed / api

alyssa 2025-01-05 00:52:45 +00:00
parent e88d6b7e2a
commit 9f8d3d22d2
8 changed files with 215 additions and 100 deletions

Cargo.lock generated
View file

@@ -2925,6 +2925,7 @@ dependencies = [
"num-format",
"reqwest 0.12.8",
"serde",
"serde_json",
"sqlx",
"tokio",
"tracing",

View file

@@ -3,12 +3,11 @@ using System.Diagnostics;
using App.Metrics;
using Myriad.Builders;
using Myriad.Cache;
using Myriad.Gateway;
using Myriad.Rest;
using Myriad.Rest.Types.Requests;
using Myriad.Types;
using Newtonsoft.Json;
using NodaTime;
using PluralKit.Core;
@@ -57,78 +56,85 @@ public class Misc
var timeAfter = SystemClock.Instance.GetCurrentInstant();
var apiLatency = timeAfter - timeBefore;
var embed = new EmbedBuilder();
// todo: these will be inaccurate when the bot is actually multi-process
var messagesReceived = _metrics.Snapshot.GetForContext("Bot").Meters
.FirstOrDefault(m => m.MultidimensionalName == BotMetrics.MessagesReceived.Name)?.Value;
if (messagesReceived != null)
embed.Field(new Embed.Field("Messages processed",
$"{messagesReceived.OneMinuteRate * 60:F1}/m ({messagesReceived.FifteenMinuteRate * 60:F1}/m over 15m)",
true));
var messagesProxied = _metrics.Snapshot.GetForContext("Bot").Meters
.FirstOrDefault(m => m.MultidimensionalName == BotMetrics.MessagesProxied.Name)?.Value;
if (messagesProxied != null)
embed.Field(new Embed.Field("Messages proxied",
$"{messagesProxied.OneMinuteRate * 60:F1}/m ({messagesProxied.FifteenMinuteRate * 60:F1}/m over 15m)",
true));
var commandsRun = _metrics.Snapshot.GetForContext("Bot").Meters
.FirstOrDefault(m => m.MultidimensionalName == BotMetrics.CommandsRun.Name)?.Value;
if (commandsRun != null)
embed.Field(new Embed.Field("Commands executed",
$"{commandsRun.OneMinuteRate * 60:F1}/m ({commandsRun.FifteenMinuteRate * 60:F1}/m over 15m)",
true));
var isCluster = _botConfig.Cluster != null && _botConfig.Cluster.TotalShards != ctx.Cluster.Shards.Count;
var counts = await _repo.GetStats();
var process = Process.GetCurrentProcess();
var stats = await GetStats(ctx);
var shards = await _shards.GetShards();
var shardInfo = shards.Where(s => s.ShardId == ctx.ShardId).FirstOrDefault();
// todo: if we're running multiple processes, it is not useful to get the CPU/RAM usage of just the current one
var process = Process.GetCurrentProcess();
var memoryUsage = process.WorkingSet64;
var now = SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds();
var shardUptime = Duration.FromSeconds(now - shardInfo.LastConnection);
var shardTotal = _botConfig.Cluster?.TotalShards ?? shards.Count();
int shardClusterTotal = ctx.Cluster.Shards.Count;
var shardUpTotal = shards.Where(x => x.Up).Count();
var embed = new EmbedBuilder();
embed
.Field(new Embed.Field("Current shard",
$"Shard #{ctx.ShardId} (of {shardTotal} total,"
+ (isCluster ? $" {shardClusterTotal} in this cluster," : "") + $" {shardUpTotal} are up)"
, true))
.Field(new Embed.Field("Shard uptime",
$"{shardUptime.FormatDuration()} ({shardInfo.DisconnectionCount} disconnections)", true))
.Field(new Embed.Field("CPU usage", $"{_cpu.LastCpuMeasure:P1}", true))
.Field(new Embed.Field("Memory usage", $"{memoryUsage / 1024 / 1024} MiB", true))
.Field(new Embed.Field("Latency",
$"API: {apiLatency.TotalMilliseconds:F0} ms, shard: {shardInfo.Latency} ms",
true));
.Field(new("Connection status", $"**{shards.Count()}** shards across **{shards.Select(s => s.ClusterId).Distinct().Count()}** clusters\n"
+ $"Current server is on **shard {ctx.ShardId} (cluster {shardInfo.ClusterId ?? 0})**\n"
+ $"Latency: API **{apiLatency.TotalMilliseconds:F0}ms** (p90: {stats.prom.nirn_proxy_latency_p90 * 1000:F0}ms, p99: {stats.prom.nirn_proxy_latency_p99 * 1000:F0}ms), "
+ $"shard **{shardInfo.Latency}ms** (avg: {stats.prom.shard_latency_average}ms)", true))
.Field(new("Resource usage", $"**CPU:** {stats.prom.cpu_used}% used / {stats.prom.cpu_total_cores} total cores ({stats.prom.cpu_total_threads} threads)\n"
+ $"**Memory:** {(stats.prom.memory_used / 1_000_000_000):N1}GB used / {(stats.prom.memory_total / 1_000_000_000):N1}GB total", true))
.Field(new("Usage metrics", $"Messages received: **{stats.prom.messages_1m}/s** ({stats.prom.messages_15m}/s over 15m)\n" +
$"Messages proxied: **{stats.prom.proxy_1m}/s** ({stats.prom.proxy_15m}/s over 15m, {stats.db.messages_24h} total in last 24h)\n" +
$"Commands executed: **{stats.prom.commands_1m}/m** ({stats.prom.commands_15m}/m over 15m)"));
embed.Field(new("Total numbers", $" {counts.SystemCount:N0} systems,"
+ $" {counts.MemberCount:N0} members,"
+ $" {counts.GroupCount:N0} groups,"
+ $" {counts.SwitchCount:N0} switches,"
+ $" {counts.MessageCount:N0} messages"));
embed.Field(new("Total numbers", $"**{stats.db.systems:N0}** systems, **{stats.db.members:N0}** members, **{stats.db.groups:N0}** groups, "
+ $"**${stats.db.switches:N0}** switches, **{stats.db.messages:N0}** messages\n" +
$"**{stats.db.guilds:N0}** servers with **{stats.db.channels:N0}** channels"));
embed
.Footer(new(String.Join(" \u2022 ", new[] {
$"PluralKit {BuildInfoService.Version}",
(isCluster ? $"Cluster {_botConfig.Cluster.NodeIndex}" : ""),
"https://github.com/PluralKit/PluralKit",
"Last restarted:",
})))
.Timestamp(process.StartTime.ToString("O"));
embed.Footer(Help.helpEmbed.Footer);
var uptime = ((DateTimeOffset)process.StartTime).ToUnixTimeSeconds();
embed.Description($"### PluralKit [{BuildInfoService.Version}](https://github.com/pluralkit/pluralkit/commit/{BuildInfoService.FullVersion})\n" +
$"Built on <t:{BuildInfoService.Timestamp}> (<t:{BuildInfoService.Timestamp}:R>)"
+ (BuildInfoService.IsDev ? ", **development build**" : "")
+ $"\nLast restart: <t:{uptime}:R>");
await ctx.Rest.EditMessage(msg.ChannelId, msg.Id,
new MessageEditRequest { Content = "", Embeds = new[] { embed.Build() } });
}
}
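// reads the stats blob that the scheduled_tasks service periodically writes to redis under "statsapi" (see update_stats_api below)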
private async Task<Stats> GetStats(Context ctx)
{
var db = ctx.Redis.Connection.GetDatabase();
var data = await db.StringGetAsync("statsapi");
return JsonConvert.DeserializeObject<Stats>(data);
}
}
// these fields are only ever assigned by Json.NET during deserialization, so the compiler thinks they are never assigned
#pragma warning disable CS0649
class Stats
{
public DbStats db;
public PrometheusStats prom;
};
class DbStats
{
public double systems;
public double members;
public double groups;
public double switches;
public double messages;
public double messages_24h;
public double guilds;
public double channels;
};
class PrometheusStats
{
public double messages_1m;
public double messages_15m;
public double proxy_1m;
public double proxy_15m;
public double commands_1m;
public double commands_15m;
public double cpu_total_cores;
public double cpu_total_threads;
public double cpu_used;
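// memory_total / memory_used are written as JSON strings by the rust updater; Json.NET coerces them to double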
public double memory_total;
public double memory_used;
public double nirn_proxy_rps;
public double nirn_proxy_latency_p90;
public double nirn_proxy_latency_p99;
public double shard_latency_average;
};

View file

@@ -57,7 +57,7 @@
</ItemGroup>
<Target Name="SetSourceRevisionId" BeforeTargets="InitializeSourceControlInformation">
<Exec Command="git rev-parse HEAD &gt; ../.version" IgnoreExitCode="False">
<Exec Command="git rev-parse HEAD &gt; ../.version &amp; git show --no-patch --format=%at $(git rev-parse HEAD) &gt;&gt; ../.version &amp; (git diff-index --quiet HEAD -- &amp;&amp; echo 1) &gt;&gt; ../.version" IgnoreExitCode="True">
</Exec>
</Target>

View file

@@ -4,20 +4,21 @@ public static class BuildInfoService
{
public static string Version { get; private set; }
public static string FullVersion { get; private set; }
public static string Timestamp { get; private set; }
public static bool IsDev { get; private set; }
public static async Task LoadVersion()
{
using (var stream = typeof(BuildInfoService).Assembly.GetManifestResourceStream("version"))
{
// if this happens, something broke
if (stream == null) FullVersion = "(unknown version) ";
else
using (var reader = new StreamReader(stream))
FullVersion = await reader.ReadToEndAsync();
}
using var stream = typeof(BuildInfoService).Assembly.GetManifestResourceStream("version");
if (stream == null) throw new Exception("missing version information");
// cheap hack to remove newline
FullVersion = FullVersion.Remove(FullVersion.Length - 1);
using var reader = new StreamReader(stream);
var data = (await reader.ReadToEndAsync()).Split("\n");
FullVersion = data[0];
Timestamp = data[1];
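// the third line of .version is "1" only when the tree was clean at build time, so an empty value marks a dev build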
IsDev = data[2] == "";
// show only short commit hash to users
Version = FullVersion.Remove(7);

View file

@@ -31,29 +31,16 @@ pub async fn discord_state(State(ctx): State<ApiContext>) -> Json<Value> {
}
pub async fn meta(State(ctx): State<ApiContext>) -> Json<Value> {
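// the blob under "statsapi" is precomputed by the scheduled_tasks updater (update_stats_api); this endpoint just relays it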
let cluster_stats = ctx
.redis
.hgetall::<HashMap<String, String>, &str>("pluralkit:cluster_stats")
.await
.unwrap()
.values()
.map(|v| serde_json::from_str(v).unwrap())
.collect::<Vec<ClusterStats>>();
let stats = serde_json::from_str::<Value>(
ctx.redis
.get::<String, &'static str>("statsapi")
.await
.unwrap()
.as_str(),
)
.unwrap();
let db_stats = libpk::db::repository::get_stats(&ctx.db).await.unwrap();
let guild_count: i32 = cluster_stats.iter().map(|v| v.guild_count).sum();
let channel_count: i32 = cluster_stats.iter().map(|v| v.channel_count).sum();
Json(json!({
"system_count": db_stats.system_count,
"member_count": db_stats.member_count,
"group_count": db_stats.group_count,
"switch_count": db_stats.switch_count,
"message_count": db_stats.message_count,
"guild_count": guild_count,
"channel_count": channel_count,
}))
Json(stats)
}
use std::time::Duration;

View file

@@ -12,6 +12,7 @@ fred = { workspace = true }
metrics = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

View file

@@ -74,7 +74,6 @@ async fn real_main() -> anyhow::Result<()> {
"message stats updater",
update_db_message_meta
);
// every minute
doforever!("* * * * *", "discord stats updater", update_discord_stats);
// on :00 and :30
doforever!(
@@ -82,6 +81,7 @@ async fn real_main() -> anyhow::Result<()> {
"queue deleted image cleanup job",
queue_deleted_image_cleanup
);
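// every 30 seconds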
doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
set.join_next()
.await

View file

@@ -149,3 +149,122 @@ select id, now() from images where
.await?;
Ok(())
}
pub async fn update_stats_api(ctx: AppCtx) -> anyhow::Result<()> {
let client = ClientBuilder::new()
.connect_timeout(Duration::from_secs(3))
.timeout(Duration::from_secs(3))
.build()
.expect("error making client");
#[derive(serde::Deserialize, Debug)]
struct PrometheusResult {
data: PrometheusResultData,
}
#[derive(serde::Deserialize, Debug)]
struct PrometheusResultData {
result: Vec<PrometheusData>,
}
#[derive(serde::Deserialize, Debug)]
struct PrometheusData {
value: Vec<serde_json::Value>,
}
macro_rules! prom_instant_query {
($t:ty, $q:expr) => {{
let resp = client
.get(format!(
"http://vm.svc.pluralkit.net/select/0/prometheus/api/v1/query?query={}",
$q
))
.send()
.await?;
let data = resp.json::<PrometheusResult>().await?;
data.data
.result
.get(0)
.expect("missing data")
.value
.clone()
.get(1)
.expect("missing data")
.as_str()
.expect("invalid data")
.parse::<$t>()?
}};
($t:ty, $q:expr, $wrap:expr) => {{
let val = prom_instant_query!($t, $q);
let val = (val * $wrap).round() / $wrap;
format!("{:.2}", val).parse::<f64>().unwrap()
}};
}
#[derive(serde::Serialize, sqlx::FromRow)]
struct DbStats {
systems: i64,
members: i64,
groups: i64,
switches: i64,
messages: i64,
messages_24h: i64,
guilds: i64,
channels: i64,
}
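// take the newest sample from each stats table; messages_24h is the newest total minus the oldest sample recorded within the last day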
let db_stats: DbStats = sqlx::query_as(r#"
select
t1.value as systems,
t2.value as members,
t3.value as groups,
t4.value as switches,
t5.value as messages,
(t5.value - t6.value) as messages_24h,
t7.value as guilds,
t8.value as channels
from
(select value from systems order by timestamp desc limit 1) as t1,
(select value from members order by timestamp desc limit 1) as t2,
(select value from groups order by timestamp desc limit 1) as t3,
(select value from switches order by timestamp desc limit 1) as t4,
(select value from messages order by timestamp desc limit 1) as t5,
(select value from messages where timestamp > now() - interval '1 day' order by timestamp asc limit 1) as t6,
(select value from guilds order by timestamp desc limit 1) as t7,
(select value from channels order by timestamp desc limit 1) as t8
"#).fetch_one(&ctx.stats).await?;
let data = serde_json::json!({
"db": db_stats,
"prom": {
"messages_1m": prom_instant_query!(f32, "sum(bot__messages_processed_rate1m)", 10.0),
"messages_15m": prom_instant_query!(f32, "sum(bot__messages_processed_rate15m)", 10.0),
"proxy_1m": prom_instant_query!(f32, "sum(bot__messages_proxied_rate1m)", 10.0),
"proxy_15m": prom_instant_query!(f32, "sum(bot__messages_proxied_rate15m)", 10.0),
"commands_1m": prom_instant_query!(f32, "sum(bot__commands_run_rate1m)", 10.0),
"commands_15m": prom_instant_query!(f32, "sum(bot__commands_run_rate15m)", 10.0),
"cpu_total_cores": prom_instant_query!(usize, "sum(host_physical_cpus)"),
"cpu_total_threads": prom_instant_query!(usize, "sum(host_logical_cpus)"),
"cpu_used": prom_instant_query!(f32, "100 * ((sum(host_logical_cpus) - sum(rate(host_cpu_seconds_total{mode=\"idle\"}[1m]))) / sum(host_logical_cpus)) * sum(host_logical_cpus)", 10.0),
"memory_total": prom_instant_query!(i64, "sum(host_memory_total_bytes)").to_string(),
"memory_used": prom_instant_query!(i64, "sum(host_memory_total_bytes) - sum(host_memory_available_bytes)").to_string(),
"nirn_proxy_rps": prom_instant_query!(f32, "sum(rate(nirn_proxy_requests_count))", 10.0),
"nirn_proxy_latency_p90": prom_instant_query!(f32, "histogram_quantile(0.9, sum(rate(nirn_proxy_requests_bucket[5m])) by (le))", 1000.0),
"nirn_proxy_latency_p99": prom_instant_query!(f32, "histogram_quantile(0.99, sum(rate(nirn_proxy_requests_bucket[5m])) by (le))", 1000.0),
"shard_latency_average": prom_instant_query!(f32, "avg(pluralkit_gateway_shard_latency)", 10.0),
}
});
ctx.redis
.set::<(), &str, String>(
"statsapi",
serde_json::to_string(&data).expect("should not fail"),
// Some(fred::types::Expiration::EX(60)),
None,
None,
false,
)
.await?;
Ok(())
}
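
For reference, a minimal sketch of the response shape prom_instant_query! depends on: a Prometheus instant query returns each sample as a [timestamp, "value"] pair, so the macro reads result[0].value[1] as a string and parses it, and the three-argument arm then rounds via $wrap. The response body and the numbers in it below are invented for illustration; the helper structs are the same ones defined inside update_stats_api.

// illustration only: parse a hand-written instant-query response the same way
// prom_instant_query! does, then apply the optional rounding step
fn main() -> anyhow::Result<()> {
    #[derive(serde::Deserialize, Debug)]
    struct PrometheusResult {
        data: PrometheusResultData,
    }
    #[derive(serde::Deserialize, Debug)]
    struct PrometheusResultData {
        result: Vec<PrometheusData>,
    }
    #[derive(serde::Deserialize, Debug)]
    struct PrometheusData {
        value: Vec<serde_json::Value>,
    }

    // made-up body for GET .../api/v1/query?query=sum(bot__messages_processed_rate1m)
    let body = r#"{
        "status": "success",
        "data": {
            "resultType": "vector",
            "result": [ { "metric": {}, "value": [1736038365.0, "61.73"] } ]
        }
    }"#;

    let parsed: PrometheusResult = serde_json::from_str(body)?;
    // value[0] is the sample timestamp, value[1] is the sample itself, as a string
    let raw = parsed.data.result[0].value[1]
        .as_str()
        .expect("invalid data")
        .parse::<f32>()?;

    // the three-argument macro arm: round via $wrap (10.0 = one decimal place),
    // then reformat so the stored value has at most two decimals
    let rounded = (raw * 10.0).round() / 10.0;
    let rounded = format!("{:.2}", rounded).parse::<f64>()?;
    println!("raw {raw}, rounded {rounded}"); // raw 61.73, rounded 61.7

    Ok(())
}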