mirror of
https://github.com/PluralKit/PluralKit.git
synced 2026-02-16 02:30:11 +00:00
feat: rewrite scheduled_tasks in rust
This commit is contained in:
parent
4bfee8a090
commit
0862964305
20 changed files with 419 additions and 715 deletions
136
services/scheduled_tasks/src/main.rs
Normal file
136
services/scheduled_tasks/src/main.rs
Normal file
|
|
@ -0,0 +1,136 @@
|
|||
use chrono::{TimeDelta, Timelike};
|
||||
use fred::prelude::RedisPool;
|
||||
use sqlx::PgPool;
|
||||
use std::time::Duration;
|
||||
use tokio::task::JoinSet;
|
||||
use tracing::{error, info};
|
||||
|
||||
mod tasks;
|
||||
use tasks::*;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppCtx {
|
||||
pub data: PgPool,
|
||||
pub messages: PgPool,
|
||||
pub stats: PgPool,
|
||||
pub redis: RedisPool,
|
||||
}
|
||||
|
||||
libpk::main!("scheduled_tasks");
|
||||
async fn real_main() -> anyhow::Result<()> {
|
||||
let ctx = AppCtx {
|
||||
data: libpk::db::init_data_db().await?,
|
||||
messages: libpk::db::init_messages_db().await?,
|
||||
stats: libpk::db::init_stats_db().await?,
|
||||
redis: libpk::db::init_redis().await?,
|
||||
};
|
||||
|
||||
info!("starting scheduled tasks runner");
|
||||
|
||||
let mut set = JoinSet::new();
|
||||
|
||||
// i couldn't be bothered to figure out the types of passing in an async
|
||||
// function to another function... so macro it is
|
||||
macro_rules! doforever {
|
||||
($timeout:expr, $desc:expr, $fn:ident) => {
|
||||
let ctx = ctx.clone();
|
||||
set.spawn(tokio::spawn(async move {
|
||||
loop {
|
||||
let ctx = ctx.clone();
|
||||
wait_interval($timeout).await;
|
||||
info!("running {}", $desc);
|
||||
let before = std::time::Instant::now();
|
||||
if let Err(error) = $fn(ctx).await {
|
||||
error!("failed to run {}: {}", $desc, error);
|
||||
// sentry
|
||||
}
|
||||
let duration = before.elapsed();
|
||||
info!("ran {} in {duration:?}", $desc);
|
||||
// add prometheus log
|
||||
}
|
||||
}))
|
||||
};
|
||||
}
|
||||
|
||||
doforever!(10, "prometheus updater", update_prometheus);
|
||||
doforever!(60, "database stats updater", update_db_meta);
|
||||
doforever!(600, "message stats updater", update_db_message_meta);
|
||||
doforever!(60, "discord stats updater", update_discord_stats);
|
||||
doforever!(
|
||||
1800,
|
||||
"queue deleted image cleanup job",
|
||||
queue_deleted_image_cleanup
|
||||
);
|
||||
|
||||
set.join_next()
|
||||
.await
|
||||
.ok_or(anyhow::anyhow!("could not join_next"))???;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// cron except not really
|
||||
async fn wait_interval(interval_secs: u32) {
|
||||
let now = chrono::Utc::now();
|
||||
|
||||
// for now, only supports hardcoded intervals
|
||||
let next_iter_time = match interval_secs {
|
||||
// 10 sec
|
||||
// at every [x]0th second (00, 10, 20, 30, 40, 50)
|
||||
10 => {
|
||||
let mut minute = 0;
|
||||
let mut second = (now.second() - (now.second() % 10)) + 10;
|
||||
if second == 60 {
|
||||
minute += 1;
|
||||
second = 0;
|
||||
}
|
||||
now.checked_add_signed(TimeDelta::minutes(minute))
|
||||
.expect("invalid time")
|
||||
.with_second(second)
|
||||
.expect("invalid time")
|
||||
}
|
||||
// 1 minute
|
||||
// every minute at :00 seconds
|
||||
60 => now
|
||||
.checked_add_signed(TimeDelta::minutes(1))
|
||||
.expect("invalid time")
|
||||
.with_second(0)
|
||||
.expect("invalid time"),
|
||||
// 10 minutes
|
||||
// at every [x]0 minute (00, 10, 20, 30, 40, 50)
|
||||
600 => {
|
||||
let mut minute = (now.minute() + 10) % 10;
|
||||
let mut hour = 0;
|
||||
if minute == 60 {
|
||||
minute = 0;
|
||||
hour = 1;
|
||||
}
|
||||
|
||||
now.checked_add_signed(TimeDelta::hours(hour))
|
||||
.expect("invalid time")
|
||||
.with_minute(minute)
|
||||
.expect("invalid time")
|
||||
}
|
||||
// 30 minutes
|
||||
// at :00 and :30
|
||||
1800 => {
|
||||
let mut minute = (now.minute() + 30) % 30;
|
||||
let mut hour = 0;
|
||||
if minute == 60 {
|
||||
minute = 0;
|
||||
hour = 1;
|
||||
}
|
||||
|
||||
now.checked_add_signed(TimeDelta::hours(hour))
|
||||
.expect("invalid time")
|
||||
.with_minute(minute)
|
||||
.expect("invalid time")
|
||||
}
|
||||
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let dur = next_iter_time - now;
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(dur.num_seconds() as u64)).await;
|
||||
}
|
||||
151
services/scheduled_tasks/src/tasks.rs
Normal file
151
services/scheduled_tasks/src/tasks.rs
Normal file
|
|
@ -0,0 +1,151 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use fred::prelude::KeysInterface;
|
||||
use libpk::{
|
||||
config,
|
||||
db::repository::{get_stats, insert_stats},
|
||||
};
|
||||
use metrics::gauge;
|
||||
use num_format::{Locale, ToFormattedString};
|
||||
use reqwest::ClientBuilder;
|
||||
use sqlx::Executor;
|
||||
|
||||
use crate::AppCtx;
|
||||
|
||||
pub async fn update_prometheus(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct Count {
|
||||
count: i64,
|
||||
}
|
||||
let count: Count = sqlx::query_as("select count(*) from image_cleanup_jobs")
|
||||
.fetch_one(&ctx.data)
|
||||
.await?;
|
||||
|
||||
gauge!("pluralkit_image_cleanup_queue_length").set(count.count as f64);
|
||||
|
||||
// todo: remaining shard session_start_limit
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_db_meta(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
ctx.data
|
||||
.execute(
|
||||
r#"
|
||||
update info set
|
||||
system_count = (select count(*) from systems),
|
||||
member_count = (select count(*) from systems),
|
||||
group_count = (select count(*) from systems),
|
||||
switch_count = (select count(*) from systems)
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
let new_stats = get_stats(&ctx.data).await?;
|
||||
insert_stats(&ctx.stats, "systems", new_stats.system_count).await?;
|
||||
insert_stats(&ctx.stats, "members", new_stats.member_count).await?;
|
||||
insert_stats(&ctx.stats, "groups", new_stats.group_count).await?;
|
||||
insert_stats(&ctx.stats, "switches", new_stats.switch_count).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_db_message_meta(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct MessageCount {
|
||||
count: i64,
|
||||
}
|
||||
let message_count: MessageCount = sqlx::query_as("select count(*) from messages")
|
||||
.fetch_one(&ctx.messages)
|
||||
.await?;
|
||||
sqlx::query("update info set message_count = $1")
|
||||
.bind(message_count.count)
|
||||
.execute(&ctx.data)
|
||||
.await?;
|
||||
insert_stats(&ctx.stats, "messages", message_count.count).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_discord_stats(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
let client = ClientBuilder::new()
|
||||
.connect_timeout(Duration::from_secs(3))
|
||||
.timeout(Duration::from_secs(3))
|
||||
.build()
|
||||
.expect("error making client");
|
||||
|
||||
let cfg = config
|
||||
.scheduled_tasks
|
||||
.as_ref()
|
||||
.expect("missing scheduled_tasks config");
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
struct GatewayStatus {
|
||||
up: bool,
|
||||
guild_count: i64,
|
||||
channel_count: i64,
|
||||
}
|
||||
|
||||
let mut guild_count = 0;
|
||||
let mut channel_count = 0;
|
||||
|
||||
for idx in 0..=cfg.expected_gateway_count {
|
||||
let res = client
|
||||
.get(format!("http://cluster{idx}.{}/stats", cfg.gateway_url))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
let stat: GatewayStatus = res.json().await?;
|
||||
|
||||
if !stat.up {
|
||||
return Err(anyhow!("cluster {idx} is not up"));
|
||||
}
|
||||
|
||||
guild_count += stat.guild_count;
|
||||
channel_count += stat.channel_count;
|
||||
}
|
||||
|
||||
insert_stats(&ctx.stats, "guilds", guild_count).await?;
|
||||
insert_stats(&ctx.stats, "channels", channel_count).await?;
|
||||
|
||||
if cfg.set_guild_count {
|
||||
ctx.redis
|
||||
.set::<(), &str, String>(
|
||||
"pluralkit:botstatus",
|
||||
format!(
|
||||
"in {} servers",
|
||||
guild_count.to_formatted_string(&Locale::en)
|
||||
),
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn queue_deleted_image_cleanup(ctx: AppCtx) -> anyhow::Result<()> {
|
||||
// todo: we want to delete immediately when system is deleted, but after a
|
||||
// delay if member is deleted
|
||||
ctx.data
|
||||
.execute(
|
||||
r#"
|
||||
insert into image_cleanup_jobs
|
||||
select id from images where
|
||||
not exists (select from image_cleanup_jobs j where j.id = images.id)
|
||||
and not exists (select from systems where avatar_url = images.url)
|
||||
and not exists (select from systems where banner_image = images.url)
|
||||
and not exists (select from system_guild where avatar_url = images.url)
|
||||
|
||||
and not exists (select from members where avatar_url = images.url)
|
||||
and not exists (select from members where banner_image = images.url)
|
||||
and not exists (select from members where webhook_avatar_url = images.url)
|
||||
and not exists (select from member_guild where avatar_url = images.url)
|
||||
|
||||
and not exists (select from groups where icon = images.url)
|
||||
and not exists (select from groups where banner_image = images.url);
|
||||
"#,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue