add /api/v2/bulk endpoint

also, initial support for patch models in rust!
This commit is contained in:
alyssa 2025-09-03 00:35:25 +00:00
parent 1776902000
commit 034865cc13
12 changed files with 715 additions and 32 deletions

View file

@ -0,0 +1,211 @@
use axum::{
Extension, Json,
extract::{Json as ExtractJson, State},
response::IntoResponse,
};
use pk_macros::api_endpoint;
use sea_query::{Expr, ExprTrait, PostgresQueryBuilder};
use sea_query_sqlx::SqlxBinder;
use serde_json::{Value, json};
use pluralkit_models::{PKGroup, PKGroupPatch, PKMember, PKMemberPatch, PKSystem};
use crate::{
ApiContext,
auth::AuthState,
error::{
GENERIC_AUTH_ERROR, NOT_OWN_GROUP, NOT_OWN_MEMBER, PKError, TARGET_GROUP_NOT_FOUND,
TARGET_MEMBER_NOT_FOUND,
},
};
#[derive(serde::Deserialize, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BulkActionRequestFilter {
All,
Ids { ids: Vec<String> },
Connection { id: String },
}
#[derive(serde::Deserialize, Debug)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum BulkActionRequest {
Member {
filter: BulkActionRequestFilter,
patch: PKMemberPatch,
},
Group {
filter: BulkActionRequestFilter,
patch: PKGroupPatch,
},
}
#[api_endpoint]
pub async fn bulk(
Extension(auth): Extension<AuthState>,
State(ctx): State<ApiContext>,
ExtractJson(req): ExtractJson<BulkActionRequest>,
) -> Json<Value> {
let Some(system_id) = auth.system_id() else {
return Err(GENERIC_AUTH_ERROR);
};
#[derive(sqlx::FromRow)]
struct Ider {
id: i32,
hid: String,
uuid: String,
}
#[derive(sqlx::FromRow)]
struct GroupMemberEntry {
member_id: i32,
group_id: i32,
}
#[allow(dead_code)]
#[derive(sqlx::FromRow)]
struct OnlyIder {
id: i32,
}
println!("BulkActionRequest::{req:#?}");
match req {
BulkActionRequest::Member { filter, mut patch } => {
patch.validate_bulk();
if patch.errors().len() > 0 {
return Err(PKError::from_validation_errors(patch.errors()));
}
let ids: Vec<i32> = match filter {
BulkActionRequestFilter::All => {
let ids: Vec<Ider> = sqlx::query_as("select id from members where system = $1")
.bind(system_id as i64)
.fetch_all(&ctx.db)
.await?;
ids.iter().map(|v| v.id).collect()
}
BulkActionRequestFilter::Ids { ids } => {
let members: Vec<PKMember> = sqlx::query_as(
"select * from members where hid = any($1::array) or uuid::text = any($1::array)",
)
.bind(&ids)
.fetch_all(&ctx.db)
.await?;
// todo: better errors
if members.len() != ids.len() {
return Err(TARGET_MEMBER_NOT_FOUND);
}
if members.iter().any(|m| m.system != system_id) {
return Err(NOT_OWN_MEMBER);
}
members.iter().map(|m| m.id).collect()
}
BulkActionRequestFilter::Connection { id } => {
let Some(group): Option<PKGroup> =
sqlx::query_as("select * from groups where hid = $1 or uuid::text = $1")
.bind(id)
.fetch_optional(&ctx.db)
.await?
else {
return Err(TARGET_GROUP_NOT_FOUND);
};
if group.system != system_id {
return Err(NOT_OWN_GROUP);
}
let entries: Vec<GroupMemberEntry> =
sqlx::query_as("select * from group_members where group_id = $1")
.bind(group.id)
.fetch_all(&ctx.db)
.await?;
entries.iter().map(|v| v.member_id).collect()
}
};
let (q, pms) = patch
.to_sql()
.table("members") // todo: this should be in the model definition
.and_where(Expr::col("id").is_in(ids))
.returning_col("id")
.build_sqlx(PostgresQueryBuilder);
let res: Vec<OnlyIder> = sqlx::query_as_with(&q, pms).fetch_all(&ctx.db).await?;
Ok(Json(json! {{ "updated": res.len() }}))
}
BulkActionRequest::Group { filter, mut patch } => {
patch.validate_bulk();
if patch.errors().len() > 0 {
return Err(PKError::from_validation_errors(patch.errors()));
}
let ids: Vec<i32> = match filter {
BulkActionRequestFilter::All => {
let ids: Vec<Ider> = sqlx::query_as("select id from groups where system = $1")
.bind(system_id as i64)
.fetch_all(&ctx.db)
.await?;
ids.iter().map(|v| v.id).collect()
}
BulkActionRequestFilter::Ids { ids } => {
let groups: Vec<PKGroup> = sqlx::query_as(
"select * from groups where hid = any($1) or uuid::text = any($1)",
)
.bind(&ids)
.fetch_all(&ctx.db)
.await?;
// todo: better errors
if groups.len() != ids.len() {
return Err(TARGET_GROUP_NOT_FOUND);
}
if groups.iter().any(|m| m.system != system_id) {
return Err(NOT_OWN_GROUP);
}
groups.iter().map(|m| m.id).collect()
}
BulkActionRequestFilter::Connection { id } => {
let Some(member): Option<PKMember> =
sqlx::query_as("select * from members where hid = $1 or uuid::text = $1")
.bind(id)
.fetch_optional(&ctx.db)
.await?
else {
return Err(TARGET_MEMBER_NOT_FOUND);
};
if member.system != system_id {
return Err(NOT_OWN_MEMBER);
}
let entries: Vec<GroupMemberEntry> =
sqlx::query_as("select * from group_members where member_id = $1")
.bind(member.id)
.fetch_all(&ctx.db)
.await?;
entries.iter().map(|v| v.group_id).collect()
}
};
let (q, pms) = patch
.to_sql()
.table("groups") // todo: this should be in the model definition
.and_where(Expr::col("id").is_in(ids))
.returning_col("id")
.build_sqlx(PostgresQueryBuilder);
println!("{q:#?} {pms:#?}");
let res: Vec<OnlyIder> = sqlx::query_as_with(&q, pms).fetch_all(&ctx.db).await?;
Ok(Json(json! {{ "updated": res.len() }}))
}
}
}

