fix(scheduled_tasks): replace broken time parsing code with cron library

This commit is contained in:
alyssa 2025-01-02 00:34:15 +00:00
parent f1de2f2858
commit 357122a892
3 changed files with 42 additions and 76 deletions

10
Cargo.lock generated
View file

@ -584,6 +584,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "croner"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38fd53511eaf0b00a185613875fee58b208dfce016577d0ad4bb548e1c4fb3ee"
dependencies = [
"chrono",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.14"
@ -2911,6 +2920,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"chrono",
"croner",
"fred",
"libpk",
"metrics",

View file

@ -16,4 +16,5 @@ sqlx = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
croner = "2.1.0"
num-format = "0.4.4"

View file

@ -1,9 +1,9 @@
use chrono::{TimeDelta, Timelike};
use chrono::Utc;
use croner::Cron;
use fred::prelude::RedisPool;
use sqlx::PgPool;
use std::time::Duration;
use tokio::task::JoinSet;
use tracing::{error, info};
use tracing::{debug, error, info};
mod tasks;
use tasks::*;
@ -32,12 +32,20 @@ async fn real_main() -> anyhow::Result<()> {
// i couldn't be bothered to figure out the types of passing in an async
// function to another function... so macro it is
macro_rules! doforever {
($timeout:expr, $desc:expr, $fn:ident) => {
($cron:expr, $desc:expr, $fn:ident) => {
let ctx = ctx.clone();
let cron = Cron::new($cron)
.with_seconds_optional()
.parse()
.expect("invalid cron");
set.spawn(tokio::spawn(async move {
loop {
let ctx = ctx.clone();
wait_interval($timeout).await;
let next_iter_time = cron.find_next_occurrence(&Utc::now(), false).unwrap();
debug!("next execution of {} at {:?}", $desc, next_iter_time);
let dur = next_iter_time - Utc::now();
tokio::time::sleep(dur.to_std().unwrap()).await;
info!("running {}", $desc);
let before = std::time::Instant::now();
if let Err(error) = $fn(ctx).await {
@ -52,12 +60,25 @@ async fn real_main() -> anyhow::Result<()> {
};
}
doforever!(10, "prometheus updater", update_prometheus);
doforever!(60, "database stats updater", update_db_meta);
doforever!(600, "message stats updater", update_db_message_meta);
doforever!(60, "discord stats updater", update_discord_stats);
// every 10 seconds
doforever!(
1800,
"0,10,20,30,40,50 * * * * *",
"prometheus updater",
update_prometheus
);
// every minute
doforever!("* * * * *", "database stats updater", update_db_meta);
// every 10 minutes
doforever!(
"0,10,20,30,40,50 * * * *",
"message stats updater",
update_db_message_meta
);
// every minute
doforever!("* * * * *", "discord stats updater", update_discord_stats);
// on :00 and :30
doforever!(
"0,30 * * * *",
"queue deleted image cleanup job",
queue_deleted_image_cleanup
);
@ -68,69 +89,3 @@ async fn real_main() -> anyhow::Result<()> {
Ok(())
}
// cron except not really
async fn wait_interval(interval_secs: u32) {
let now = chrono::Utc::now();
// for now, only supports hardcoded intervals
let next_iter_time = match interval_secs {
// 10 sec
// at every [x]0th second (00, 10, 20, 30, 40, 50)
10 => {
let mut minute = 0;
let mut second = (now.second() - (now.second() % 10)) + 10;
if second == 60 {
minute += 1;
second = 0;
}
now.checked_add_signed(TimeDelta::minutes(minute))
.expect("invalid time")
.with_second(second)
.expect("invalid time")
}
// 1 minute
// every minute at :00 seconds
60 => now
.checked_add_signed(TimeDelta::minutes(1))
.expect("invalid time")
.with_second(0)
.expect("invalid time"),
// 10 minutes
// at every [x]0 minute (00, 10, 20, 30, 40, 50)
600 => {
let mut minute = (now.minute() + 10) % 10;
let mut hour = 0;
if minute == 60 {
minute = 0;
hour = 1;
}
now.checked_add_signed(TimeDelta::hours(hour))
.expect("invalid time")
.with_minute(minute)
.expect("invalid time")
}
// 30 minutes
// at :00 and :30
1800 => {
let mut minute = (now.minute() + 30) % 30;
let mut hour = 0;
if minute == 60 {
minute = 0;
hour = 1;
}
now.checked_add_signed(TimeDelta::hours(hour))
.expect("invalid time")
.with_minute(minute)
.expect("invalid time")
}
_ => unreachable!(),
};
let dur = next_iter_time - now;
tokio::time::sleep(Duration::from_secs(dur.num_seconds() as u64)).await;
}