mirror of
https://github.com/PluralKit/PluralKit.git
synced 2026-02-04 04:56:49 +00:00
fix(gateway): log close code, correctly set latency, don't spam redis error on identify
This commit is contained in:
parent
d1b617f6be
commit
88e136d22a
5 changed files with 91 additions and 48 deletions
|
|
@ -7,7 +7,6 @@ use axum::{
|
|||
};
|
||||
use serde_json::{json, to_string};
|
||||
use tracing::{error, info};
|
||||
use twilight_model::guild::Permissions;
|
||||
use twilight_model::id::Id;
|
||||
|
||||
use crate::discord::{
|
||||
|
|
|
|||
|
|
@ -1,9 +1,10 @@
|
|||
use futures::StreamExt;
|
||||
use libpk::_config::ClusterSettings;
|
||||
use metrics::counter;
|
||||
use std::sync::{mpsc::Sender, Arc};
|
||||
use tracing::{info, warn};
|
||||
use tracing::{info, warn, error};
|
||||
use twilight_gateway::{
|
||||
create_iterator, ConfigBuilder, Event, EventTypeFlags, Shard, ShardId, StreamExt,
|
||||
create_iterator, ConfigBuilder, Event, EventTypeFlags, Shard, ShardId, Message,
|
||||
};
|
||||
use twilight_model::gateway::{
|
||||
payload::outgoing::update_presence::UpdatePresencePayload,
|
||||
|
|
@ -77,48 +78,94 @@ pub fn create_shards(redis: fred::clients::RedisPool) -> anyhow::Result<Vec<Shar
|
|||
|
||||
pub async fn runner(
|
||||
mut shard: Shard<RedisQueue>,
|
||||
tx: Sender<(ShardId, Event)>,
|
||||
_tx: Sender<(ShardId, String)>,
|
||||
shard_state: ShardStateManager,
|
||||
cache: Arc<DiscordCache>,
|
||||
) {
|
||||
//let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered();
|
||||
// let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered();
|
||||
info!("waiting for events");
|
||||
while let Some(item) = shard.next_event(EventTypeFlags::all()).await {
|
||||
match item {
|
||||
Ok(event) => {
|
||||
// event_type * shard_id is too many labels and prometheus fails to query it
|
||||
// so we split it into two metrics
|
||||
counter!(
|
||||
"pluralkit_gateway_events_type",
|
||||
"event_type" => serde_variant::to_variant_name(&event.kind()).unwrap(),
|
||||
)
|
||||
.increment(1);
|
||||
counter!(
|
||||
"pluralkit_gateway_events_shard",
|
||||
"shard_id" => shard.id().number().to_string(),
|
||||
)
|
||||
.increment(1);
|
||||
if let Err(error) = shard_state
|
||||
.handle_event(shard.id().number(), event.clone())
|
||||
.await
|
||||
{
|
||||
tracing::warn!(?error, "error updating redis state")
|
||||
}
|
||||
if let Event::Ready(_) = event {
|
||||
if !cache.2.read().await.contains(&shard.id().number()) {
|
||||
cache.2.write().await.push(shard.id().number());
|
||||
while let Some(item) = shard.next().await {
|
||||
let raw_event = match item {
|
||||
Ok(evt) => match evt {
|
||||
Message::Close(frame) => {
|
||||
info!(
|
||||
"shard {} closed: {}",
|
||||
shard.id().number(),
|
||||
if let Some(close) = frame {
|
||||
format!("{} ({})", close.code, close.reason)
|
||||
} else {
|
||||
"unknown".to_string()
|
||||
}
|
||||
);
|
||||
if let Err(error) = shard_state.socket_closed(shard.id().number()).await {
|
||||
error!("failed to update shard state for socket closure: {error}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
cache.0.update(&event);
|
||||
//if let Err(error) = tx.send((shard.id(), event)) {
|
||||
// tracing::warn!(?error, "error sending event to global handler: {error}",);
|
||||
//}
|
||||
}
|
||||
Message::Text(text) => text,
|
||||
},
|
||||
Err(error) => {
|
||||
tracing::warn!(?error, "error receiving event from shard {}", shard.id());
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let event = match twilight_gateway::parse(raw_event.clone(), EventTypeFlags::all()) {
|
||||
Ok(Some(parsed)) => Event::from(parsed),
|
||||
Ok(None) => {
|
||||
// we received an event type unknown to twilight
|
||||
// that's fine, we probably don't need it anyway
|
||||
continue;
|
||||
}
|
||||
Err(error) => {
|
||||
error!(
|
||||
"shard {} failed to parse gateway event: {}",
|
||||
shard.id().number(),
|
||||
error
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// log the event in metrics
|
||||
// event_type * shard_id is too many labels and prometheus fails to query it
|
||||
// so we split it into two metrics
|
||||
counter!(
|
||||
"pluralkit_gateway_events_type",
|
||||
"event_type" => serde_variant::to_variant_name(&event.kind()).unwrap(),
|
||||
)
|
||||
.increment(1);
|
||||
counter!(
|
||||
"pluralkit_gateway_events_shard",
|
||||
"shard_id" => shard.id().number().to_string(),
|
||||
)
|
||||
.increment(1);
|
||||
|
||||
// update shard state and discord cache
|
||||
if let Err(error) = shard_state
|
||||
.handle_event(shard.id().number(), event.clone())
|
||||
.await
|
||||
{
|
||||
tracing::warn!(?error, "error updating redis state");
|
||||
}
|
||||
// need to do heartbeat separately, to get the latency
|
||||
if let Event::GatewayHeartbeatAck = event
|
||||
&& let Err(error) = shard_state
|
||||
.heartbeated(shard.id().number(), shard.latency())
|
||||
.await
|
||||
{
|
||||
tracing::warn!(?error, "error updating redis state for latency");
|
||||
}
|
||||
|
||||
if let Event::Ready(_) = event {
|
||||
if !cache.2.read().await.contains(&shard.id().number()) {
|
||||
cache.2.write().await.push(shard.id().number());
|
||||
}
|
||||
}
|
||||
cache.0.update(&event);
|
||||
|
||||
// okay, we've handled the event internally, let's send it to consumers
|
||||
// tx.send((shard.id(), raw_event)).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -10,8 +10,6 @@ use tokio::sync::oneshot;
|
|||
use tracing::{error, info};
|
||||
use twilight_gateway::queue::Queue;
|
||||
|
||||
use libpk::util::redis::RedisErrorExt;
|
||||
|
||||
pub fn new(redis: RedisPool) -> RedisQueue {
|
||||
RedisQueue {
|
||||
redis,
|
||||
|
|
@ -69,8 +67,7 @@ async fn request_inner(redis: RedisPool, concurrency: u32, shard_id: u32, tx: on
|
|||
Some(SetOptions::NX),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.to_option_or_error();
|
||||
.await;
|
||||
match done {
|
||||
Ok(Some(_)) => {
|
||||
info!(shard_id, bucket, "got allowance!");
|
||||
|
|
|
|||
|
|
@ -1,8 +1,7 @@
|
|||
use bytes::Bytes;
|
||||
use fred::{clients::RedisPool, interfaces::HashesInterface};
|
||||
use metrics::{counter, gauge};
|
||||
use tracing::info;
|
||||
use twilight_gateway::Event;
|
||||
use twilight_gateway::{Event, Latency};
|
||||
|
||||
use libpk::{state::*, util::redis::*};
|
||||
|
||||
|
|
@ -20,8 +19,6 @@ impl ShardStateManager {
|
|||
match event {
|
||||
Event::Ready(_) => self.ready_or_resumed(shard_id, false).await,
|
||||
Event::Resumed => self.ready_or_resumed(shard_id, true).await,
|
||||
Event::GatewayClose(_) => self.socket_closed(shard_id).await,
|
||||
Event::GatewayHeartbeat(_) => self.heartbeated(shard_id).await,
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
|
@ -71,8 +68,7 @@ impl ShardStateManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn socket_closed(&self, shard_id: u32) -> anyhow::Result<()> {
|
||||
info!("shard {} closed", shard_id);
|
||||
pub async fn socket_closed(&self, shard_id: u32) -> anyhow::Result<()> {
|
||||
gauge!("pluralkit_gateway_shard_up").decrement(1);
|
||||
let mut info = self.get_shard(shard_id).await?;
|
||||
info.up = false;
|
||||
|
|
@ -81,13 +77,14 @@ impl ShardStateManager {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
async fn heartbeated(&self, shard_id: u32) -> anyhow::Result<()> {
|
||||
pub async fn heartbeated(&self, shard_id: u32, latency: &Latency) -> anyhow::Result<()> {
|
||||
let mut info = self.get_shard(shard_id).await?;
|
||||
info.up = true;
|
||||
info.last_heartbeat = chrono::offset::Utc::now().timestamp() as i32;
|
||||
// todo
|
||||
// info.latency = latency.recent().front().map_or_else(|| 0, |d| d.as_millis()) as i32;
|
||||
info.latency = 1;
|
||||
info.latency = latency
|
||||
.recent()
|
||||
.first()
|
||||
.map_or_else(|| 0, |d| d.as_millis()) as i32;
|
||||
self.save_shard(shard_id, info).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
#![feature(let_chains)]
|
||||
#![feature(if_let_guard)]
|
||||
|
||||
use chrono::Timelike;
|
||||
use fred::{clients::RedisPool, interfaces::*};
|
||||
use signal_hook::{
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue