feat(gateway): add metric for shard close code

This commit is contained in:
alyssa 2025-03-08 12:04:13 +00:00
parent ec49ead783
commit 3cf71112d6

View file

@ -83,29 +83,38 @@ pub async fn runner(
cache: Arc<DiscordCache>, cache: Arc<DiscordCache>,
) { ) {
// let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered(); // let _span = info_span!("shard_runner", shard_id = shard.id().number()).entered();
let shard_id = shard.id().number();
info!("waiting for events"); info!("waiting for events");
while let Some(item) = shard.next().await { while let Some(item) = shard.next().await {
let raw_event = match item { let raw_event = match item {
Ok(evt) => match evt { Ok(evt) => match evt {
Message::Close(frame) => { Message::Close(frame) => {
info!( let close_code = if let Some(close) = frame {
"shard {} closed: {}", close.code.to_string()
shard.id().number(), } else {
if let Some(close) = frame { "unknown".to_string()
format!("{} ({})", close.code, close.reason) };
} else {
"unknown".to_string() info!("shard {shard_id} closed: {close_code}");
}
); counter!(
if let Err(error) = shard_state.socket_closed(shard.id().number()).await { "pluralkit_gateway_shard_closed",
"shard_id" => shard_id.to_string(),
"close_code" => close_code,
)
.increment(1);
if let Err(error) = shard_state.socket_closed(shard_id).await {
error!("failed to update shard state for socket closure: {error}"); error!("failed to update shard state for socket closure: {error}");
} }
continue; continue;
} }
Message::Text(text) => text, Message::Text(text) => text,
}, },
Err(error) => { Err(error) => {
tracing::warn!(?error, "error receiving event from shard {}", shard.id()); tracing::warn!(?error, "error receiving event from shard {shard_id}");
continue; continue;
} }
}; };
@ -118,11 +127,7 @@ pub async fn runner(
continue; continue;
} }
Err(error) => { Err(error) => {
error!( error!("shard {shard_id} failed to parse gateway event: {error}");
"shard {} failed to parse gateway event: {}",
shard.id().number(),
error
);
continue; continue;
} }
}; };
@ -137,29 +142,24 @@ pub async fn runner(
.increment(1); .increment(1);
counter!( counter!(
"pluralkit_gateway_events_shard", "pluralkit_gateway_events_shard",
"shard_id" => shard.id().number().to_string(), "shard_id" => shard_id.to_string(),
) )
.increment(1); .increment(1);
// update shard state and discord cache // update shard state and discord cache
if let Err(error) = shard_state if let Err(error) = shard_state.handle_event(shard_id, event.clone()).await {
.handle_event(shard.id().number(), event.clone()) tracing::error!(?error, "error updating redis state");
.await
{
tracing::warn!(?error, "error updating redis state");
} }
// need to do heartbeat separately, to get the latency // need to do heartbeat separately, to get the latency
if let Event::GatewayHeartbeatAck = event if let Event::GatewayHeartbeatAck = event
&& let Err(error) = shard_state && let Err(error) = shard_state.heartbeated(shard_id, shard.latency()).await
.heartbeated(shard.id().number(), shard.latency())
.await
{ {
tracing::warn!(?error, "error updating redis state for latency"); tracing::error!(?error, "error updating redis state for latency");
} }
if let Event::Ready(_) = event { if let Event::Ready(_) = event {
if !cache.2.read().await.contains(&shard.id().number()) { if !cache.2.read().await.contains(&shard_id) {
cache.2.write().await.push(shard.id().number()); cache.2.write().await.push(shard_id);
} }
} }
cache.0.update(&event); cache.0.update(&event);