2024-09-14 12:19:47 +09:00
|
|
|
use bytes::Bytes;
|
2024-11-01 06:42:21 +09:00
|
|
|
use fred::{clients::RedisPool, interfaces::HashesInterface};
|
2024-11-14 13:39:47 +09:00
|
|
|
use metrics::{counter, gauge};
|
2024-09-14 12:19:47 +09:00
|
|
|
use prost::Message;
|
|
|
|
|
use tracing::info;
|
|
|
|
|
use twilight_gateway::Event;
|
|
|
|
|
|
|
|
|
|
use libpk::{proto::*, util::redis::*};
|
|
|
|
|
|
|
|
|
|
#[derive(Clone)]
|
|
|
|
|
pub struct ShardStateManager {
|
|
|
|
|
redis: RedisPool,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub fn new(redis: RedisPool) -> ShardStateManager {
|
|
|
|
|
ShardStateManager { redis }
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl ShardStateManager {
|
|
|
|
|
pub async fn handle_event(&self, shard_id: u32, event: Event) -> anyhow::Result<()> {
|
|
|
|
|
match event {
|
2024-11-10 04:53:10 +09:00
|
|
|
Event::Ready(_) => self.ready_or_resumed(shard_id, false).await,
|
|
|
|
|
Event::Resumed => self.ready_or_resumed(shard_id, true).await,
|
2024-09-14 12:19:47 +09:00
|
|
|
Event::GatewayClose(_) => self.socket_closed(shard_id).await,
|
|
|
|
|
Event::GatewayHeartbeat(_) => self.heartbeated(shard_id).await,
|
|
|
|
|
_ => Ok(()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn get_shard(&self, shard_id: u32) -> anyhow::Result<ShardState> {
|
|
|
|
|
let data: Option<Vec<u8>> = self
|
|
|
|
|
.redis
|
|
|
|
|
.hget("pluralkit:shardstatus", shard_id)
|
|
|
|
|
.await
|
|
|
|
|
.to_option_or_error()?;
|
|
|
|
|
match data {
|
|
|
|
|
Some(buf) => {
|
|
|
|
|
Ok(ShardState::decode(buf.as_slice()).expect("could not decode shard data!"))
|
|
|
|
|
}
|
|
|
|
|
None => Ok(ShardState::default()),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn save_shard(&self, shard_id: u32, info: ShardState) -> anyhow::Result<()> {
|
|
|
|
|
self.redis
|
2024-11-14 12:23:52 +09:00
|
|
|
.hset::<(), &str, (String, Bytes)>(
|
2024-09-14 12:19:47 +09:00
|
|
|
"pluralkit:shardstatus",
|
|
|
|
|
(
|
|
|
|
|
shard_id.to_string(),
|
|
|
|
|
Bytes::copy_from_slice(&info.encode_to_vec()),
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
.await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
2024-11-10 04:53:10 +09:00
|
|
|
async fn ready_or_resumed(&self, shard_id: u32, resumed: bool) -> anyhow::Result<()> {
|
|
|
|
|
info!(
|
|
|
|
|
"shard {} {}",
|
|
|
|
|
shard_id,
|
|
|
|
|
if resumed { "resumed" } else { "ready" }
|
|
|
|
|
);
|
2024-11-14 13:39:47 +09:00
|
|
|
counter!(
|
|
|
|
|
"pluralkit_gateway_shard_reconnect",
|
|
|
|
|
"shard_id" => shard_id.to_string(),
|
|
|
|
|
"resumed" => resumed.to_string(),
|
|
|
|
|
)
|
|
|
|
|
.increment(1);
|
|
|
|
|
gauge!("pluralkit_gateway_shard_up").increment(1);
|
2024-09-14 12:19:47 +09:00
|
|
|
let mut info = self.get_shard(shard_id).await?;
|
|
|
|
|
info.last_connection = chrono::offset::Utc::now().timestamp() as i32;
|
|
|
|
|
info.up = true;
|
|
|
|
|
self.save_shard(shard_id, info).await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn socket_closed(&self, shard_id: u32) -> anyhow::Result<()> {
|
|
|
|
|
info!("shard {} closed", shard_id);
|
2024-11-14 13:39:47 +09:00
|
|
|
gauge!("pluralkit_gateway_shard_up").decrement(1);
|
2024-09-14 12:19:47 +09:00
|
|
|
let mut info = self.get_shard(shard_id).await?;
|
|
|
|
|
info.up = false;
|
|
|
|
|
info.disconnection_count += 1;
|
|
|
|
|
self.save_shard(shard_id, info).await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async fn heartbeated(&self, shard_id: u32) -> anyhow::Result<()> {
|
|
|
|
|
let mut info = self.get_shard(shard_id).await?;
|
|
|
|
|
info.up = true;
|
|
|
|
|
info.last_heartbeat = chrono::offset::Utc::now().timestamp() as i32;
|
|
|
|
|
// todo
|
|
|
|
|
// info.latency = latency.recent().front().map_or_else(|| 0, |d| d.as_millis()) as i32;
|
|
|
|
|
info.latency = 1;
|
|
|
|
|
self.save_shard(shard_id, info).await?;
|
|
|
|
|
Ok(())
|
|
|
|
|
}
|
|
|
|
|
}
|