feat: send events from gateway to bot over http

This commit is contained in:
alyssa 2025-04-04 11:06:16 +00:00
parent a72afb35a0
commit 2578eb0e3c
8 changed files with 134 additions and 15 deletions

1
Cargo.lock generated
View file

@ -1088,6 +1088,7 @@ dependencies = [
"lazy_static",
"libpk",
"metrics",
"reqwest 0.12.8",
"serde_json",
"serde_variant",
"signal-hook",

View file

@ -32,13 +32,14 @@ public class Bot
private readonly DiscordApiClient _rest;
private readonly RedisService _redis;
private readonly ILifetimeScope _services;
private readonly RuntimeConfigService _runtimeConfig;
private Timer _periodicTask; // Never read, just kept here for GC reasons
public Bot(ILifetimeScope services, ILogger logger, PeriodicStatCollector collector, IMetrics metrics,
BotConfig config, RedisService redis,
ErrorMessageService errorMessageService, CommandMessageService commandMessageService,
Cluster cluster, DiscordApiClient rest, IDiscordCache cache)
Cluster cluster, DiscordApiClient rest, IDiscordCache cache, RuntimeConfigService runtimeConfig)
{
_logger = logger.ForContext<Bot>();
_services = services;
@ -51,6 +52,7 @@ public class Bot
_rest = rest;
_redis = redis;
_cache = cache;
_runtimeConfig = runtimeConfig;
}
private string BotStatus => $"{(_config.Prefixes ?? BotConfig.DefaultPrefixes)[0]}help"
@ -97,13 +99,15 @@ public class Bot
private async Task OnEventReceived(int shardId, IGatewayEvent evt)
{
if (_runtimeConfig.Exists("disable_events")) return;
// we HandleGatewayEvent **before** getting the own user, because the own user is set in HandleGatewayEvent for ReadyEvent
await _cache.HandleGatewayEvent(evt);
await _cache.TryUpdateSelfMember(_config.ClientId, evt);
await OnEventReceivedInner(shardId, evt);
}
private async Task OnEventReceivedInner(int shardId, IGatewayEvent evt)
public async Task OnEventReceivedInner(int shardId, IGatewayEvent evt)
{
// HandleEvent takes a type parameter, automatically inferred by the event type
// It will then look up an IEventHandler<TypeOfEvent> in the DI container and call that object's handler method

View file

@ -1,21 +1,26 @@
using Serilog;
using System.Text.Json;
using Newtonsoft.Json;
using Serilog;
using WatsonWebserver.Lite;
using WatsonWebserver.Core;
using Myriad.Gateway;
using Myriad.Serialization;
namespace PluralKit.Bot;
public class HttpListenerService
{
private readonly ILogger _logger;
private readonly RuntimeConfigService _runtimeConfig;
private readonly Bot _bot;
public HttpListenerService(ILogger logger, RuntimeConfigService runtimeConfig)
public HttpListenerService(ILogger logger, RuntimeConfigService runtimeConfig, Bot bot)
{
_logger = logger.ForContext<HttpListenerService>();
_runtimeConfig = runtimeConfig;
_bot = bot;
}
public void Start(string host)
@ -26,6 +31,8 @@ public class HttpListenerService
server.Routes.PreAuthentication.Parameter.Add(WatsonWebserver.Core.HttpMethod.POST, "/runtime_config/{key}", RuntimeConfigSet);
server.Routes.PreAuthentication.Parameter.Add(WatsonWebserver.Core.HttpMethod.DELETE, "/runtime_config/{key}", RuntimeConfigDelete);
server.Routes.PreAuthentication.Parameter.Add(WatsonWebserver.Core.HttpMethod.POST, "/events/{shard_id}", GatewayEvent);
server.Start();
}
@ -36,7 +43,7 @@ public class HttpListenerService
{
var config = _runtimeConfig.GetAll();
ctx.Response.Headers.Add("content-type", "application/json");
await ctx.Response.Send(JsonConvert.SerializeObject(config));
await ctx.Response.Send(JsonSerializer.Serialize(config));
}
private async Task RuntimeConfigSet(HttpContextBase ctx)
@ -53,4 +60,43 @@ public class HttpListenerService
await _runtimeConfig.Delete(key);
await RuntimeConfigGet(ctx);
}
private JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions().ConfigureForMyriad();
private async Task GatewayEvent(HttpContextBase ctx)
{
var shardIdString = ctx.Request.Url.Parameters["shard_id"];
if (!int.TryParse(shardIdString, out var shardId)) return;
var packet = JsonSerializer.Deserialize<GatewayPacket>(ctx.Request.DataAsString, _jsonSerializerOptions);
var evt = DeserializeEvent(shardId, packet.EventType!, (JsonElement)packet.Payload!);
if (evt != null)
{
await _bot.OnEventReceivedInner(shardId, evt);
}
await ctx.Response.Send("a");
}
private IGatewayEvent? DeserializeEvent(int shardId, string eventType, JsonElement payload)
{
if (!IGatewayEvent.EventTypes.TryGetValue(eventType, out var clrType))
{
_logger.Debug("Shard {ShardId}: Received unknown event type {EventType}", shardId, eventType);
return null;
}
try
{
_logger.Verbose("Shard {ShardId}: Deserializing {EventType} to {ClrType}", shardId, eventType,
clrType);
return JsonSerializer.Deserialize(payload.GetRawText(), clrType, _jsonSerializerOptions)
as IGatewayEvent;
}
catch (JsonException e)
{
_logger.Error(e, "Shard {ShardId}: Error deserializing event {EventType} to {ClrType}", shardId,
eventType, clrType);
return null;
}
}
}

