diff --git a/Cargo.lock b/Cargo.lock index 6e18bead..18e722da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,6 +584,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "croner" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38fd53511eaf0b00a185613875fee58b208dfce016577d0ad4bb548e1c4fb3ee" +dependencies = [ + "chrono", +] + [[package]] name = "crossbeam-epoch" version = "0.9.14" @@ -2911,6 +2920,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", + "croner", "fred", "libpk", "metrics", diff --git a/services/scheduled_tasks/Cargo.toml b/services/scheduled_tasks/Cargo.toml index 07d0d33d..3df1a3e7 100644 --- a/services/scheduled_tasks/Cargo.toml +++ b/services/scheduled_tasks/Cargo.toml @@ -16,4 +16,5 @@ sqlx = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +croner = "2.1.0" num-format = "0.4.4" diff --git a/services/scheduled_tasks/src/main.rs b/services/scheduled_tasks/src/main.rs index 10aa2ea8..1f4b28db 100644 --- a/services/scheduled_tasks/src/main.rs +++ b/services/scheduled_tasks/src/main.rs @@ -1,9 +1,9 @@ -use chrono::{TimeDelta, Timelike}; +use chrono::Utc; +use croner::Cron; use fred::prelude::RedisPool; use sqlx::PgPool; -use std::time::Duration; use tokio::task::JoinSet; -use tracing::{error, info}; +use tracing::{debug, error, info}; mod tasks; use tasks::*; @@ -32,12 +32,20 @@ async fn real_main() -> anyhow::Result<()> { // i couldn't be bothered to figure out the types of passing in an async // function to another function... so macro it is macro_rules! doforever { - ($timeout:expr, $desc:expr, $fn:ident) => { + ($cron:expr, $desc:expr, $fn:ident) => { let ctx = ctx.clone(); + let cron = Cron::new($cron) + .with_seconds_optional() + .parse() + .expect("invalid cron"); set.spawn(tokio::spawn(async move { loop { let ctx = ctx.clone(); - wait_interval($timeout).await; + let next_iter_time = cron.find_next_occurrence(&Utc::now(), false).unwrap(); + debug!("next execution of {} at {:?}", $desc, next_iter_time); + let dur = next_iter_time - Utc::now(); + tokio::time::sleep(dur.to_std().unwrap()).await; + info!("running {}", $desc); let before = std::time::Instant::now(); if let Err(error) = $fn(ctx).await { @@ -52,12 +60,25 @@ async fn real_main() -> anyhow::Result<()> { }; } - doforever!(10, "prometheus updater", update_prometheus); - doforever!(60, "database stats updater", update_db_meta); - doforever!(600, "message stats updater", update_db_message_meta); - doforever!(60, "discord stats updater", update_discord_stats); + // every 10 seconds doforever!( - 1800, + "0,10,20,30,40,50 * * * * *", + "prometheus updater", + update_prometheus + ); + // every minute + doforever!("* * * * *", "database stats updater", update_db_meta); + // every 10 minutes + doforever!( + "0,10,20,30,40,50 * * * *", + "message stats updater", + update_db_message_meta + ); + // every minute + doforever!("* * * * *", "discord stats updater", update_discord_stats); + // on :00 and :30 + doforever!( + "0,30 * * * *", "queue deleted image cleanup job", queue_deleted_image_cleanup ); @@ -68,69 +89,3 @@ async fn real_main() -> anyhow::Result<()> { Ok(()) } - -// cron except not really -async fn wait_interval(interval_secs: u32) { - let now = chrono::Utc::now(); - - // for now, only supports hardcoded intervals - let next_iter_time = match interval_secs { - // 10 sec - // at every [x]0th second (00, 10, 20, 30, 40, 50) - 10 => { - let mut minute = 0; - let mut second = (now.second() - (now.second() % 10)) + 10; - if second == 60 { - minute += 1; - second = 0; - } - now.checked_add_signed(TimeDelta::minutes(minute)) - .expect("invalid time") - .with_second(second) - .expect("invalid time") - } - // 1 minute - // every minute at :00 seconds - 60 => now - .checked_add_signed(TimeDelta::minutes(1)) - .expect("invalid time") - .with_second(0) - .expect("invalid time"), - // 10 minutes - // at every [x]0 minute (00, 10, 20, 30, 40, 50) - 600 => { - let mut minute = (now.minute() + 10) % 10; - let mut hour = 0; - if minute == 60 { - minute = 0; - hour = 1; - } - - now.checked_add_signed(TimeDelta::hours(hour)) - .expect("invalid time") - .with_minute(minute) - .expect("invalid time") - } - // 30 minutes - // at :00 and :30 - 1800 => { - let mut minute = (now.minute() + 30) % 30; - let mut hour = 0; - if minute == 60 { - minute = 0; - hour = 1; - } - - now.checked_add_signed(TimeDelta::hours(hour)) - .expect("invalid time") - .with_minute(minute) - .expect("invalid time") - } - - _ => unreachable!(), - }; - - let dur = next_iter_time - now; - - tokio::time::sleep(Duration::from_secs(dur.num_seconds() as u64)).await; -}