diff --git a/services/gateway/src/cache_api.rs b/services/gateway/src/cache_api.rs index df189a31..c73424f9 100644 --- a/services/gateway/src/cache_api.rs +++ b/services/gateway/src/cache_api.rs @@ -7,7 +7,6 @@ use axum::{ }; use serde_json::{json, to_string}; use tracing::{error, info}; -use twilight_model::guild::Permissions; use twilight_model::id::Id; use crate::discord::{ diff --git a/services/gateway/src/discord/gateway.rs b/services/gateway/src/discord/gateway.rs index a50fda8e..c46d3ff6 100644 --- a/services/gateway/src/discord/gateway.rs +++ b/services/gateway/src/discord/gateway.rs @@ -1,9 +1,10 @@ +use futures::StreamExt; use libpk::_config::ClusterSettings; use metrics::counter; use std::sync::{mpsc::Sender, Arc}; -use tracing::{info, warn}; +use tracing::{info, warn, error}; use twilight_gateway::{ - create_iterator, ConfigBuilder, Event, EventTypeFlags, Shard, ShardId, StreamExt, + create_iterator, ConfigBuilder, Event, EventTypeFlags, Shard, ShardId, Message, }; use twilight_model::gateway::{ payload::outgoing::update_presence::UpdatePresencePayload, @@ -77,48 +78,94 @@ pub fn create_shards(redis: fred::clients::RedisPool) -> anyhow::Result, - tx: Sender<(ShardId, Event)>, + _tx: Sender<(ShardId, String)>, shard_state: ShardStateManager, cache: Arc, ) { - //let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered(); + // let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered(); info!("waiting for events"); - while let Some(item) = shard.next_event(EventTypeFlags::all()).await { - match item { - Ok(event) => { - // event_type * shard_id is too many labels and prometheus fails to query it - // so we split it into two metrics - counter!( - "pluralkit_gateway_events_type", - "event_type" => serde_variant::to_variant_name(&event.kind()).unwrap(), - ) - .increment(1); - counter!( - "pluralkit_gateway_events_shard", - "shard_id" => shard.id().number().to_string(), - ) - .increment(1); - if let Err(error) = shard_state - .handle_event(shard.id().number(), event.clone()) - .await - { - tracing::warn!(?error, "error updating redis state") - } - if let Event::Ready(_) = event { - if !cache.2.read().await.contains(&shard.id().number()) { - cache.2.write().await.push(shard.id().number()); + while let Some(item) = shard.next().await { + let raw_event = match item { + Ok(evt) => match evt { + Message::Close(frame) => { + info!( + "shard {} closed: {}", + shard.id().number(), + if let Some(close) = frame { + format!("{} ({})", close.code, close.reason) + } else { + "unknown".to_string() + } + ); + if let Err(error) = shard_state.socket_closed(shard.id().number()).await { + error!("failed to update shard state for socket closure: {error}"); } + continue; } - cache.0.update(&event); - //if let Err(error) = tx.send((shard.id(), event)) { - // tracing::warn!(?error, "error sending event to global handler: {error}",); - //} - } + Message::Text(text) => text, + }, Err(error) => { tracing::warn!(?error, "error receiving event from shard {}", shard.id()); continue; } + }; + + let event = match twilight_gateway::parse(raw_event.clone(), EventTypeFlags::all()) { + Ok(Some(parsed)) => Event::from(parsed), + Ok(None) => { + // we received an event type unknown to twilight + // that's fine, we probably don't need it anyway + continue; + } + Err(error) => { + error!( + "shard {} failed to parse gateway event: {}", + shard.id().number(), + error + ); + continue; + } + }; + + // log the event in metrics + // event_type * shard_id is too many labels and prometheus fails to query it + // so we split it into two metrics + counter!( + "pluralkit_gateway_events_type", + "event_type" => serde_variant::to_variant_name(&event.kind()).unwrap(), + ) + .increment(1); + counter!( + "pluralkit_gateway_events_shard", + "shard_id" => shard.id().number().to_string(), + ) + .increment(1); + + // update shard state and discord cache + if let Err(error) = shard_state + .handle_event(shard.id().number(), event.clone()) + .await + { + tracing::warn!(?error, "error updating redis state"); } + // need to do heartbeat separately, to get the latency + if let Event::GatewayHeartbeatAck = event + && let Err(error) = shard_state + .heartbeated(shard.id().number(), shard.latency()) + .await + { + tracing::warn!(?error, "error updating redis state for latency"); + } + + if let Event::Ready(_) = event { + if !cache.2.read().await.contains(&shard.id().number()) { + cache.2.write().await.push(shard.id().number()); + } + } + cache.0.update(&event); + + // okay, we've handled the event internally, let's send it to consumers + // tx.send((shard.id(), raw_event)).unwrap(); } } diff --git a/services/gateway/src/discord/identify_queue.rs b/services/gateway/src/discord/identify_queue.rs index cdfe893b..2d523dfa 100644 --- a/services/gateway/src/discord/identify_queue.rs +++ b/services/gateway/src/discord/identify_queue.rs @@ -10,8 +10,6 @@ use tokio::sync::oneshot; use tracing::{error, info}; use twilight_gateway::queue::Queue; -use libpk::util::redis::RedisErrorExt; - pub fn new(redis: RedisPool) -> RedisQueue { RedisQueue { redis, @@ -69,8 +67,7 @@ async fn request_inner(redis: RedisPool, concurrency: u32, shard_id: u32, tx: on Some(SetOptions::NX), false, ) - .await - .to_option_or_error(); + .await; match done { Ok(Some(_)) => { info!(shard_id, bucket, "got allowance!"); diff --git a/services/gateway/src/discord/shard_state.rs b/services/gateway/src/discord/shard_state.rs index 1e7f94fe..f80b2dd4 100644 --- a/services/gateway/src/discord/shard_state.rs +++ b/services/gateway/src/discord/shard_state.rs @@ -1,8 +1,7 @@ -use bytes::Bytes; use fred::{clients::RedisPool, interfaces::HashesInterface}; use metrics::{counter, gauge}; use tracing::info; -use twilight_gateway::Event; +use twilight_gateway::{Event, Latency}; use libpk::{state::*, util::redis::*}; @@ -20,8 +19,6 @@ impl ShardStateManager { match event { Event::Ready(_) => self.ready_or_resumed(shard_id, false).await, Event::Resumed => self.ready_or_resumed(shard_id, true).await, - Event::GatewayClose(_) => self.socket_closed(shard_id).await, - Event::GatewayHeartbeat(_) => self.heartbeated(shard_id).await, _ => Ok(()), } } @@ -71,8 +68,7 @@ impl ShardStateManager { Ok(()) } - async fn socket_closed(&self, shard_id: u32) -> anyhow::Result<()> { - info!("shard {} closed", shard_id); + pub async fn socket_closed(&self, shard_id: u32) -> anyhow::Result<()> { gauge!("pluralkit_gateway_shard_up").decrement(1); let mut info = self.get_shard(shard_id).await?; info.up = false; @@ -81,13 +77,14 @@ impl ShardStateManager { Ok(()) } - async fn heartbeated(&self, shard_id: u32) -> anyhow::Result<()> { + pub async fn heartbeated(&self, shard_id: u32, latency: &Latency) -> anyhow::Result<()> { let mut info = self.get_shard(shard_id).await?; info.up = true; info.last_heartbeat = chrono::offset::Utc::now().timestamp() as i32; - // todo - // info.latency = latency.recent().front().map_or_else(|| 0, |d| d.as_millis()) as i32; - info.latency = 1; + info.latency = latency + .recent() + .first() + .map_or_else(|| 0, |d| d.as_millis()) as i32; self.save_shard(shard_id, info).await?; Ok(()) } diff --git a/services/gateway/src/main.rs b/services/gateway/src/main.rs index ae9e840e..6bc33e13 100644 --- a/services/gateway/src/main.rs +++ b/services/gateway/src/main.rs @@ -1,3 +1,6 @@ +#![feature(let_chains)] +#![feature(if_let_guard)] + use chrono::Timelike; use fred::{clients::RedisPool, interfaces::*}; use signal_hook::{