From f69587ceafec26a34c7d73a6d8e288136990c425 Mon Sep 17 00:00:00 2001 From: asleepyskye Date: Sat, 24 Jan 2026 11:43:05 -0500 Subject: [PATCH] WIP: revise avatars service --- Cargo.lock | 30 +++ Cargo.toml | 2 +- PluralKit.Bot/Commands/Groups.cs | 8 +- PluralKit.Bot/Commands/Member.cs | 4 +- PluralKit.Bot/Commands/MemberAvatar.cs | 4 +- PluralKit.Bot/Commands/MemberEdit.cs | 4 +- PluralKit.Bot/Commands/SystemEdit.cs | 12 +- .../Services/AvatarHostingService.cs | 14 +- PluralKit.Bot/Utils/AvatarUtils.cs | 3 + crates/api/src/endpoints/images.rs | 126 ++++++++++ crates/api/src/endpoints/mod.rs | 1 + crates/api/src/error.rs | 2 + crates/api/src/main.rs | 3 + crates/avatars/Cargo.toml | 2 +- crates/avatars/src/cleanup.rs | 179 ++++++++++---- crates/avatars/src/main.rs | 229 +++++++++++++++--- crates/avatars/src/new.sql | 97 ++++++++ crates/avatars/src/process.rs | 47 ++-- crates/avatars/src/pull.rs | 13 +- crates/avatars/src/store.rs | 7 +- crates/libpk/Cargo.toml | 1 + crates/libpk/src/_config.rs | 7 +- crates/libpk/src/db/repository/avatars.rs | 128 +++++++--- crates/libpk/src/db/types/avatars.rs | 71 +++++- crates/scheduled_tasks/src/main.rs | 6 + crates/scheduled_tasks/src/tasks.rs | 114 +++++++-- 26 files changed, 912 insertions(+), 202 deletions(-) create mode 100644 crates/api/src/endpoints/images.rs create mode 100644 crates/avatars/src/new.sql diff --git a/Cargo.lock b/Cargo.lock index 0bb714cb..fa43e745 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -307,6 +307,7 @@ dependencies = [ "matchit 0.8.4", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -1917,6 +1918,15 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "ipnetwork" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf466541e9d546596ee94f9f69590f89473455f88372423e0008fc1a7daf100e" +dependencies = [ + "serde", +] + [[package]] name = "itertools" version = "0.12.1" @@ -2048,6 +2058,7 @@ name = "libpk" version = "0.1.0" dependencies = [ "anyhow", + "chrono", "config", "fred", "json-subscriber", @@ -2274,6 +2285,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.3.1", + "httparse", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "nibble_vec" version = "0.1.0" @@ -3812,6 +3840,7 @@ dependencies = [ "hashbrown 0.15.2", "hashlink 0.10.0", "indexmap", + "ipnetwork", "log", "memchr", "once_cell", @@ -3934,6 +3963,7 @@ dependencies = [ "hkdf", "hmac", "home", + "ipnetwork", "itoa", "log", "md-5", diff --git a/Cargo.toml b/Cargo.toml index 69a9048a..f4ea5b4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ reqwest = { version = "0.12.7" , default-features = false, features = ["rustls-t sentry = { version = "0.36.0", default-features = false, features = ["backtrace", "contexts", "panic", "debug-images", "reqwest", "rustls"] } # replace native-tls with rustls serde = { version = "1.0.196", features = ["derive"] } serde_json = "1.0.117" -sqlx = { version = "0.8.2", features = ["runtime-tokio", "postgres", "time", "chrono", "macros", "uuid"] } +sqlx = { version = "0.8.2", features = ["runtime-tokio", "postgres", "time", 
"chrono", "macros", "uuid", "ipnetwork"] } tokio = { version = "1.46.1", features = ["full"] } tracing = "0.1" tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json"] } diff --git a/PluralKit.Bot/Commands/Groups.cs b/PluralKit.Bot/Commands/Groups.cs index b18764d2..02bc246e 100644 --- a/PluralKit.Bot/Commands/Groups.cs +++ b/PluralKit.Bot/Commands/Groups.cs @@ -290,8 +290,8 @@ public class Groups { ctx.CheckOwnGroup(target); - img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Avatar, ctx.Author.Id, ctx.System); - await _avatarHosting.VerifyAvatarOrThrow(img.Url); + img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Avatar); + await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url); await ctx.Repository.UpdateGroup(target.Id, new GroupPatch { Icon = img.CleanUrl ?? img.Url }); @@ -365,8 +365,8 @@ public class Groups { ctx.CheckOwnGroup(target); - img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Banner, ctx.Author.Id, ctx.System); - await _avatarHosting.VerifyAvatarOrThrow(img.Url, true); + img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Banner); + await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url, true); await ctx.Repository.UpdateGroup(target.Id, new GroupPatch { BannerImage = img.CleanUrl ?? img.Url }); diff --git a/PluralKit.Bot/Commands/Member.cs b/PluralKit.Bot/Commands/Member.cs index 37ab9d18..9d579433 100644 --- a/PluralKit.Bot/Commands/Member.cs +++ b/PluralKit.Bot/Commands/Member.cs @@ -81,9 +81,9 @@ public class Member uriBuilder.Query = ""; img.CleanUrl = uriBuilder.Uri.AbsoluteUri; - img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Avatar, ctx.Author.Id, ctx.System); + img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Avatar); - await _avatarHosting.VerifyAvatarOrThrow(img.Url); + await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url); await ctx.Repository.UpdateMember(member.Id, new MemberPatch { AvatarUrl = img.CleanUrl ?? img.Url }, conn); dispatchData.Add("avatar_url", img.CleanUrl); diff --git a/PluralKit.Bot/Commands/MemberAvatar.cs b/PluralKit.Bot/Commands/MemberAvatar.cs index 26eb310e..8b954a47 100644 --- a/PluralKit.Bot/Commands/MemberAvatar.cs +++ b/PluralKit.Bot/Commands/MemberAvatar.cs @@ -158,8 +158,8 @@ public class MemberAvatar ctx.CheckSystem().CheckOwnMember(target); - avatarArg = await _avatarHosting.TryRehostImage(avatarArg.Value, AvatarHostingService.RehostedImageType.Avatar, ctx.Author.Id, ctx.System); - await _avatarHosting.VerifyAvatarOrThrow(avatarArg.Value.Url); + avatarArg = await _avatarHosting.TryRehostImage(ctx, avatarArg.Value, AvatarHostingService.RehostedImageType.Avatar); + await _avatarHosting.VerifyAvatarOrThrow(ctx, avatarArg.Value.Url); await UpdateAvatar(location, ctx, target, avatarArg.Value.CleanUrl ?? 
avatarArg.Value.Url);
         await PrintResponse(location, ctx, target, avatarArg.Value, guildData);
     }
diff --git a/PluralKit.Bot/Commands/MemberEdit.cs b/PluralKit.Bot/Commands/MemberEdit.cs
index 525d60ab..a4784ed4 100644
--- a/PluralKit.Bot/Commands/MemberEdit.cs
+++ b/PluralKit.Bot/Commands/MemberEdit.cs
@@ -230,8 +230,8 @@ public class MemberEdit
     async Task SetBannerImage(ParsedImage img)
     {
         ctx.CheckOwnMember(target);
 
-        img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Banner, ctx.Author.Id, ctx.System);
-        await _avatarHosting.VerifyAvatarOrThrow(img.Url, true);
+        img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Banner);
+        await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url, true);
 
         await ctx.Repository.UpdateMember(target.Id, new MemberPatch { BannerImage = img.CleanUrl ?? img.Url });
 
diff --git a/PluralKit.Bot/Commands/SystemEdit.cs b/PluralKit.Bot/Commands/SystemEdit.cs
index 3af4a639..4db5e5e4 100644
--- a/PluralKit.Bot/Commands/SystemEdit.cs
+++ b/PluralKit.Bot/Commands/SystemEdit.cs
@@ -573,8 +573,8 @@ public class SystemEdit
     {
         ctx.CheckOwnSystem(target);
 
-        img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Avatar, ctx.Author.Id, ctx.System);
-        await _avatarHosting.VerifyAvatarOrThrow(img.Url);
+        img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Avatar);
+        await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url);
 
         await ctx.Repository.UpdateSystem(target.Id, new SystemPatch { AvatarUrl = img.CleanUrl ?? img.Url });
 
@@ -660,8 +660,8 @@ public class SystemEdit
     {
         ctx.CheckOwnSystem(target);
 
-        img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Avatar, ctx.Author.Id, ctx.System);
-        await _avatarHosting.VerifyAvatarOrThrow(img.Url);
+        img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Avatar);
+        await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url);
 
         await ctx.Repository.UpdateSystemGuild(target.Id, ctx.Guild.Id, new SystemGuildPatch { AvatarUrl = img.CleanUrl ?? img.Url });
 
@@ -782,8 +782,8 @@ public class SystemEdit
     else if (await ctx.MatchImage() is { } img)
     {
-        img = await _avatarHosting.TryRehostImage(img, AvatarHostingService.RehostedImageType.Banner, ctx.Author.Id, ctx.System);
-        await _avatarHosting.VerifyAvatarOrThrow(img.Url, true);
+        img = await _avatarHosting.TryRehostImage(ctx, img, AvatarHostingService.RehostedImageType.Banner);
+        await _avatarHosting.VerifyAvatarOrThrow(ctx, img.Url, true);
 
         await ctx.Repository.UpdateSystem(target.Id, new SystemPatch { BannerImage = img.CleanUrl ?? img.Url });
 
diff --git a/PluralKit.Bot/Services/AvatarHostingService.cs b/PluralKit.Bot/Services/AvatarHostingService.cs
index e43467c3..8a84ae22 100644
--- a/PluralKit.Bot/Services/AvatarHostingService.cs
+++ b/PluralKit.Bot/Services/AvatarHostingService.cs
@@ -1,4 +1,5 @@
 using PluralKit.Core;
+using Serilog;
 
 using System.Net;
 using System.Net.Http.Json;
@@ -18,7 +19,7 @@ public class AvatarHostingService
         };
     }
 
-    public async Task VerifyAvatarOrThrow(string url, bool isBanner = false)
+    public async Task VerifyAvatarOrThrow(Context ctx, string url, bool isBanner = false)
     {
         if (url.Length > Limits.MaxUriLength)
             throw Errors.UrlTooLong(url);
@@ -35,6 +36,7 @@ public class AvatarHostingService
             return;
 
         var kind = isBanner ? "banner" : "avatar";
+        if (ctx.Premium) kind = "premium_" + kind;
 
         try
         {
@@ -56,11 +58,11 @@ public class AvatarHostingService
         }
     }
 
-    public async Task<ParsedImage> TryRehostImage(ParsedImage input, RehostedImageType type, ulong userId, PKSystem? system)
+    public async Task<ParsedImage> TryRehostImage(Context ctx, ParsedImage input, RehostedImageType type)
     {
         try
         {
-            var uploaded = await TryUploadAvatar(input.Url, type, userId, system);
+            var uploaded = await TryUploadAvatar(ctx, input.Url, type);
             if (uploaded != null)
             {
                 // todo: make new image type called Cdn?
@@ -78,7 +80,7 @@ public class AvatarHostingService
         }
     }
 
-    public async Task TryUploadAvatar(string? avatarUrl, RehostedImageType type, ulong userId, PKSystem? system)
+    public async Task TryUploadAvatar(Context ctx, string? avatarUrl, RehostedImageType type)
     {
         if (!AvatarUtils.IsDiscordCdnUrl(avatarUrl))
             return null;
@@ -93,8 +95,10 @@ public class AvatarHostingService
             _ => throw new ArgumentOutOfRangeException(nameof(type), type, null)
         };
 
+        if (ctx.Premium) kind = "premium_" + kind;
+
         var response = await _client.PostAsJsonAsync(_config.AvatarServiceUrl + "/pull",
-            new { url = avatarUrl, kind, uploaded_by = userId, system_id = system?.Uuid.ToString() });
+            new { url = avatarUrl, kind, uploaded_by = ctx.Author.Id, system_id = ctx.System.Uuid.ToString() });
         if (response.StatusCode != HttpStatusCode.OK)
         {
             var error = await response.Content.ReadFromJsonAsync();
diff --git a/PluralKit.Bot/Utils/AvatarUtils.cs b/PluralKit.Bot/Utils/AvatarUtils.cs
index 65a208ed..2cbb8fb1 100644
--- a/PluralKit.Bot/Utils/AvatarUtils.cs
+++ b/PluralKit.Bot/Utils/AvatarUtils.cs
@@ -25,6 +25,9 @@ public static class AvatarUtils
         if (match.Groups["query"].Success)
             newUrl += "&" + match.Groups["query"].Value;
 
+        // if it's our cdn, add proxy=true to the end to allow for proxy image resizing
+        if (url.StartsWith("https://cdn.pluralkit.me")) newUrl += "?proxy=true";
+
         return newUrl;
     }
 
diff --git a/crates/api/src/endpoints/images.rs b/crates/api/src/endpoints/images.rs
new file mode 100644
index 00000000..5a62ea71
--- /dev/null
+++ b/crates/api/src/endpoints/images.rs
@@ -0,0 +1,126 @@
+use crate::ApiContext;
+use crate::auth::AuthState;
+use crate::error::{GENERIC_BAD_REQUEST, GENERIC_NOT_FOUND, fail};
+use axum::Extension;
+use axum::extract::{Path, Request};
+use axum::http::HeaderValue;
+use axum::response::IntoResponse;
+use axum::{extract::State, response::Json};
+use hyper::Uri;
+use libpk::config;
+use libpk::db::repository::avatars as avatars_db;
+use libpk::db::types::avatars::*;
+use pk_macros::api_endpoint;
+use pluralkit_models::PKSystemConfig;
+use serde::Serialize;
+use sqlx::Postgres;
+use sqlx::types::Uuid;
+use sqlx::types::chrono::Utc;
+use std::result::Result::Ok;
+use tracing::warn;
+
+#[derive(Serialize)]
+struct APIImage {
+    url: String,
+    proxy_url: Option<String>,
+}
+
+#[api_endpoint]
+pub async fn image_data(
+    State(ctx): State<ApiContext>,
+    Path((system_uuid, image_uuid)): Path<(Uuid, Uuid)>,
+) -> Json<APIImage> {
+    let img: Image = match avatars_db::get_by_id(&ctx.db, system_uuid, image_uuid).await {
+        Ok(Some(img)) => img,
+        Ok(None) => return Err(GENERIC_NOT_FOUND),
+        Err(err) => fail!(?err, "failed to query image"),
+    };
+    let mut proxy_url: Option<String> = None;
+    if let Some(proxy_hash) = img.meta.proxy_image {
+        let proxy_img = match avatars_db::get_by_hash(&ctx.db, proxy_hash.to_string()).await {
+            Ok(Some(img)) => img,
+            Ok(None) => {
+                warn!(
+                    system_uuid = system_uuid.to_string(),
+                    image_uuid = image_uuid.to_string(),
+                    "failed to find proxy image"
+                );
+                return Err(GENERIC_NOT_FOUND);
+            }
+            Err(err) => fail!(?err, "failed to query proxy image"),
+        };
+        proxy_url = Some(proxy_img.url)
+    }
+    Ok(Json(APIImage {
+        url: img.data.url,
+        proxy_url,
+    }))
+}
+
+#[api_endpoint]
+pub async fn upload(
+    Extension(auth): Extension<AuthState>,
+    State(ctx): State<ApiContext>,
+    mut req: Request,
+) -> impl IntoResponse {
+    let Some(system_id) = auth.system_id() else {
+        return Err(crate::error::GENERIC_AUTH_ERROR);
+    };
+
+    let uuid: Uuid = match sqlx::query_scalar("select uuid from systems where id = $1")
+        .bind(system_id)
+        .fetch_optional(&ctx.db)
+        .await
+    {
+        Ok(Some(uuid)) => uuid,
+        Ok(None) => fail!(
+            system = system_id,
+            "failed to find uuid for existing system"
+        ),
+        Err(err) => fail!(?err, "failed to query system uuid"),
+    };
+
+    let sys_config = match sqlx::query_as::<Postgres, PKSystemConfig>(
+        "select * from system_config where system = $1",
+    )
+    .bind(system_id)
+    .fetch_optional(&ctx.db)
+    .await
+    {
+        Ok(Some(sys_config)) => sys_config,
+        Ok(None) => fail!(
+            system = system_id,
+            "failed to find system config for existing system"
+        ),
+        Err(err) => fail!(?err, "failed to query system config"),
+    };
+    if !sys_config.premium_lifetime {
+        if let Some(premium_until) = sys_config.premium_until {
+            if premium_until < Utc::now().naive_utc() {
+                return Err(GENERIC_BAD_REQUEST);
+            }
+        } else {
+            return Err(GENERIC_BAD_REQUEST);
+        }
+    }
+
+    let url = format!(
+        "{}/upload",
+        config
+            .api
+            .as_ref()
+            .unwrap()
+            .avatars_service_url
+            .clone()
+            .expect("expected avatars url")
+    );
+
+    *req.uri_mut() = Uri::try_from(url).unwrap();
+    let headers = req.headers_mut();
+    headers.append(
+        "x-pluralkit-systemuuid",
+        HeaderValue::from_str(&uuid.to_string()).expect("expected valid uuid for header"),
+    );
+
+    Ok(ctx.rproxy_client.request(req).await?.into_response())
+}
diff --git a/crates/api/src/endpoints/mod.rs b/crates/api/src/endpoints/mod.rs
index c311367c..16172211 100644
--- a/crates/api/src/endpoints/mod.rs
+++ b/crates/api/src/endpoints/mod.rs
@@ -1,2 +1,3 @@
+pub mod images;
 pub mod private;
 pub mod system;
diff --git a/crates/api/src/error.rs b/crates/api/src/error.rs
index fc481d0c..b0bafb65 100644
--- a/crates/api/src/error.rs
+++ b/crates/api/src/error.rs
@@ -82,5 +82,7 @@
     };
 }
 
+define_error! { GENERIC_AUTH_ERROR, StatusCode::UNAUTHORIZED, 0, "401: Missing or invalid Authorization header" }
 define_error! { GENERIC_BAD_REQUEST, StatusCode::BAD_REQUEST, 0, "400: Bad Request" }
+define_error! { GENERIC_NOT_FOUND, StatusCode::NOT_FOUND, 0, "404: Not Found" }
 define_error! { GENERIC_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR, 0, "500: Internal Server Error" }
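The premium gate in the upload endpoint above reduces to a single predicate; a standalone sketch for clarity (illustrative helper, not part of the patch — the field semantics are taken from the endpoint code):

use chrono::NaiveDateTime;

// lifetime premium always passes; otherwise premium_until must exist and still be in the future
fn has_active_premium(premium_lifetime: bool, premium_until: Option<NaiveDateTime>, now: NaiveDateTime) -> bool {
    premium_lifetime || premium_until.map(|until| until >= now).unwrap_or(false)
}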
diff --git a/crates/api/src/main.rs b/crates/api/src/main.rs
index 5c2bcfd4..d95751f7 100644
--- a/crates/api/src/main.rs
+++ b/crates/api/src/main.rs
@@ -121,6 +121,9 @@ fn router(ctx: ApiContext) -> Router {
         .route("/private/discord/callback2", post(endpoints::private::discord_callback))
         .route("/private/discord/shard_state", get(endpoints::private::discord_state))
         .route("/private/stats", get(endpoints::private::meta))
+
+        .route("/private/images/{system_uuid}/{image_uuid}", get(endpoints::images::image_data))
+        .route("/private/images/upload", post(endpoints::images::upload))
 
         .route("/v2/systems/{system_id}/oembed.json", get(rproxy))
         .route("/v2/members/{member_id}/oembed.json", get(rproxy))
diff --git a/crates/avatars/Cargo.toml b/crates/avatars/Cargo.toml
index f6dfb96f..629f54e6 100644
--- a/crates/avatars/Cargo.toml
+++ b/crates/avatars/Cargo.toml
@@ -10,7 +10,7 @@ path = "src/cleanup.rs"
 [dependencies]
 libpk = { path = "../libpk" }
 anyhow = { workspace = true }
-axum = { workspace = true }
+axum = { workspace = true, features = ["multipart"] }
 futures = { workspace = true }
 reqwest = { workspace = true }
 serde = { workspace = true }
diff --git a/crates/avatars/src/cleanup.rs b/crates/avatars/src/cleanup.rs
index 48b25f98..5e26b30b 100644
--- a/crates/avatars/src/cleanup.rs
+++ b/crates/avatars/src/cleanup.rs
@@ -1,8 +1,8 @@
 use anyhow::Context;
-use reqwest::{ClientBuilder, StatusCode};
-use sqlx::prelude::FromRow;
-use std::{sync::Arc, time::Duration};
+use reqwest::{ClientBuilder, StatusCode, Url};
+use std::{path::Path, sync::Arc, time::Duration};
 use tracing::{error, info};
+use uuid::Uuid;
 
 #[libpk::main]
 async fn main() -> anyhow::Result<()> {
@@ -36,109 +36,196 @@ async fn main() -> anyhow::Result<()> {
     loop {
         // no infinite loops
         tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
-        match cleanup_job(pool.clone(), bucket.clone()).await {
+        match cleanup_job(pool.clone()).await {
             Ok(()) => {}
             Err(error) => {
                 error!(?error, "failed to run avatar cleanup job");
                 // sentry
             }
         }
+        match cleanup_hash_job(pool.clone(), bucket.clone()).await {
+            Ok(()) => {}
+            Err(error) => {
+                error!(?error, "failed to run hash cleanup job");
+                // sentry
+            }
+        }
     }
 }
 
-#[derive(FromRow)]
+#[derive(sqlx::FromRow)]
 struct CleanupJobEntry {
-    id: String,
+    id: Uuid,
+    system_uuid: Uuid,
 }
 
-async fn cleanup_job(pool: sqlx::PgPool, bucket: Arc<s3::Bucket>) -> anyhow::Result<()> {
+async fn cleanup_job(pool: sqlx::PgPool) -> anyhow::Result<()> {
     let mut tx = pool.begin().await?;
 
-    let image_id: Option<CleanupJobEntry> = sqlx::query_as(
+    let entry: Option<CleanupJobEntry> = sqlx::query_as(
         // no timestamp checking here
         // images are only added to the table after 24h
         r#"
-        select id from image_cleanup_jobs
+        select id, system_uuid from image_cleanup_jobs
         for update skip locked limit 1;"#,
     )
     .fetch_optional(&mut *tx)
     .await?;
-    if image_id.is_none() {
+    if entry.is_none() {
         info!("no job to run, sleeping for 1 minute");
         tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
         return Ok(());
     }
-    let image_id = image_id.unwrap().id;
+    let entry = entry.unwrap();
+    let image_id = entry.id;
+    let system_uuid = entry.system_uuid;
     info!("got image {image_id}, cleaning up...");
 
-    let image_data = libpk::db::repository::avatars::get_by_id(&pool, image_id.clone()).await?;
-    if image_data.is_none() {
+    let image =
+        libpk::db::repository::avatars::get_by_id(&pool, system_uuid.clone(), image_id.clone())
+            .await?;
+    if image.is_none() {
         // unsure how this can happen? there is a FK reference
         info!("image {image_id} was already deleted, skipping");
-        sqlx::query("delete from image_cleanup_jobs where id = $1")
+        sqlx::query("delete from image_cleanup_jobs where id = $1 and system_uuid = $2")
             .bind(image_id)
+            .bind(system_uuid)
             .execute(&mut *tx)
             .await?;
+        // commit before returning, otherwise the delete above is rolled back when tx drops
+        tx.commit().await?;
         return Ok(());
     }
-    let image_data = image_data.unwrap();
+    let image = image.unwrap();
 
     let config = libpk::config
         .avatars
         .as_ref()
         .expect("missing avatar service config");
 
-    let path = image_data
-        .url
-        .strip_prefix(config.cdn_url.as_str())
-        .unwrap();
-
-    let s3_resp = bucket.delete_object(path).await?;
-    match s3_resp.status_code() {
-        204 => {
-            info!("successfully deleted image {image_id} from s3");
-        }
-        _ => {
-            anyhow::bail!("s3 returned bad error code {}", s3_resp.status_code());
-        }
-    }
-
-    if let Some(zone_id) = config.cloudflare_zone_id.as_ref() {
+    if let Some(store_id) = config.fastly_store_id.as_ref() {
         let client = ClientBuilder::new()
             .connect_timeout(Duration::from_secs(3))
             .timeout(Duration::from_secs(3))
             .build()
             .context("error making client")?;
 
-        let cf_resp = client
-            .post(format!(
-                "https://api.cloudflare.com/client/v4/zones/{zone_id}/purge_cache"
+        let url = Url::parse(&image.data.url).expect("invalid url");
+        let extension = Path::new(url.path())
+            .extension()
+            .and_then(|s| s.to_str())
+            .unwrap_or("");
+        let key = format!("{system_uuid}:{image_id}.{extension}");
+
+        let kv_resp = client
+            .delete(format!(
+                "https://api.fastly.com/resources/stores/kv/{store_id}/keys/{key}"
             ))
-            .header(
-                "Authorization",
-                format!("Bearer {}", config.cloudflare_token.as_ref().unwrap()),
-            )
-            .body(format!(r#"{{"files":["{}"]}}"#, image_data.url))
+            .header("Fastly-Key", config.fastly_token.as_ref().unwrap())
             .send()
             .await?;
 
-        match cf_resp.status() {
+        match kv_resp.status() {
             StatusCode::OK => {
                 info!(
-                    "successfully purged url {} from cloudflare cache",
-                    image_data.url
+                    "successfully purged image {}:{}.{} from fastly kv",
+                    system_uuid, image_id, extension
                 );
             }
             _ => {
-                let status = cf_resp.status();
-                tracing::info!("raw response from cloudflare: {:#?}", cf_resp.text().await?);
-                anyhow::bail!("cloudflare returned bad error code {}", status);
+                let status = kv_resp.status();
+                tracing::info!("raw response from fastly: {:#?}", kv_resp.text().await?);
+                tracing::warn!("fastly returned bad error code {}", status);
+            }
+        }
+
+        let cdn_url_parsed = Url::parse(config.cdn_url.as_str())?;
+        let cdn_host = cdn_url_parsed.host_str().unwrap_or(config.cdn_url.as_str());
+
+        let cache_resp = client
+            .post(format!(
+                "https://api.fastly.com/purge/{}/{}/{}.{}",
+                cdn_host, system_uuid, image_id, extension
+            ))
+            .header("Fastly-Key", config.fastly_token.as_ref().unwrap())
+            .send()
+            .await?;
+
+        match cache_resp.status() {
+            StatusCode::OK => {
+                info!(
+                    "successfully purged image {}/{}.{} from fastly cache",
+                    system_uuid, image_id, extension
+                );
+            }
+            _ => {
+                let status = cache_resp.status();
+                tracing::info!("raw response from fastly: {:#?}", cache_resp.text().await?);
+                tracing::warn!("fastly returned bad error code {}", status);
             }
         }
     }
 
-    sqlx::query("delete from images where id = $1")
+    sqlx::query("delete from images_assets where id = $1 and system_uuid = $2")
         .bind(image_id.clone())
+        .bind(system_uuid.clone())
         .execute(&mut *tx)
         .await?;
 
     tx.commit().await?;
 
     Ok(())
 }
 
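+// lifecycle note: a hash row is "orphaned" once no images_assets row references
+// it via image or proxy_image; scheduled_tasks first parks such hashes in
+// image_hash_cleanup_pending_jobs, promotes them to image_hash_cleanup_jobs
+// after 24h, and that final queue is what this job drains
+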
+#[derive(sqlx::FromRow)]
+struct HashCleanupJobEntry {
+    hash: String,
+}
+
+async fn cleanup_hash_job(pool: sqlx::PgPool, bucket: Arc<s3::Bucket>) -> anyhow::Result<()> {
+    let mut tx = pool.begin().await?;
+
+    let config = libpk::config
+        .avatars
+        .as_ref()
+        .expect("missing avatar service config");
+
+    let entry: Option<HashCleanupJobEntry> = sqlx::query_as(
+        // no timestamp checking here
+        // images are only added to the table after 24h
+        r#"
+        select hash from image_hash_cleanup_jobs
+        for update skip locked limit 1;"#,
+    )
+    .fetch_optional(&mut *tx)
+    .await?;
+    if entry.is_none() {
+        info!("no hash job to run, sleeping for 1 minute");
+        tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
+        return Ok(());
+    }
+    let entry = entry.unwrap();
+    let hash = entry.hash;
+    info!("got orphaned hash {hash}, cleaning up...");
+
+    let url: Option<String> = sqlx::query_scalar("select url from images_hashes where hash = $1")
+        .bind(&hash)
+        .fetch_optional(&mut *tx)
+        .await?;
+
+    if let Some(url) = url {
+        let path = url.strip_prefix(config.cdn_url.as_str()).unwrap();
+        let s3_resp = bucket.delete_object(path).await?;
+        match s3_resp.status_code() {
+            204 => {
+                info!("successfully deleted image {hash} from s3");
+            }
+            _ => {
+                anyhow::bail!("s3 returned bad error code {}", s3_resp.status_code());
+            }
+        }
+    }
+
+    sqlx::query("delete from images_hashes where hash = $1")
+        .bind(&hash)
+        .execute(&mut *tx)
+        .await?;
+
+    tx.commit().await?;
+
+    Ok(())
+}
diff --git a/crates/avatars/src/main.rs b/crates/avatars/src/main.rs
index df80ac82..51428ec0 100644
--- a/crates/avatars/src/main.rs
+++ b/crates/avatars/src/main.rs
@@ -5,7 +5,8 @@ mod pull;
 mod store;
 
 use anyhow::Context;
-use axum::extract::State;
+use axum::extract::{DefaultBodyLimit, Multipart, State};
+use axum::http::HeaderMap;
 use axum::routing::get;
 use axum::{
     Json, Router,
@@ -21,12 +22,18 @@
 use reqwest::{Client, ClientBuilder};
 use serde::{Deserialize, Serialize};
 use sqlx::PgPool;
 use std::error::Error;
+use std::net::IpAddr;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use thiserror::Error;
 use tracing::{error, info, warn};
 use uuid::Uuid;
 
+const NORMAL_HARD_LIMIT: usize = 8 * 1024 * 1024;
+const PREMIUM_SOFT_LIMIT: usize = 30 * 1024 * 1024;
+const PREMIUM_HARD_LIMIT: usize = 50 * 1024 * 1024;
+
 #[derive(Error, Debug)]
 pub enum PKAvatarError {
     // todo: split off into logical groups (cdn/url error, image format error, etc)
@@ -82,60 +89,130 @@ pub struct PullRequest {
 }
 
 #[derive(Serialize)]
-pub struct PullResponse {
+pub struct ImageResponse {
     url: String,
     new: bool,
 }
 
+async fn gen_proxy_image(state: &AppState, data: Vec<u8>) -> Result<Option<String>, PKAvatarError> {
+    let encoded_proxy = process::process_async(data, ImageKind::Avatar).await?;
+    let store_proxy_res = crate::store::store(&state.bucket, &encoded_proxy).await?;
+    let proxy_url = format!("{}{}", state.config.cdn_url, store_proxy_res.path);
+    db::add_image_data(
+        &state.pool,
+        &ImageData {
+            hash: encoded_proxy.hash.to_string(),
+            url: proxy_url,
+            file_size: encoded_proxy.data.len() as i32,
+            width: encoded_proxy.width as i32,
+            height: encoded_proxy.height as i32,
+            content_type: encoded_proxy.format.to_mime_type().to_string(),
+            created_at: None,
+        },
+    )
+    .await?;
+    Ok(Some(encoded_proxy.hash.to_string()))
+}
+
+async fn handle_image(
+    state: &AppState,
+    data: Vec<u8>,
+    mut meta: ImageMeta,
+) -> Result<ImageResponse, PKAvatarError> {
+    let original_file_size = data.len();
+    let system_uuid = meta.system_uuid;
+
+    if meta.kind.is_premium() && original_file_size > NORMAL_HARD_LIMIT {
+        meta.proxy_image = gen_proxy_image(&state, data.clone()).await?;
+    }
+
+    let encoded = process::process_async(data, meta.kind).await?;
+    let store_res = crate::store::store(&state.bucket, &encoded).await?;
+    meta.image = store_res.id.clone();
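+    // store_res.id is the content hash of the encoded bytes, so identical
+    // uploads resolve to the same images_hashes row; add_image below then
+    // returns the existing asset (is_new = false) instead of inserting a duplicate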
+    let storage_url = format!("{}{}", state.config.cdn_url, store_res.path);
+
+    let res = db::add_image(
+        &state.pool,
+        Image {
+            meta,
+            data: ImageData {
+                hash: store_res.id,
+                url: storage_url,
+                file_size: encoded.data.len() as i32,
+                width: encoded.width as i32,
+                height: encoded.height as i32,
+                content_type: encoded.format.to_mime_type().to_string(),
+                created_at: None,
+            },
+        },
+    )
+    .await?;
+
+    if original_file_size >= PREMIUM_SOFT_LIMIT {
+        warn!(
+            "large image {} of size {} uploaded",
+            res.uuid, original_file_size
+        )
+    }
+
+    let final_url = format!(
+        "{}images/{}/{}.{}",
+        state.config.edge_url,
+        system_uuid,
+        res.uuid,
+        encoded
+            .format
+            .extensions_str()
+            .first()
+            .expect("expected valid extension")
+    );
+
+    Ok(ImageResponse {
+        url: final_url,
+        new: res.is_new,
+    })
+}
+
 async fn pull(
     State(state): State<AppState>,
     Json(req): Json<PullRequest>,
-) -> Result<Json<PullResponse>, PKAvatarError> {
+) -> Result<Json<ImageResponse>, PKAvatarError> {
     let parsed = pull::parse_url(&req.url) // parsing beforehand to "normalize"
         .map_err(|_| PKAvatarError::InvalidCdnUrl)?;
     if !(req.force || req.url.contains("https://serve.apparyllis.com/")) {
         if let Some(existing) = db::get_by_attachment_id(&state.pool, parsed.attachment_id).await? {
             // remove any pending image cleanup
             db::remove_deletion_queue(&state.pool, parsed.attachment_id).await?;
-            return Ok(Json(PullResponse {
-                url: existing.url,
+            return Ok(Json(ImageResponse {
+                url: existing.data.url,
                 new: false,
             }));
         }
     }
-
-    let result = crate::pull::pull(state.pull_client, &parsed).await?;
-
+    let result = crate::pull::pull(&state.pull_client, &parsed, req.kind.is_premium()).await?;
     let original_file_size = result.data.len();
-    let encoded = process::process_async(result.data, req.kind).await?;
-    let store_res = crate::store::store(&state.bucket, &encoded).await?;
-    let final_url = format!("{}{}", state.config.cdn_url, store_res.path);
-    let is_new = db::add_image(
-        &state.pool,
-        ImageMeta {
-            id: store_res.id,
-            url: final_url.clone(),
-            content_type: encoded.format.mime_type().to_string(),
-            original_url: Some(parsed.full_url),
-            original_type: Some(result.content_type),
-            original_file_size: Some(original_file_size as i32),
-            original_attachment_id: Some(parsed.attachment_id as i64),
-            file_size: encoded.data.len() as i32,
-            width: encoded.width as i32,
-            height: encoded.height as i32,
-            kind: req.kind,
-            uploaded_at: None,
-            uploaded_by_account: req.uploaded_by.map(|x| x as i64),
-            uploaded_by_system: req.system_id,
-        },
-    )
-    .await?;
-
-    Ok(Json(PullResponse {
-        url: final_url,
-        new: is_new,
-    }))
+    Ok(Json(
+        handle_image(
+            &state,
+            result.data,
+            ImageMeta {
+                id: Uuid::default(),
+                system_uuid: req.system_id.expect("expected system id"),
+                image: "".to_string(),
+                proxy_image: None,
+                kind: req.kind,
+                original_url: Some(parsed.full_url),
+                original_file_size: Some(original_file_size as i32),
+                original_type: Some(result.content_type),
+                original_attachment_id: Some(parsed.attachment_id as i64),
+                uploaded_by_account: req.uploaded_by.map(|x| x as i64),
+                uploaded_by_ip: None,
+                uploaded_at: None,
+            },
+        )
+        .await?,
+    ))
 }
 
 async fn verify(
     State(state): State<AppState>,
     Json(req): Json<VerifyRequest>,
 ) -> Result<(), PKAvatarError> {
     let result = crate::pull::pull(
-        state.pull_client,
+        &state.pull_client,
         &ParsedUrl {
             full_url: req.url.clone(),
             channel_id: 0,
             attachment_id: 0,
             filename: "".to_string(),
         },
+        req.kind.is_premium(),
     )
     .await?;
 
@@ -158,6 +236,81 @@
     Ok(())
 }
 
+async fn upload(
+    State(state): State<AppState>,
+    headers: HeaderMap,
+    mut multipart: Multipart,
+) -> Result<Json<ImageResponse>, PKAvatarError> {
+    let mut data: Option<Vec<u8>> = None;
+    let mut kind: Option<ImageKind> = None;
+    let mut system_id: Option<Uuid> = None;
+    let mut upload_ip: Option<IpAddr> = None;
+
+    if let Some(val) = headers.get("x-pluralkit-systemuuid")
+        && let Ok(s) = val.to_str()
+    {
+        system_id = Uuid::parse_str(s).ok();
+    }
+    if let Some(val) = headers.get("x-pluralkit-client-ip")
+        && let Ok(s) = val.to_str()
+    {
+        upload_ip = IpAddr::from_str(s).ok();
+    }
+
+    while let Some(field) = multipart
+        .next_field()
+        .await
+        .map_err(|e| PKAvatarError::InternalError(e.into()))?
+    {
+        let name = field.name().unwrap_or("").to_string();
+
+        match name.as_str() {
+            "file" => {
+                let bytes = field
+                    .bytes()
+                    .await
+                    .map_err(|e| PKAvatarError::InternalError(e.into()))?;
+                data = Some(bytes.to_vec());
+            }
+            "kind" => {
+                let txt = field
+                    .text()
+                    .await
+                    .map_err(|e| PKAvatarError::InternalError(e.into()))?;
+                kind = ImageKind::from_string(&txt);
+            }
+            _ => {}
+        }
+    }
+
+    let data = data.ok_or(PKAvatarError::MissingHeader("file"))?;
+    let kind = kind.ok_or(PKAvatarError::MissingHeader("kind"))?;
+    let system_id = system_id.ok_or(PKAvatarError::MissingHeader("x-pluralkit-systemuuid"))?;
+    let upload_ip = upload_ip.ok_or(PKAvatarError::MissingHeader("x-pluralkit-client-ip"))?;
+
+    Ok(Json(
+        handle_image(
+            &state,
+            data,
+            ImageMeta {
+                id: Uuid::default(),
+                system_uuid: system_id,
+                image: "".to_string(),
+                proxy_image: None,
+                kind,
+                original_url: None,
+                original_file_size: None,
+                original_type: None,
+                original_attachment_id: None,
+                uploaded_by_account: None,
+                uploaded_by_ip: Some(upload_ip),
+                uploaded_at: None,
+            },
+        )
+        .await?,
+    ))
+}
+
 pub async fn stats(State(state): State<AppState>) -> Result<Json<Stats>, PKAvatarError> {
     Ok(Json(db::get_stats(&state.pool).await?))
 }
@@ -221,7 +374,9 @@ async fn main() -> anyhow::Result<()> {
     let app = Router::new()
         .route("/verify", post(verify))
         .route("/pull", post(pull))
+        .route("/upload", post(upload))
         .route("/stats", get(stats))
+        .layer(DefaultBodyLimit::max(PREMIUM_HARD_LIMIT))
         .with_state(state);
 
     let host = &config.bind_addr;
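The new /upload route expects a multipart body with `file` and `kind` fields, plus the two headers the handler reads (`x-pluralkit-systemuuid` is injected by the API's reverse proxy; the origin of `x-pluralkit-client-ip` is outside this patch). A minimal client sketch, assuming reqwest with the `multipart` feature and placeholder values:

let form = reqwest::multipart::Form::new()
    .part(
        "file",
        reqwest::multipart::Part::bytes(image_bytes).file_name("avatar.png"),
    )
    .text("kind", "premium_avatar");
let resp = client
    .post(format!("{avatars_service_url}/upload"))
    .header("x-pluralkit-systemuuid", system_uuid.to_string())
    .header("x-pluralkit-client-ip", "203.0.113.7") // placeholder; normally set upstream
    .multipart(form)
    .send()
    .await?;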
diff --git a/crates/avatars/src/new.sql b/crates/avatars/src/new.sql
new file mode 100644
index 00000000..a251bc6c
--- /dev/null
+++ b/crates/avatars/src/new.sql
@@ -0,0 +1,97 @@
+create table images_hashes (
+    hash text primary key,
+    url text not null,
+    file_size int not null,
+    width int not null,
+    height int not null,
+    content_type text not null,
+    created_at timestamptz not null default now()
+);
+
+create table images_assets (
+    id uuid primary key default gen_random_uuid(),
+    system_uuid uuid not null,
+    image text references images_hashes(hash),
+    proxy_image text references images_hashes(hash),
+    kind text not null,
+
+    original_url text,
+    original_file_size int,
+    original_type text,
+    original_attachment_id bigint,
+
+    uploaded_by_account bigint,
+    uploaded_by_ip inet,
+    uploaded_at timestamptz not null default now(),
+
+    unique (id, system_uuid)
+);
+
+insert into images_hashes (
+    hash,
+    url,
+    file_size,
+    width,
+    height,
+    content_type,
+    created_at
+)
+select
+    id,
+    url,
+    file_size,
+    width,
+    height,
+    coalesce(content_type, 'image/webp'),
+    uploaded_at
+from images;
+
+alter table images rename to images_legacy;
+
+create index if not exists images_original_url_idx on images_assets (original_url);
+create index if not exists images_original_attachment_id_idx on images_assets (original_attachment_id);
+create index if not exists images_uploaded_by_account_idx on images_assets (uploaded_by_account);
+
+create index if not exists images_system_id_idx on images_assets (system_uuid);
+create index if not exists images_image_hash_idx on images_assets (image);
+
+-- image cleanup stuffs
+alter table image_cleanup_jobs rename to image_cleanup_jobs_legacy;
+
+create table image_cleanup_jobs (
+    id uuid primary key,
+    system_uuid uuid not null,
+
+    foreign key (id, system_uuid)
+        references images_assets(id, system_uuid)
+        on delete cascade
+);
+
+alter table image_cleanup_pending_jobs rename to image_cleanup_pending_jobs_legacy;
+
+create table image_cleanup_pending_jobs (
+    id uuid primary key,
+    system_uuid uuid not null,
+    ts timestamp not null default now(),
+
+    foreign key (id, system_uuid)
+        references images_assets(id, system_uuid)
+        on delete cascade
+);
+
+create table image_hash_cleanup_jobs (
+    hash text primary key,
+
+    foreign key (hash)
+        references images_hashes(hash)
+        on delete cascade
+);
+
+create table image_hash_cleanup_pending_jobs (
+    hash text primary key,
+    ts timestamp not null default now(),
+
+    foreign key (hash)
+        references images_hashes(hash)
+        on delete cascade
+);
\ No newline at end of file
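With the split schema, serving an asset is a join from images_assets to images_hashes; an illustrative lookup, kept commented since it is not part of the migration (aliases mirror the repository queries; $1/$2 are the system and asset uuids):

-- illustrative only: resolve the stored URL and optional proxy URL for one asset
-- select a.id,
--        hi.url as image_url,
--        hp.url as proxy_url
--   from images_assets a
--   join images_hashes hi on hi.hash = a.image
--   left join images_hashes hp on hp.hash = a.proxy_image
--  where a.system_uuid = $1 and a.id = $2;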
diff --git a/crates/avatars/src/process.rs b/crates/avatars/src/process.rs
index 99ae2d36..83b43996 100644
--- a/crates/avatars/src/process.rs
+++ b/crates/avatars/src/process.rs
@@ -12,32 +12,10 @@ pub struct ProcessOutput {
     pub width: u32,
     pub height: u32,
     pub hash: Hash,
-    pub format: ProcessedFormat,
+    pub format: ImageFormat,
     pub data: Vec<u8>,
 }
 
-#[derive(Copy, Clone, Debug)]
-pub enum ProcessedFormat {
-    Webp,
-    Gif,
-}
-
-impl ProcessedFormat {
-    pub fn mime_type(&self) -> &'static str {
-        match self {
-            ProcessedFormat::Gif => "image/gif",
-            ProcessedFormat::Webp => "image/webp",
-        }
-    }
-
-    pub fn extension(&self) -> &'static str {
-        match self {
-            ProcessedFormat::Webp => "webp",
-            ProcessedFormat::Gif => "gif",
-        }
-    }
-}
-
 // Moving Vec<u8> in here since the thread needs ownership of it now, it's fine, don't need it after
 pub async fn process_async(data: Vec<u8>, kind: ImageKind) -> Result<ProcessOutput, PKAvatarError> {
     tokio::task::spawn_blocking(move || process(&data, kind))
@@ -49,13 +27,16 @@
 pub fn process(data: &[u8], kind: ImageKind) -> Result<ProcessOutput, PKAvatarError> {
     let time_before = Instant::now();
     let reader = reader_for(data);
-    match reader.format() {
+    let format = reader.format();
+    match format {
         Some(ImageFormat::Png | ImageFormat::WebP | ImageFormat::Jpeg | ImageFormat::Tiff) => {} // ok :)
         Some(ImageFormat::Gif) => {
             // animated gifs will need to be handled totally differently
            // so split off processing here and come back if it's not applicable
             // (non-banner gifs + 1-frame animated gifs still need to be webp'd)
-            if let Some(output) = process_gif(data, kind)? {
+            if !kind.is_premium()
+                && let Some(output) = process_gif(data, kind)?
+            {
                 return Ok(output);
             }
         }
@@ -70,6 +51,18 @@ pub fn process(data: &[u8], kind: ImageKind) -> Result<ProcessOutput, PKAvatarError>
     ProcessOutput {
         data: encoded_lossy,
-        format: ProcessedFormat::Webp,
+        format: ImageFormat::WebP,
         hash,
         width,
         height,
diff --git a/crates/avatars/src/pull.rs b/crates/avatars/src/pull.rs
index 44c4a952..da0b3d67 100644
--- a/crates/avatars/src/pull.rs
+++ b/crates/avatars/src/pull.rs
@@ -8,8 +8,6 @@
 use std::fmt::Write;
 use std::time::Instant;
 use tracing::{error, instrument};
 
-const MAX_SIZE: u64 = 8 * 1024 * 1024;
-
 #[allow(dead_code)]
 pub struct PullResult {
     pub data: Vec<u8>,
@@ -19,8 +17,9 @@ pub struct PullResult {
 #[instrument(skip_all)]
 pub async fn pull(
-    client: Arc<Client>,
+    client: &Arc<Client>,
     parsed_url: &ParsedUrl,
+    premium: bool,
 ) -> Result<PullResult, PKAvatarError> {
     let time_before = Instant::now();
     let mut trimmed_url = trim_url_query(&parsed_url.full_url)?;
@@ -59,10 +58,14 @@
         }
     }
 
+    let max_size = match premium {
+        true => super::PREMIUM_HARD_LIMIT as u64,
+        false => super::NORMAL_HARD_LIMIT as u64,
+    };
     let size = match response.content_length() {
         None => return Err(PKAvatarError::MissingHeader("Content-Length")),
-        Some(size) if size > MAX_SIZE => {
-            return Err(PKAvatarError::ImageFileSizeTooLarge(size, MAX_SIZE));
+        Some(size) if size > max_size => {
+            return Err(PKAvatarError::ImageFileSizeTooLarge(size, max_size));
         }
         Some(size) => size,
     };
diff --git a/crates/avatars/src/store.rs b/crates/avatars/src/store.rs
index 4232ebd8..51047d19 100644
--- a/crates/avatars/src/store.rs
+++ b/crates/avatars/src/store.rs
@@ -13,7 +13,10 @@ pub async fn store(bucket: &s3::Bucket, res: &ProcessOutput) -> anyhow::Result
 anyhow::Result {
diff --git a/crates/libpk/Cargo.toml b/crates/libpk/Cargo.toml
index 1f0c3c42..1a251441 100644
--- a/crates/libpk/Cargo.toml
+++ b/crates/libpk/Cargo.toml
@@ -20,6 +20,7 @@ twilight-model = { workspace = true }
 uuid = { workspace = true }
 
 config = "0.14.0"
+chrono = { workspace = true, features = ["serde"] }
 json-subscriber = { version = "0.2.2", features = ["env-filter"] }
 metrics-exporter-prometheus = { version = "0.15.3", default-features = false, features = ["tokio", "http-listener", "tracing"] }
 sentry-tracing = "0.36.0"
diff --git a/crates/libpk/src/_config.rs b/crates/libpk/src/_config.rs
index f21d9adf..79ecaa7f 100644
--- a/crates/libpk/src/_config.rs
+++ b/crates/libpk/src/_config.rs
@@ -55,6 +55,8 @@ pub struct ApiConfig {
     #[serde(default = "_default_api_addr")]
     pub addr: String,
 
+    pub avatars_service_url: Option<String>,
+
     #[serde(default)]
     pub ratelimit_redis_addr: Option<String>,
 
@@ -68,6 +70,7 @@
 pub struct AvatarsConfig {
     pub s3: S3Config,
     pub cdn_url: String,
+    pub edge_url: String,
 
     #[serde(default = "_default_api_addr")]
     pub bind_addr: String,
@@ -76,9 +79,9 @@
     pub migrate_worker_count: u32,
 
     #[serde(default)]
-    pub cloudflare_zone_id: Option<String>,
+    pub fastly_store_id: Option<String>,
     #[serde(default)]
-    pub cloudflare_token: Option<String>,
+    pub fastly_token: Option<String>,
 }
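For orientation, the two URL bases in AvatarsConfig front different layers; a sketch of how the code in main.rs composes them (illustrative; `store_path`, `asset_uuid`, and `ext` are placeholder variables):

// cdn_url fronts the raw content-addressed objects (path produced by store::store)
let hash_url = format!("{}{}", config.cdn_url, store_path);
// edge_url fronts the per-asset routes, like the final_url built in handle_image
let serve_url = format!("{}images/{}/{}.{}", config.edge_url, system_uuid, asset_uuid, ext);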
diff --git a/crates/libpk/src/db/repository/avatars.rs b/crates/libpk/src/db/repository/avatars.rs
index 9a667c11..3f65def8 100644
--- a/crates/libpk/src/db/repository/avatars.rs
+++ b/crates/libpk/src/db/repository/avatars.rs
@@ -1,20 +1,60 @@
 use sqlx::{PgPool, Postgres, Transaction};
+use uuid::Uuid;
 
 use crate::db::types::avatars::*;
 
-pub async fn get_by_id(pool: &PgPool, id: String) -> anyhow::Result<Option<ImageMeta>> {
-    Ok(sqlx::query_as("select * from images where id = $1")
-        .bind(id)
-        .fetch_optional(pool)
-        .await?)
+pub async fn get_by_id(
+    pool: &PgPool,
+    system_uuid: Uuid,
+    id: Uuid,
+) -> anyhow::Result<Option<Image>> {
+    Ok(sqlx::query_as(
+        "select * from images_assets a join images_hashes h on a.image = h.hash where id = $1 and system_uuid = $2",
+    )
+    .bind(id)
+    .bind(system_uuid)
+    .fetch_optional(pool)
+    .await?)
+}
+
+pub async fn get_by_system(pool: &PgPool, system_uuid: Uuid) -> anyhow::Result<Vec<Image>> {
+    Ok(sqlx::query_as(
+        "select * from images_assets a join images_hashes h on a.image = h.hash where system_uuid = $1",
+    )
+    .bind(system_uuid)
+    .fetch_all(pool)
+    .await?)
+}
+
+pub async fn get_full_by_hash(
+    pool: &PgPool,
+    system_uuid: Uuid,
+    image_hash: String,
+) -> anyhow::Result<Option<Image>> {
+    Ok(sqlx::query_as(
+        "select * from images_assets a join images_hashes h on a.image = h.hash where system_uuid = $1 and h.hash = $2",
+    )
+    .bind(system_uuid)
+    .bind(image_hash)
+    .fetch_optional(pool)
+    .await?)
+}
+
+pub async fn get_by_hash(pool: &PgPool, image_hash: String) -> anyhow::Result<Option<ImageData>> {
+    Ok(
+        sqlx::query_as("select * from images_hashes where hash = $1")
+            .bind(image_hash)
+            .fetch_optional(pool)
+            .await?,
+    )
 }
 
 pub async fn get_by_original_url(
     pool: &PgPool,
     original_url: &str,
-) -> anyhow::Result<Option<ImageMeta>> {
+) -> anyhow::Result<Option<Image>> {
     Ok(
-        sqlx::query_as("select * from images where original_url = $1")
+        sqlx::query_as("select * from images_assets a join images_hashes h on a.image = h.hash where original_url = $1")
             .bind(original_url)
             .fetch_optional(pool)
             .await?,
@@ -24,9 +64,9 @@ pub async fn get_by_original_url(
 pub async fn get_by_attachment_id(
     pool: &PgPool,
     attachment_id: u64,
-) -> anyhow::Result<Option<ImageMeta>> {
+) -> anyhow::Result<Option<Image>> {
     Ok(
-        sqlx::query_as("select * from images where original_attachment_id = $1")
+        sqlx::query_as("select * from images_assets a join images_hashes h on a.image = h.hash where original_attachment_id = $1")
             .bind(attachment_id as i64)
             .fetch_optional(pool)
             .await?,
@@ -73,28 +113,56 @@ pub async fn get_stats(pool: &PgPool) -> anyhow::Result<Stats> {
         .await?)
 }
 
-pub async fn add_image(pool: &PgPool, meta: ImageMeta) -> anyhow::Result<bool> {
-    let kind_str = match meta.kind {
-        ImageKind::Avatar => "avatar",
-        ImageKind::Banner => "banner",
-    };
+pub async fn add_image(pool: &PgPool, image: Image) -> anyhow::Result<ImageResult> {
+    let kind_str = image.meta.kind.to_string();
 
-    let res = sqlx::query("insert into images (id, url, content_type, original_url, file_size, width, height, original_file_size, original_type, original_attachment_id, kind, uploaded_by_account, uploaded_by_system, uploaded_at) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, (now() at time zone 'utc')) on conflict (id) do nothing")
-        .bind(meta.id)
-        .bind(meta.url)
-        .bind(meta.content_type)
-        .bind(meta.original_url)
-        .bind(meta.file_size)
-        .bind(meta.width)
-        .bind(meta.height)
-        .bind(meta.original_file_size)
-        .bind(meta.original_type)
-        .bind(meta.original_attachment_id)
-        .bind(kind_str)
-        .bind(meta.uploaded_by_account)
-        .bind(meta.uploaded_by_system)
-        .execute(pool).await?;
-    Ok(res.rows_affected() > 0)
+    add_image_data(pool, &image.data).await?;
+
+    if let Some(img) = get_full_by_hash(pool, image.meta.system_uuid, image.meta.image).await? {
+        return Ok(ImageResult {
+            is_new: false,
+            uuid: img.meta.id,
+        });
+    }
+
+    let res: (uuid::Uuid,) = sqlx::query_as(
+        "insert into images_assets (system_uuid, image, proxy_image, kind, original_url, original_file_size, original_type, original_attachment_id, uploaded_by_account, uploaded_by_ip)
+        values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+        returning id"
+    )
+    .bind(image.meta.system_uuid)
+    .bind(image.data.hash)
+    .bind(image.meta.proxy_image)
+    .bind(kind_str)
+    .bind(image.meta.original_url)
+    .bind(image.meta.original_file_size)
+    .bind(image.meta.original_type)
+    .bind(image.meta.original_attachment_id)
+    .bind(image.meta.uploaded_by_account)
+    .bind(image.meta.uploaded_by_ip)
+    .fetch_one(pool)
+    .await?;
+
+    Ok(ImageResult {
+        is_new: true,
+        uuid: res.0,
+    })
+}
+
+pub async fn add_image_data(pool: &PgPool, image_data: &ImageData) -> anyhow::Result<()> {
+    sqlx::query(
+        "insert into images_hashes (hash, url, file_size, width, height, content_type)
+        values ($1, $2, $3, $4, $5, $6)
+        on conflict (hash) do nothing",
+    )
+    .bind(&image_data.hash)
+    .bind(&image_data.url)
+    .bind(image_data.file_size)
+    .bind(image_data.width)
+    .bind(image_data.height)
+    .bind(&image_data.content_type)
+    .execute(pool)
+    .await?;
+    Ok(())
 }
 
 pub async fn push_queue(
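One behavioural note on the rewritten add_image: it is idempotent per (system_uuid, content hash) — the get_full_by_hash check above returns the existing asset instead of inserting a duplicate. A usage sketch (hypothetical setup omitted):

let first = add_image(&pool, image).await?;             // first upload: is_new == true
let second = add_image(&pool, identical_image).await?;  // same system + same hash
assert!(!second.is_new);
assert_eq!(first.uuid, second.uuid); // both point at the original asset row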
diff --git a/crates/libpk/src/db/types/avatars.rs b/crates/libpk/src/db/types/avatars.rs
index 0b07fbb2..25427e91 100644
--- a/crates/libpk/src/db/types/avatars.rs
+++ b/crates/libpk/src/db/types/avatars.rs
@@ -1,3 +1,5 @@
+use std::net::IpAddr;
+
 use serde::{Deserialize, Serialize};
 use sqlx::{
     FromRow,
@@ -5,23 +7,52 @@ use sqlx::{
 };
 use uuid::Uuid;
 
-#[derive(FromRow)]
-pub struct ImageMeta {
-    pub id: String,
-    pub kind: ImageKind,
-    pub content_type: String,
+#[derive(FromRow, Serialize)]
+pub struct ImageData {
+    pub hash: String,
     pub url: String,
     pub file_size: i32,
     pub width: i32,
     pub height: i32,
-    pub uploaded_at: Option<DateTime<Utc>>,
+    pub content_type: String,
+    pub created_at: Option<DateTime<Utc>>,
+}
 
+#[derive(FromRow, Serialize)]
+pub struct ImageMeta {
+    pub id: Uuid,
+    #[serde(skip_serializing)]
+    pub system_uuid: Uuid,
+    #[serde(skip_serializing)]
+    pub image: String,
+    pub proxy_image: Option<String>,
+    pub kind: ImageKind,
+
+    #[serde(skip_serializing)]
     pub original_url: Option<String>,
-    pub original_attachment_id: Option<i64>,
+    #[serde(skip_serializing)]
     pub original_file_size: Option<i32>,
+    #[serde(skip_serializing)]
     pub original_type: Option<String>,
+    #[serde(skip_serializing)]
+    pub original_attachment_id: Option<i64>,
+
     pub uploaded_by_account: Option<i64>,
-    pub uploaded_by_system: Option<Uuid>,
+    pub uploaded_by_ip: Option<IpAddr>,
+    pub uploaded_at: Option<DateTime<Utc>>,
+}
+
+#[derive(FromRow, Serialize)]
+pub struct Image {
+    #[sqlx(flatten)]
+    pub meta: ImageMeta,
+    #[sqlx(flatten)]
+    pub data: ImageData,
+}
+
+pub struct ImageResult {
+    pub is_new: bool,
+    pub uuid: Uuid,
 }
 
 #[derive(FromRow, Serialize)]
@@ -36,6 +67,8 @@ pub struct Stats {
 pub enum ImageKind {
     Avatar,
     Banner,
+    PremiumAvatar,
+    PremiumBanner,
 }
 
 impl ImageKind {
@@ -43,8 +76,30 @@ impl ImageKind {
         match self {
             Self::Avatar => (512, 512),
             Self::Banner => (1024, 1024),
+            Self::PremiumAvatar => (0, 0),
+            Self::PremiumBanner => (0, 0),
         }
     }
+    pub fn is_premium(&self) -> bool {
+        matches!(self, ImageKind::PremiumAvatar | ImageKind::PremiumBanner)
+    }
+    pub fn to_string(&self) -> &str {
+        match self {
+            ImageKind::Avatar => "avatar",
+            ImageKind::Banner => "banner",
+            ImageKind::PremiumAvatar => "premium_avatar",
+            ImageKind::PremiumBanner => "premium_banner",
+        }
+    }
+    pub fn from_string(str: &str) -> Option<Self> {
+        match str {
+            "avatar" => Some(ImageKind::Avatar),
+            "banner" => Some(ImageKind::Banner),
+            "premium_avatar" => Some(ImageKind::PremiumAvatar),
+            "premium_banner" => Some(ImageKind::PremiumBanner),
+            _ => None,
+        }
+    }
 }
 
 #[derive(FromRow)]
diff --git a/crates/scheduled_tasks/src/main.rs b/crates/scheduled_tasks/src/main.rs
index 805246f1..41e354f1 100644
--- a/crates/scheduled_tasks/src/main.rs
+++ b/crates/scheduled_tasks/src/main.rs
@@ -105,6 +105,12 @@ async fn main() -> anyhow::Result<()> {
         "queue deleted image cleanup job",
         queue_deleted_image_cleanup
     );
+    // on hh:15 and hh:45
+    doforever!(
+        "15,45 * * * *",
+        "queue orphaned hash cleanup job",
+        queue_orphaned_hash_cleanup
+    );
     // non-standard cron: at hh:mm:00, hh:mm:30
     doforever!("0,30 * * * * *", "stats api updater", update_stats_api);
     // every hour (could probably even be less frequent, basebackups are taken rarely)
diff --git a/crates/scheduled_tasks/src/tasks.rs b/crates/scheduled_tasks/src/tasks.rs
index 84773149..c610d26b 100644
--- a/crates/scheduled_tasks/src/tasks.rs
+++ b/crates/scheduled_tasks/src/tasks.rs
@@ -228,32 +228,44 @@ pub async fn update_discord_stats(ctx: AppCtx) -> anyhow::Result<()> {
     Ok(())
 }
 
+const IMAGE_CHECK_COLUMNS: &[(&str, &str)] = &[
+    ("systems", "avatar_url"),
+    ("systems", "banner_image"),
+    ("system_guild", "avatar_url"),
+    ("members", "avatar_url"),
+    ("members", "banner_image"),
+    ("members", "webhook_avatar_url"),
+    ("member_guild", "avatar_url"),
+    ("groups", "icon"),
+    ("groups", "banner_image"),
+];
+
 pub async fn queue_deleted_image_cleanup(ctx: AppCtx) -> anyhow::Result<()> {
     // if an image is present on no member, add it to the pending deletion queue
     // if it is still present on no member after 24h, actually delete it
-
-    let usage_query = r#"
-        and not exists (select from systems where avatar_url = images.url)
-        and not exists (select from systems where banner_image = images.url)
-        and not exists (select from system_guild where avatar_url = images.url)
-
-        and not exists (select from members where avatar_url = images.url)
-        and not exists (select from members where banner_image = images.url)
-        and not exists (select from members where webhook_avatar_url = images.url)
-        and not exists (select from member_guild where avatar_url = images.url)
-
-        and not exists (select from groups where icon = images.url)
-        and not exists (select from groups where banner_image = images.url);
-    "#;
+    let mut usage_query = String::new();
+    for (table, col) in IMAGE_CHECK_COLUMNS {
+        usage_query.push_str(&format!(
+            r#"
+            and not exists (
+                select 1 from {table}
+                where {col} = h.url
+                or {col} like '%/' || a.system_uuid::text || '/' || a.id::text || '.%'
+            )
+            "#
+        ));
+    }
 
     ctx.data
         .execute(
             format!(
                 r#"
                 insert into image_cleanup_pending_jobs
-                select id, now() from images where
-                not exists (select from image_cleanup_pending_jobs j where j.id = images.id)
-                and not exists (select from image_cleanup_jobs j where j.id = images.id)
+                select a.id, a.system_uuid, now() from images_assets a
+                join images_hashes h on a.image = h.hash where
+                a.kind not in ('premium_banner', 'premium_avatar')
+                and not exists (select from image_cleanup_pending_jobs j where j.id = a.id)
+                and not exists (select from image_cleanup_jobs j where j.id = a.id)
                 {}
                 "#,
                 usage_query
             )
             .as_str(),
         )
         .await?;
 
     ctx.data
         .execute(
             format!(
                 r#"
-                insert into image_cleanup_jobs
-                select image_cleanup_pending_jobs.id from image_cleanup_pending_jobs
-                left join images on images.id = image_cleanup_pending_jobs.id
+                insert into image_cleanup_jobs (id, system_uuid)
+                select p.id, p.system_uuid from image_cleanup_pending_jobs p
+                join images_assets a on a.id = p.id
+                join images_hashes h on a.image = h.hash
                 where
-                ts < now() - '24 hours'::interval
-                and not exists (select from image_cleanup_jobs j where j.id = images.id)
+                a.kind not in ('premium_banner', 'premium_avatar')
+                and ts < now() - '24 hours'::interval
+                and not exists (select from image_cleanup_jobs j where j.id = p.id)
                 {}
                 "#,
                 usage_query
             )
             .as_str(),
         )
         .await?;
 
     Ok(())
 }
 
+pub async fn queue_orphaned_hash_cleanup(ctx: AppCtx) -> anyhow::Result<()> {
+    let mut usage_checks = String::new();
+    for (table, col) in IMAGE_CHECK_COLUMNS {
+        usage_checks.push_str(&format!(
+            "and not exists (select 1 from {table} where {col} = h.url) "
+        ));
+    }
+
+    ctx.data
+        .execute(
+            format!(
+                r#"
+                insert into image_hash_cleanup_pending_jobs (hash, ts)
+                select h.hash, now()
+                from images_hashes h
+                where
+                    not exists (
+                        select 1 from images_assets a
+                        where a.image = h.hash
+                        or a.proxy_image = h.hash
+                    )
+                    {usage_checks}
+                    and not exists (select 1 from image_hash_cleanup_pending_jobs p where p.hash = h.hash)
+                    and not exists (select 1 from image_hash_cleanup_jobs j where j.hash = h.hash)
+                "#
+            )
+            .as_str(),
+        )
+        .await?;
+
+    ctx.data
+        .execute(
+            format!(
+                r#"
+                insert into image_hash_cleanup_jobs (hash)
+                select p.hash
+                from image_hash_cleanup_pending_jobs p
+                join images_hashes h on h.hash = p.hash
+                where
+                    p.ts < now() - '24 hours'::interval
+                    and not exists (
+                        select 1 from images_assets a
+                        where a.image = h.hash
+                        or a.proxy_image = h.hash
+                    )
+                    {usage_checks}
+                    and not exists (select 1 from image_hash_cleanup_jobs j where j.hash = p.hash)
+                "#
+            )
+            .as_str(),
+        )
+        .await?;
+
+    Ok(())
+}
+
 pub async fn update_stats_api(ctx: AppCtx) -> anyhow::Result<()> {
     let client = ClientBuilder::new()
         .connect_timeout(Duration::from_secs(3))