View file

@ -1,2 +1,3 @@
pub mod bulk;
pub mod private;
pub mod system;

View file

@ -2,6 +2,7 @@ use axum::{
http::StatusCode,
response::{IntoResponse, Response},
};
use pluralkit_models::ValidationError;
use std::fmt;
// todo: model parse errors
@ -11,6 +12,8 @@ pub struct PKError {
pub json_code: i32,
pub message: &'static str,
pub errors: Vec<ValidationError>,
pub inner: Option<anyhow::Error>,
}
@ -30,6 +33,21 @@ impl Clone for PKError {
json_code: self.json_code,
message: self.message,
inner: None,
errors: self.errors.clone(),
}
}
}
// can't `impl From<Vec<ValidationError>>`
// because "upstream crate may add a new impl" >:(
impl PKError {
pub fn from_validation_errors(errs: Vec<ValidationError>) -> Self {
Self {
message: "Error parsing JSON model",
json_code: 40001,
errors: errs,
response_code: StatusCode::BAD_REQUEST,
inner: None,
}
}
}
@ -50,14 +68,19 @@ impl IntoResponse for PKError {
if let Some(inner) = self.inner {
tracing::error!(?inner, "error returned from handler");
}
crate::util::json_err(
self.response_code,
serde_json::to_string(&serde_json::json!({
let json = if self.errors.len() > 0 {
serde_json::json!({
"message": self.message,
"code": self.json_code,
}))
.unwrap(),
)
"errors": self.errors,
})
} else {
serde_json::json!({
"message": self.message,
"code": self.json_code,
})
};
crate::util::json_err(self.response_code, serde_json::to_string(&json).unwrap())
}
}
@ -78,9 +101,17 @@ macro_rules! define_error {
json_code: $json_code,
message: $message,
inner: None,
errors: vec![],
};
};
}
define_error! { GENERIC_AUTH_ERROR, StatusCode::UNAUTHORIZED, 0, "401: Missing or invalid Authorization header" }
define_error! { GENERIC_BAD_REQUEST, StatusCode::BAD_REQUEST, 0, "400: Bad Request" }
define_error! { GENERIC_SERVER_ERROR, StatusCode::INTERNAL_SERVER_ERROR, 0, "500: Internal Server Error" }
define_error! { NOT_OWN_MEMBER, StatusCode::FORBIDDEN, 30006, "Target member is not part of your system." }
define_error! { NOT_OWN_GROUP, StatusCode::FORBIDDEN, 30007, "Target group is not part of your system." }
define_error! { TARGET_MEMBER_NOT_FOUND, StatusCode::BAD_REQUEST, 40010, "Target member not found." }
define_error! { TARGET_GROUP_NOT_FOUND, StatusCode::BAD_REQUEST, 40011, "Target group not found." }

View file

@ -115,6 +115,8 @@ fn router(ctx: ApiContext) -> Router {
.route("/v2/messages/{message_id}", get(rproxy))
.route("/v2/bulk", post(endpoints::bulk::bulk))
.route("/private/bulk_privacy/member", post(rproxy))
.route("/private/bulk_privacy/group", post(rproxy))
.route("/private/discord/callback", post(rproxy))