View file

@ -13,6 +13,7 @@ futures = { workspace = true }
lazy_static = { workspace = true }
libpk = { path = "../libpk" }
metrics = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
signal-hook = { workspace = true }
tokio = { workspace = true }

View file

@ -1,7 +1,8 @@
use futures::StreamExt;
use libpk::_config::ClusterSettings;
use libpk::{_config::ClusterSettings, runtime_config::RuntimeConfig};
use metrics::counter;
use std::sync::{mpsc::Sender, Arc};
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use tracing::{error, info, warn};
use twilight_gateway::{
create_iterator, ConfigBuilder, Event, EventTypeFlags, Message, Shard, ShardId,
@ -12,7 +13,10 @@ use twilight_model::gateway::{
Intents,
};
use crate::discord::identify_queue::{self, RedisQueue};
use crate::{
discord::identify_queue::{self, RedisQueue},
RUNTIME_CONFIG_KEY_EVENT_TARGET,
};
use super::{cache::DiscordCache, shard_state::ShardStateManager};
@ -78,13 +82,20 @@ pub fn create_shards(redis: fred::clients::RedisPool) -> anyhow::Result<Vec<Shar
pub async fn runner(
mut shard: Shard<RedisQueue>,
_tx: Sender<(ShardId, String)>,
tx: Sender<(ShardId, String)>,
shard_state: ShardStateManager,
cache: Arc<DiscordCache>,
runtime_config: Arc<RuntimeConfig>,
) {
// let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered();
let shard_id = shard.id().number();
let our_user_id = libpk::config
.discord
.as_ref()
.expect("missing discord config")
.client_id;
info!("waiting for events");
while let Some(item) = shard.next().await {
let raw_event = match item {
@ -165,7 +176,28 @@ pub async fn runner(
cache.0.update(&event);
// okay, we've handled the event internally, let's send it to consumers
// tx.send((shard.id(), raw_event)).unwrap();
// some basic filtering here is useful
// we can't use if matching using the | operator, so anything matched does nothing
// and the default match skips the next block (continues to the next event)
match event {
Event::InteractionCreate(_) => {}
Event::MessageCreate(m) if m.author.id != our_user_id => {}
Event::MessageUpdate(m)
if let Some(author) = m.author.clone()
&& author.id != our_user_id
&& !author.bot => {}
Event::MessageDelete(_) => {}
Event::MessageDeleteBulk(_) => {}
Event::ReactionAdd(r) if r.user_id != our_user_id => {}
_ => {
continue;
}
}
if runtime_config.exists(RUNTIME_CONFIG_KEY_EVENT_TARGET).await {
tx.send((shard.id(), raw_event)).await.unwrap();
}
}
}

View file

@ -5,6 +5,7 @@ use chrono::Timelike;
use discord::gateway::cluster_config;
use fred::{clients::RedisPool, interfaces::*};
use libpk::runtime_config::RuntimeConfig;
use reqwest::ClientBuilder;
use signal_hook::{
consts::{SIGINT, SIGTERM},
iterator::Signals,
@ -15,7 +16,7 @@ use std::{
vec::Vec,
};
use tokio::task::JoinSet;
use tracing::{info, warn};
use tracing::{error, info, warn};
use twilight_gateway::{MessageSender, ShardId};
use twilight_model::gateway::payload::outgoing::UpdatePresence;
@ -23,6 +24,8 @@ mod cache_api;
mod discord;
mod logger;
const RUNTIME_CONFIG_KEY_EVENT_TARGET: &'static str = "event_target";
libpk::main!("gateway");
async fn real_main() -> anyhow::Result<()> {
let (shutdown_tx, shutdown_rx) = channel::<()>();
@ -57,9 +60,40 @@ async fn real_main() -> anyhow::Result<()> {
event_tx.clone(),
shard_state.clone(),
cache.clone(),
runtime_config.clone(),
)));
}
set.spawn(tokio::spawn({
let runtime_config = runtime_config.clone();
async move {
let client = Arc::new(ClientBuilder::new()
.connect_timeout(Duration::from_secs(1))
.timeout(Duration::from_secs(1))
.build()
.expect("error making client"));
while let Some((shard_id, event)) = event_rx.recv().await {
let target = runtime_config.get(RUNTIME_CONFIG_KEY_EVENT_TARGET).await;
if let Some(target) = target {
tokio::spawn({
let client = client.clone();
async move {
if let Err(error) = client
.post(format!("{target}/{}", shard_id.number()))
.body(event)
.send()
.await
{
error!(error = ?error, "failed to request event target")
}
}
});
}
}
}
}));
set.spawn(tokio::spawn(
async move { scheduled_task(redis, senders).await },
));

View file

@ -43,6 +43,7 @@ pub fn init_logging(component: &str) {
tracing_subscriber::registry()
.with(sentry_layer)
.with(tracing_subscriber::fmt::layer())
.with(EnvFilter::from_default_env())
.init();
}
}

View file

@ -33,7 +33,7 @@ impl RuntimeConfig {
settings.insert(key, value);
}
info!("starting with runtime config: {:?}", self.settings);
info!("starting with runtime config: {:?}", settings);
Ok(())
}
@ -58,8 +58,8 @@ impl RuntimeConfig {
Ok(())
}
pub async fn get(&self, key: String) -> Option<String> {
self.settings.read().await.get(&key).cloned()
pub async fn get(&self, key: &str) -> Option<String> {
self.settings.read().await.get(key).cloned()
}
pub async fn exists(&self, key: &str) -> bool {