From 2578eb0e3ca759f75e8257f7fc980e6dfa1bdbaf Mon Sep 17 00:00:00 2001 From: alyssa Date: Fri, 4 Apr 2025 11:06:16 +0000 Subject: [PATCH] feat: send events from gateway to bot over http --- Cargo.lock | 1 + PluralKit.Bot/Bot.cs | 8 ++- PluralKit.Bot/Services/HttpListenerService.cs | 54 +++++++++++++++++-- crates/gateway/Cargo.toml | 1 + crates/gateway/src/discord/gateway.rs | 42 +++++++++++++-- crates/gateway/src/main.rs | 36 ++++++++++++- crates/libpk/src/lib.rs | 1 + crates/libpk/src/runtime_config.rs | 6 +-- 8 files changed, 134 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee181b20..a4a5e80c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1088,6 +1088,7 @@ dependencies = [ "lazy_static", "libpk", "metrics", + "reqwest 0.12.8", "serde_json", "serde_variant", "signal-hook", diff --git a/PluralKit.Bot/Bot.cs b/PluralKit.Bot/Bot.cs index d74428ee..46e4e4a1 100644 --- a/PluralKit.Bot/Bot.cs +++ b/PluralKit.Bot/Bot.cs @@ -32,13 +32,14 @@ public class Bot private readonly DiscordApiClient _rest; private readonly RedisService _redis; private readonly ILifetimeScope _services; + private readonly RuntimeConfigService _runtimeConfig; private Timer _periodicTask; // Never read, just kept here for GC reasons public Bot(ILifetimeScope services, ILogger logger, PeriodicStatCollector collector, IMetrics metrics, BotConfig config, RedisService redis, ErrorMessageService errorMessageService, CommandMessageService commandMessageService, - Cluster cluster, DiscordApiClient rest, IDiscordCache cache) + Cluster cluster, DiscordApiClient rest, IDiscordCache cache, RuntimeConfigService runtimeConfig) { _logger = logger.ForContext(); _services = services; @@ -51,6 +52,7 @@ public class Bot _rest = rest; _redis = redis; _cache = cache; + _runtimeConfig = runtimeConfig; } private string BotStatus => $"{(_config.Prefixes ?? BotConfig.DefaultPrefixes)[0]}help" @@ -97,13 +99,15 @@ public class Bot private async Task OnEventReceived(int shardId, IGatewayEvent evt) { + if (_runtimeConfig.Exists("disable_events")) return; + // we HandleGatewayEvent **before** getting the own user, because the own user is set in HandleGatewayEvent for ReadyEvent await _cache.HandleGatewayEvent(evt); await _cache.TryUpdateSelfMember(_config.ClientId, evt); await OnEventReceivedInner(shardId, evt); } - private async Task OnEventReceivedInner(int shardId, IGatewayEvent evt) + public async Task OnEventReceivedInner(int shardId, IGatewayEvent evt) { // HandleEvent takes a type parameter, automatically inferred by the event type // It will then look up an IEventHandler in the DI container and call that object's handler method diff --git a/PluralKit.Bot/Services/HttpListenerService.cs b/PluralKit.Bot/Services/HttpListenerService.cs index 73a1f907..79573625 100644 --- a/PluralKit.Bot/Services/HttpListenerService.cs +++ b/PluralKit.Bot/Services/HttpListenerService.cs @@ -1,21 +1,26 @@ -using Serilog; +using System.Text.Json; -using Newtonsoft.Json; +using Serilog; using WatsonWebserver.Lite; using WatsonWebserver.Core; +using Myriad.Gateway; +using Myriad.Serialization; + namespace PluralKit.Bot; public class HttpListenerService { private readonly ILogger _logger; private readonly RuntimeConfigService _runtimeConfig; + private readonly Bot _bot; - public HttpListenerService(ILogger logger, RuntimeConfigService runtimeConfig) + public HttpListenerService(ILogger logger, RuntimeConfigService runtimeConfig, Bot bot) { _logger = logger.ForContext(); _runtimeConfig = runtimeConfig; + _bot = bot; } public void Start(string host) @@ -26,6 +31,8 @@ public class HttpListenerService server.Routes.PreAuthentication.Parameter.Add(WatsonWebserver.Core.HttpMethod.POST, "/runtime_config/{key}", RuntimeConfigSet); server.Routes.PreAuthentication.Parameter.Add(WatsonWebserver.Core.HttpMethod.DELETE, "/runtime_config/{key}", RuntimeConfigDelete); + server.Routes.PreAuthentication.Parameter.Add(WatsonWebserver.Core.HttpMethod.POST, "/events/{shard_id}", GatewayEvent); + server.Start(); } @@ -36,7 +43,7 @@ public class HttpListenerService { var config = _runtimeConfig.GetAll(); ctx.Response.Headers.Add("content-type", "application/json"); - await ctx.Response.Send(JsonConvert.SerializeObject(config)); + await ctx.Response.Send(JsonSerializer.Serialize(config)); } private async Task RuntimeConfigSet(HttpContextBase ctx) @@ -53,4 +60,43 @@ public class HttpListenerService await _runtimeConfig.Delete(key); await RuntimeConfigGet(ctx); } + + private JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions().ConfigureForMyriad(); + + private async Task GatewayEvent(HttpContextBase ctx) + { + var shardIdString = ctx.Request.Url.Parameters["shard_id"]; + if (!int.TryParse(shardIdString, out var shardId)) return; + + var packet = JsonSerializer.Deserialize(ctx.Request.DataAsString, _jsonSerializerOptions); + var evt = DeserializeEvent(shardId, packet.EventType!, (JsonElement)packet.Payload!); + if (evt != null) + { + await _bot.OnEventReceivedInner(shardId, evt); + } + await ctx.Response.Send("a"); + } + + private IGatewayEvent? DeserializeEvent(int shardId, string eventType, JsonElement payload) + { + if (!IGatewayEvent.EventTypes.TryGetValue(eventType, out var clrType)) + { + _logger.Debug("Shard {ShardId}: Received unknown event type {EventType}", shardId, eventType); + return null; + } + + try + { + _logger.Verbose("Shard {ShardId}: Deserializing {EventType} to {ClrType}", shardId, eventType, + clrType); + return JsonSerializer.Deserialize(payload.GetRawText(), clrType, _jsonSerializerOptions) + as IGatewayEvent; + } + catch (JsonException e) + { + _logger.Error(e, "Shard {ShardId}: Error deserializing event {EventType} to {ClrType}", shardId, + eventType, clrType); + return null; + } + } } \ No newline at end of file diff --git a/crates/gateway/Cargo.toml b/crates/gateway/Cargo.toml index bde62f22..e9fd444e 100644 --- a/crates/gateway/Cargo.toml +++ b/crates/gateway/Cargo.toml @@ -13,6 +13,7 @@ futures = { workspace = true } lazy_static = { workspace = true } libpk = { path = "../libpk" } metrics = { workspace = true } +reqwest = { workspace = true } serde_json = { workspace = true } signal-hook = { workspace = true } tokio = { workspace = true } diff --git a/crates/gateway/src/discord/gateway.rs b/crates/gateway/src/discord/gateway.rs index 6aedca23..c4a13a81 100644 --- a/crates/gateway/src/discord/gateway.rs +++ b/crates/gateway/src/discord/gateway.rs @@ -1,7 +1,8 @@ use futures::StreamExt; -use libpk::_config::ClusterSettings; +use libpk::{_config::ClusterSettings, runtime_config::RuntimeConfig}; use metrics::counter; -use std::sync::{mpsc::Sender, Arc}; +use std::sync::Arc; +use tokio::sync::mpsc::Sender; use tracing::{error, info, warn}; use twilight_gateway::{ create_iterator, ConfigBuilder, Event, EventTypeFlags, Message, Shard, ShardId, @@ -12,7 +13,10 @@ use twilight_model::gateway::{ Intents, }; -use crate::discord::identify_queue::{self, RedisQueue}; +use crate::{ + discord::identify_queue::{self, RedisQueue}, + RUNTIME_CONFIG_KEY_EVENT_TARGET, +}; use super::{cache::DiscordCache, shard_state::ShardStateManager}; @@ -78,13 +82,20 @@ pub fn create_shards(redis: fred::clients::RedisPool) -> anyhow::Result, - _tx: Sender<(ShardId, String)>, + tx: Sender<(ShardId, String)>, shard_state: ShardStateManager, cache: Arc, + runtime_config: Arc, ) { // let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered(); let shard_id = shard.id().number(); + let our_user_id = libpk::config + .discord + .as_ref() + .expect("missing discord config") + .client_id; + info!("waiting for events"); while let Some(item) = shard.next().await { let raw_event = match item { @@ -165,7 +176,28 @@ pub async fn runner( cache.0.update(&event); // okay, we've handled the event internally, let's send it to consumers - // tx.send((shard.id(), raw_event)).unwrap(); + + // some basic filtering here is useful + // we can't use if matching using the | operator, so anything matched does nothing + // and the default match skips the next block (continues to the next event) + match event { + Event::InteractionCreate(_) => {} + Event::MessageCreate(m) if m.author.id != our_user_id => {} + Event::MessageUpdate(m) + if let Some(author) = m.author.clone() + && author.id != our_user_id + && !author.bot => {} + Event::MessageDelete(_) => {} + Event::MessageDeleteBulk(_) => {} + Event::ReactionAdd(r) if r.user_id != our_user_id => {} + _ => { + continue; + } + } + + if runtime_config.exists(RUNTIME_CONFIG_KEY_EVENT_TARGET).await { + tx.send((shard.id(), raw_event)).await.unwrap(); + } } } diff --git a/crates/gateway/src/main.rs b/crates/gateway/src/main.rs index 254ce504..cc360606 100644 --- a/crates/gateway/src/main.rs +++ b/crates/gateway/src/main.rs @@ -5,6 +5,7 @@ use chrono::Timelike; use discord::gateway::cluster_config; use fred::{clients::RedisPool, interfaces::*}; use libpk::runtime_config::RuntimeConfig; +use reqwest::ClientBuilder; use signal_hook::{ consts::{SIGINT, SIGTERM}, iterator::Signals, @@ -15,7 +16,7 @@ use std::{ vec::Vec, }; use tokio::task::JoinSet; -use tracing::{info, warn}; +use tracing::{error, info, warn}; use twilight_gateway::{MessageSender, ShardId}; use twilight_model::gateway::payload::outgoing::UpdatePresence; @@ -23,6 +24,8 @@ mod cache_api; mod discord; mod logger; +const RUNTIME_CONFIG_KEY_EVENT_TARGET: &'static str = "event_target"; + libpk::main!("gateway"); async fn real_main() -> anyhow::Result<()> { let (shutdown_tx, shutdown_rx) = channel::<()>(); @@ -57,9 +60,40 @@ async fn real_main() -> anyhow::Result<()> { event_tx.clone(), shard_state.clone(), cache.clone(), + runtime_config.clone(), ))); } + set.spawn(tokio::spawn({ + let runtime_config = runtime_config.clone(); + async move { + let client = Arc::new(ClientBuilder::new() + .connect_timeout(Duration::from_secs(1)) + .timeout(Duration::from_secs(1)) + .build() + .expect("error making client")); + + while let Some((shard_id, event)) = event_rx.recv().await { + let target = runtime_config.get(RUNTIME_CONFIG_KEY_EVENT_TARGET).await; + if let Some(target) = target { + tokio::spawn({ + let client = client.clone(); + async move { + if let Err(error) = client + .post(format!("{target}/{}", shard_id.number())) + .body(event) + .send() + .await + { + error!(error = ?error, "failed to request event target") + } + } + }); + } + } + } + })); + set.spawn(tokio::spawn( async move { scheduled_task(redis, senders).await }, )); diff --git a/crates/libpk/src/lib.rs b/crates/libpk/src/lib.rs index 1864e332..10d174d0 100644 --- a/crates/libpk/src/lib.rs +++ b/crates/libpk/src/lib.rs @@ -43,6 +43,7 @@ pub fn init_logging(component: &str) { tracing_subscriber::registry() .with(sentry_layer) .with(tracing_subscriber::fmt::layer()) + .with(EnvFilter::from_default_env()) .init(); } } diff --git a/crates/libpk/src/runtime_config.rs b/crates/libpk/src/runtime_config.rs index 739818e4..e1fbea7a 100644 --- a/crates/libpk/src/runtime_config.rs +++ b/crates/libpk/src/runtime_config.rs @@ -33,7 +33,7 @@ impl RuntimeConfig { settings.insert(key, value); } - info!("starting with runtime config: {:?}", self.settings); + info!("starting with runtime config: {:?}", settings); Ok(()) } @@ -58,8 +58,8 @@ impl RuntimeConfig { Ok(()) } - pub async fn get(&self, key: String) -> Option { - self.settings.read().await.get(&key).cloned() + pub async fn get(&self, key: &str) -> Option { + self.settings.read().await.get(key).cloned() } pub async fn exists(&self, key: &str) -> bool {