diff --git a/Cargo.lock b/Cargo.lock index 132356fb..dc949bbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2925,6 +2925,7 @@ dependencies = [ "num-format", "reqwest 0.12.8", "serde", + "serde_json", "sqlx", "tokio", "tracing", diff --git a/PluralKit.Bot/Commands/Misc.cs b/PluralKit.Bot/Commands/Misc.cs index decb574a..97f4f0ca 100644 --- a/PluralKit.Bot/Commands/Misc.cs +++ b/PluralKit.Bot/Commands/Misc.cs @@ -3,12 +3,11 @@ using System.Diagnostics; using App.Metrics; using Myriad.Builders; -using Myriad.Cache; -using Myriad.Gateway; -using Myriad.Rest; using Myriad.Rest.Types.Requests; using Myriad.Types; +using Newtonsoft.Json; + using NodaTime; using PluralKit.Core; @@ -57,78 +56,85 @@ public class Misc var timeAfter = SystemClock.Instance.GetCurrentInstant(); var apiLatency = timeAfter - timeBefore; - var embed = new EmbedBuilder(); - - // todo: these will be inaccurate when the bot is actually multi-process - - var messagesReceived = _metrics.Snapshot.GetForContext("Bot").Meters - .FirstOrDefault(m => m.MultidimensionalName == BotMetrics.MessagesReceived.Name)?.Value; - if (messagesReceived != null) - embed.Field(new Embed.Field("Messages processed", - $"{messagesReceived.OneMinuteRate * 60:F1}/m ({messagesReceived.FifteenMinuteRate * 60:F1}/m over 15m)", - true)); - - var messagesProxied = _metrics.Snapshot.GetForContext("Bot").Meters - .FirstOrDefault(m => m.MultidimensionalName == BotMetrics.MessagesProxied.Name)?.Value; - if (messagesProxied != null) - embed.Field(new Embed.Field("Messages proxied", - $"{messagesProxied.OneMinuteRate * 60:F1}/m ({messagesProxied.FifteenMinuteRate * 60:F1}/m over 15m)", - true)); - - var commandsRun = _metrics.Snapshot.GetForContext("Bot").Meters - .FirstOrDefault(m => m.MultidimensionalName == BotMetrics.CommandsRun.Name)?.Value; - if (commandsRun != null) - embed.Field(new Embed.Field("Commands executed", - $"{commandsRun.OneMinuteRate * 60:F1}/m ({commandsRun.FifteenMinuteRate * 60:F1}/m over 15m)", - true)); - - var isCluster = _botConfig.Cluster != null && _botConfig.Cluster.TotalShards != ctx.Cluster.Shards.Count; - - var counts = await _repo.GetStats(); + var process = Process.GetCurrentProcess(); + var stats = await GetStats(ctx); var shards = await _shards.GetShards(); var shardInfo = shards.Where(s => s.ShardId == ctx.ShardId).FirstOrDefault(); - // todo: if we're running multiple processes, it is not useful to get the CPU/RAM usage of just the current one - var process = Process.GetCurrentProcess(); - var memoryUsage = process.WorkingSet64; - - var now = SystemClock.Instance.GetCurrentInstant().ToUnixTimeSeconds(); - var shardUptime = Duration.FromSeconds(now - shardInfo.LastConnection); - - var shardTotal = _botConfig.Cluster?.TotalShards ?? shards.Count(); - int shardClusterTotal = ctx.Cluster.Shards.Count; - var shardUpTotal = shards.Where(x => x.Up).Count(); + var embed = new EmbedBuilder(); embed - .Field(new Embed.Field("Current shard", - $"Shard #{ctx.ShardId} (of {shardTotal} total," - + (isCluster ? $" {shardClusterTotal} in this cluster," : "") + $" {shardUpTotal} are up)" - , true)) - .Field(new Embed.Field("Shard uptime", - $"{shardUptime.FormatDuration()} ({shardInfo.DisconnectionCount} disconnections)", true)) - .Field(new Embed.Field("CPU usage", $"{_cpu.LastCpuMeasure:P1}", true)) - .Field(new Embed.Field("Memory usage", $"{memoryUsage / 1024 / 1024} MiB", true)) - .Field(new Embed.Field("Latency", - $"API: {apiLatency.TotalMilliseconds:F0} ms, shard: {shardInfo.Latency} ms", - true)); + .Field(new("Connection status", $"**{shards.Count()}** shards across **{shards.Select(s => s.ClusterId).Distinct().Count()}** clusters\n" + + $"Current server is on **shard {ctx.ShardId} (cluster {shardInfo.ClusterId ?? 0})**\n" + + $"Latency: API **{apiLatency.TotalMilliseconds:F0}ms** (p90: {stats.prom.nirn_proxy_latency_p90 * 1000:F0}ms, p99: {stats.prom.nirn_proxy_latency_p99 * 1000:F0}ms), " + + $"shard **{shardInfo.Latency}ms** (avg: {stats.prom.shard_latency_average}ms)", true)) + .Field(new("Resource usage", $"**CPU:** {stats.prom.cpu_used}% used / {stats.prom.cpu_total_cores} total cores ({stats.prom.cpu_total_threads} threads)\n" + + $"**Memory:** {(stats.prom.memory_used / 1_000_000_000):N1}GB used / {(stats.prom.memory_total / 1_000_000_000):N1}GB total", true)) + .Field(new("Usage metrics", $"Messages received: **{stats.prom.messages_1m}/s** ({stats.prom.messages_15m}/s over 15m)\n" + + $"Messages proxied: **{stats.prom.proxy_1m}/s** ({stats.prom.proxy_15m}/s over 15m, {stats.db.messages_24h} total in last 24h)\n" + + $"Commands executed: **{stats.prom.commands_1m}/m** ({stats.prom.commands_15m}/m over 15m)")); - embed.Field(new("Total numbers", $" {counts.SystemCount:N0} systems," - + $" {counts.MemberCount:N0} members," - + $" {counts.GroupCount:N0} groups," - + $" {counts.SwitchCount:N0} switches," - + $" {counts.MessageCount:N0} messages")); + embed.Field(new("Total numbers", $"**{stats.db.systems:N0}** systems, **{stats.db.members:N0}** members, **{stats.db.groups:N0}** groups, " + + $"**${stats.db.switches:N0}** switches, **{stats.db.messages:N0}** messages\n" + + $"**{stats.db.guilds:N0}** servers with **{stats.db.channels:N0}** channels")); - embed - .Footer(new(String.Join(" \u2022 ", new[] { - $"PluralKit {BuildInfoService.Version}", - (isCluster ? $"Cluster {_botConfig.Cluster.NodeIndex}" : ""), - "https://github.com/PluralKit/PluralKit", - "Last restarted:", - }))) - .Timestamp(process.StartTime.ToString("O")); + embed.Footer(Help.helpEmbed.Footer); + + var uptime = ((DateTimeOffset)process.StartTime).ToUnixTimeSeconds(); + embed.Description($"### PluralKit [{BuildInfoService.Version}](https://github.com/pluralkit/pluralkit/commit/{BuildInfoService.FullVersion})\n" + + $"Built on ()" + + (BuildInfoService.IsDev ? ", **development build**" : "") + + $"\nLast restart: "); await ctx.Rest.EditMessage(msg.ChannelId, msg.Id, new MessageEditRequest { Content = "", Embeds = new[] { embed.Build() } }); } -} \ No newline at end of file + + private async Task GetStats(Context ctx) + { + var db = ctx.Redis.Connection.GetDatabase(); + var data = await db.StringGetAsync("statsapi"); + return JsonConvert.DeserializeObject(data); + } +} + +// none of these fields are "assigned to" for some reason +#pragma warning disable CS0649 + +class Stats +{ + public DbStats db; + public PrometheusStats prom; +}; + +class DbStats +{ + public double systems; + public double members; + public double groups; + public double switches; + public double messages; + public double messages_24h; + public double guilds; + public double channels; +}; + +class PrometheusStats +{ + public double messages_1m; + public double messages_15m; + public double proxy_1m; + public double proxy_15m; + public double commands_1m; + public double commands_15m; + public double cpu_total_cores; + public double cpu_total_threads; + public double cpu_used; + public double memory_total; + public double memory_used; + public double nirn_proxy_rps; + public double nirn_proxy_latency_p90; + public double nirn_proxy_latency_p99; + public double shard_latency_average; +}; \ No newline at end of file diff --git a/PluralKit.Core/PluralKit.Core.csproj b/PluralKit.Core/PluralKit.Core.csproj index d9109ebd..c0bdd134 100644 --- a/PluralKit.Core/PluralKit.Core.csproj +++ b/PluralKit.Core/PluralKit.Core.csproj @@ -57,7 +57,7 @@ - + diff --git a/PluralKit.Core/Services/BuildInfoService.cs b/PluralKit.Core/Services/BuildInfoService.cs index 33466a71..5ab99adc 100644 --- a/PluralKit.Core/Services/BuildInfoService.cs +++ b/PluralKit.Core/Services/BuildInfoService.cs @@ -4,20 +4,21 @@ public static class BuildInfoService { public static string Version { get; private set; } public static string FullVersion { get; private set; } + public static string Timestamp { get; private set; } + public static bool IsDev { get; private set; } public static async Task LoadVersion() { - using (var stream = typeof(BuildInfoService).Assembly.GetManifestResourceStream("version")) - { - // if this happens, something broke - if (stream == null) FullVersion = "(unknown version) "; - else - using (var reader = new StreamReader(stream)) - FullVersion = await reader.ReadToEndAsync(); - } + using var stream = typeof(BuildInfoService).Assembly.GetManifestResourceStream("version"); + if (stream == null) throw new Exception("missing version information"); - // cheap hack to remove newline - FullVersion = FullVersion.Remove(FullVersion.Length - 1); + using var reader = new StreamReader(stream); + var data = (await reader.ReadToEndAsync()).Split("\n"); + + FullVersion = data[0]; + Timestamp = data[1]; + + IsDev = data[2] == ""; // show only short commit hash to users Version = FullVersion.Remove(7); diff --git a/crates/api/src/endpoints/private.rs b/crates/api/src/endpoints/private.rs index 5d5049e9..ad76e275 100644 --- a/crates/api/src/endpoints/private.rs +++ b/crates/api/src/endpoints/private.rs @@ -31,29 +31,16 @@ pub async fn discord_state(State(ctx): State) -> Json { } pub async fn meta(State(ctx): State) -> Json { - let cluster_stats = ctx - .redis - .hgetall::, &str>("pluralkit:cluster_stats") - .await - .unwrap() - .values() - .map(|v| serde_json::from_str(v).unwrap()) - .collect::>(); + let stats = serde_json::from_str::( + ctx.redis + .get::("statsapi") + .await + .unwrap() + .as_str(), + ) + .unwrap(); - let db_stats = libpk::db::repository::get_stats(&ctx.db).await.unwrap(); - - let guild_count: i32 = cluster_stats.iter().map(|v| v.guild_count).sum(); - let channel_count: i32 = cluster_stats.iter().map(|v| v.channel_count).sum(); - - Json(json!({ - "system_count": db_stats.system_count, - "member_count": db_stats.member_count, - "group_count": db_stats.group_count, - "switch_count": db_stats.switch_count, - "message_count": db_stats.message_count, - "guild_count": guild_count, - "channel_count": channel_count, - })) + Json(stats) } use std::time::Duration; diff --git a/crates/scheduled_tasks/Cargo.toml b/crates/scheduled_tasks/Cargo.toml index 6eda3331..e554edd3 100644 --- a/crates/scheduled_tasks/Cargo.toml +++ b/crates/scheduled_tasks/Cargo.toml @@ -12,6 +12,7 @@ fred = { workspace = true } metrics = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/scheduled_tasks/src/main.rs b/crates/scheduled_tasks/src/main.rs index 1f4b28db..fc5e68ed 100644 --- a/crates/scheduled_tasks/src/main.rs +++ b/crates/scheduled_tasks/src/main.rs @@ -74,7 +74,6 @@ async fn real_main() -> anyhow::Result<()> { "message stats updater", update_db_message_meta ); - // every minute doforever!("* * * * *", "discord stats updater", update_discord_stats); // on :00 and :30 doforever!( @@ -82,6 +81,7 @@ async fn real_main() -> anyhow::Result<()> { "queue deleted image cleanup job", queue_deleted_image_cleanup ); + doforever!("0,30 * * * * *", "stats api updater", update_stats_api); set.join_next() .await diff --git a/crates/scheduled_tasks/src/tasks.rs b/crates/scheduled_tasks/src/tasks.rs index 8a39680b..62ec304b 100644 --- a/crates/scheduled_tasks/src/tasks.rs +++ b/crates/scheduled_tasks/src/tasks.rs @@ -149,3 +149,122 @@ select id, now() from images where .await?; Ok(()) } + +pub async fn update_stats_api(ctx: AppCtx) -> anyhow::Result<()> { + let client = ClientBuilder::new() + .connect_timeout(Duration::from_secs(3)) + .timeout(Duration::from_secs(3)) + .build() + .expect("error making client"); + + #[derive(serde::Deserialize, Debug)] + struct PrometheusResult { + data: PrometheusResultData, + } + #[derive(serde::Deserialize, Debug)] + struct PrometheusResultData { + result: Vec, + } + #[derive(serde::Deserialize, Debug)] + struct PrometheusData { + value: Vec, + } + + macro_rules! prom_instant_query { + ($t:ty, $q:expr) => {{ + let resp = client + .get(format!( + "http://vm.svc.pluralkit.net/select/0/prometheus/api/v1/query?query={}", + $q + )) + .send() + .await?; + + let data = resp.json::().await?; + + data.data + .result + .get(0) + .expect("missing data") + .value + .clone() + .get(1) + .expect("missing data") + .as_str() + .expect("invalid data") + .parse::<$t>()? + }}; + ($t:ty, $q:expr, $wrap:expr) => {{ + let val = prom_instant_query!($t, $q); + let val = (val * $wrap).round() / $wrap; + format!("{:.2}", val).parse::().unwrap() + }}; + } + + #[derive(serde::Serialize, sqlx::FromRow)] + struct DbStats { + systems: i64, + members: i64, + groups: i64, + switches: i64, + messages: i64, + messages_24h: i64, + guilds: i64, + channels: i64, + } + + let db_stats: DbStats = sqlx::query_as(r#" + select + t1.value as systems, + t2.value as members, + t3.value as groups, + t4.value as switches, + t5.value as messages, + (t5.value - t6.value) as messages_24h, + t7.value as guilds, + t8.value as channels + from + (select value from systems order by timestamp desc limit 1) as t1, + (select value from members order by timestamp desc limit 1) as t2, + (select value from groups order by timestamp desc limit 1) as t3, + (select value from switches order by timestamp desc limit 1) as t4, + (select value from messages order by timestamp desc limit 1) as t5, + (select value from messages where timestamp > now() - interval '1 day' order by timestamp asc limit 1) as t6, + (select value from guilds order by timestamp desc limit 1) as t7, + (select value from channels order by timestamp desc limit 1) as t8 + "#).fetch_one(&ctx.stats).await?; + + let data = serde_json::json!({ + "db": db_stats, + "prom": { + "messages_1m": prom_instant_query!(f32, "sum(bot__messages_processed_rate1m)", 10.0), + "messages_15m": prom_instant_query!(f32, "sum(bot__messages_processed_rate15m)", 10.0), + "proxy_1m": prom_instant_query!(f32, "sum(bot__messages_proxied_rate1m)", 10.0), + "proxy_15m": prom_instant_query!(f32, "sum(bot__messages_proxied_rate15m)", 10.0), + "commands_1m": prom_instant_query!(f32, "sum(bot__commands_run_rate1m)", 10.0), + "commands_15m": prom_instant_query!(f32, "sum(bot__commands_run_rate15m)", 10.0), + "cpu_total_cores": prom_instant_query!(usize, "sum(host_physical_cpus)"), + "cpu_total_threads": prom_instant_query!(usize, "sum(host_logical_cpus)"), + "cpu_used": prom_instant_query!(f32, "100 * ((sum(host_logical_cpus) - sum(rate(host_cpu_seconds_total{mode=\"idle\"}[1m]))) / sum(host_logical_cpus)) * sum(host_logical_cpus)", 10.0), + "memory_total": prom_instant_query!(i64, "sum(host_memory_total_bytes)").to_string(), + "memory_used": prom_instant_query!(i64, "sum(host_memory_total_bytes) - sum(host_memory_available_bytes)").to_string(), + "nirn_proxy_rps": prom_instant_query!(f32, "sum(rate(nirn_proxy_requests_count))", 10.0), + "nirn_proxy_latency_p90": prom_instant_query!(f32, "histogram_quantile(0.9, sum(rate(nirn_proxy_requests_bucket[5m])) by (le))", 1000.0), + "nirn_proxy_latency_p99": prom_instant_query!(f32, "histogram_quantile(0.99, sum(rate(nirn_proxy_requests_bucket[5m])) by (le))", 1000.0), + "shard_latency_average": prom_instant_query!(f32, "avg(pluralkit_gateway_shard_latency)", 10.0), + } + }); + + ctx.redis + .set::<(), &str, String>( + "statsapi", + serde_json::to_string(&data).expect("should not fail"), + // Some(fred::types::Expiration::EX(60)), + None, + None, + false, + ) + .await?; + + Ok(()) +}