feat: add basic api/gateway metrics

This commit is contained in:
alyssa 2024-11-14 13:39:47 +09:00
parent 8831e8fabe
commit 93f8786da1
8 changed files with 141 additions and 53 deletions

View file

@ -133,11 +133,11 @@ async fn main() -> anyhow::Result<()> {
.route("/v2/members/:member_id/oembed.json", get(rproxy))
.route("/v2/groups/:group_id/oembed.json", get(rproxy))
.layer(axum::middleware::from_fn(middleware::logger))
.layer(middleware::ratelimit::ratelimiter(middleware::ratelimit::do_request_ratelimited)) // this sucks
.layer(axum::middleware::from_fn_with_state(ctx.clone(), middleware::authnz))
.layer(axum::middleware::from_fn(middleware::ignore_invalid_routes))
.layer(axum::middleware::from_fn(middleware::cors))
.layer(axum::middleware::from_fn(middleware::logger))
.layer(tower_http::catch_panic::CatchPanicLayer::custom(util::handle_panic))

View file

@ -8,6 +8,8 @@ use tracing::error;
use crate::ApiContext;
use super::logger::DID_AUTHENTICATE_HEADER;
pub async fn authnz(State(ctx): State<ApiContext>, mut request: Request, next: Next) -> Response {
let headers = request.headers_mut();
headers.remove("x-pluralkit-systemid");
@ -15,6 +17,7 @@ pub async fn authnz(State(ctx): State<ApiContext>, mut request: Request, next: N
.get("authorization")
.map(|h| h.to_str().ok())
.flatten();
let mut authenticated = false;
if let Some(auth_header) = auth_header {
if let Some(system_id) =
match libpk::db::repository::legacy_token_auth(&ctx.db, auth_header).await {
@ -29,7 +32,14 @@ pub async fn authnz(State(ctx): State<ApiContext>, mut request: Request, next: N
"x-pluralkit-systemid",
HeaderValue::from_str(format!("{system_id}").as_str()).unwrap(),
);
authenticated = true;
}
}
next.run(request).await
let mut response = next.run(request).await;
if authenticated {
response
.headers_mut()
.insert(DID_AUTHENTICATE_HEADER, HeaderValue::from_static("1"));
}
response
}

View file

@ -1,7 +1,7 @@
use std::time::Instant;
use axum::{extract::MatchedPath, extract::Request, middleware::Next, response::Response};
use metrics::histogram;
use metrics::{counter, histogram};
use tracing::{info, span, warn, Instrument, Level};
use crate::util::header_or_unknown;
@ -10,11 +10,12 @@ use crate::util::header_or_unknown;
// todo: change as necessary
const MIN_LOG_TIME: u128 = 2_000;
pub const DID_AUTHENTICATE_HEADER: &'static str = "x-pluralkit-didauthenticate";
pub async fn logger(request: Request, next: Next) -> Response {
let method = request.method().clone();
let request_id = header_or_unknown(request.headers().get("Fly-Request-Id"));
let remote_ip = header_or_unknown(request.headers().get("Fly-Client-IP"));
let remote_ip = header_or_unknown(request.headers().get("X-PluralKit-Client-IP"));
let user_agent = header_or_unknown(request.headers().get("User-Agent"));
let endpoint = request
@ -26,10 +27,9 @@ pub async fn logger(request: Request, next: Next) -> Response {
let uri = request.uri().clone();
let request_id_span = span!(
let request_span = span!(
Level::INFO,
"request",
request_id,
remote_ip,
method = method.as_str(),
endpoint = endpoint.clone(),
@ -37,9 +37,37 @@ pub async fn logger(request: Request, next: Next) -> Response {
);
let start = Instant::now();
let response = next.run(request).instrument(request_id_span).await;
let mut response = next.run(request).instrument(request_span).await;
let elapsed = start.elapsed().as_millis();
// The `authnz` middleware marks authenticated requests by setting
// DID_AUTHENTICATE_HEADER on the response. Strip that marker from the
// outgoing response here; whether it was present is the `authenticated`
// metric label. (`remove` returns the previous value, so one lookup is enough.)
let authenticated = response
    .headers_mut()
    .remove(DID_AUTHENTICATE_HEADER)
    .is_some();
counter!(
"pluralkit_api_requests",
"method" => method.to_string(),
"endpoint" => endpoint.clone(),
"status" => response.status().to_string(),
"authenticated" => authenticated.to_string(),
)
.increment(1);
histogram!(
"pluralkit_api_requests_bucket",
"method" => method.to_string(),
"endpoint" => endpoint.clone(),
"status" => response.status().to_string(),
"authenticated" => authenticated.to_string(),
)
.record(elapsed as f64 / 1_000_f64);
info!(
"{} handled request for {} {} in {}ms",
response.status(),
@ -47,15 +75,17 @@ pub async fn logger(request: Request, next: Next) -> Response {
endpoint,
elapsed
);
histogram!(
"pk_http_requests",
"method" => method.to_string(),
"route" => endpoint.clone(),
"status" => response.status().to_string()
)
.record((elapsed as f64) / 1_000_f64);
if elapsed > MIN_LOG_TIME {
counter!(
"pluralkit_api_slow_requests_count",
"method" => method.to_string(),
"endpoint" => endpoint.clone(),
"status" => response.status().to_string(),
"authenticated" => authenticated.to_string(),
)
.increment(1);
warn!(
"request to {} full path {} (endpoint {}) took a long time ({}ms)!",
method,

View file

@ -24,3 +24,5 @@ twilight-cache-inmemory = { workspace = true }
twilight-util = { workspace = true }
twilight-model = { workspace = true }
twilight-http = { workspace = true }
serde_variant = "0.1.3"

View file

@ -1,4 +1,5 @@
use libpk::_config::ClusterSettings;
use metrics::counter;
use std::sync::{mpsc::Sender, Arc};
use tracing::{info, warn};
use twilight_gateway::{
@ -85,6 +86,12 @@ pub async fn runner(
while let Some(item) = shard.next_event(EventTypeFlags::all()).await {
match item {
Ok(event) => {
// Count every gateway event per shard and event type. The variant-name
// lookup is not expected to fail for the event kind, but metrics must never
// take down the shard loop, so fall back to a fixed label instead of panicking.
counter!(
    "pluralkit_gateway_events",
    "shard_id" => shard.id().number().to_string(),
    "event_type" => serde_variant::to_variant_name(&event.kind()).unwrap_or("unknown"),
)
.increment(1);
if let Err(error) = shard_state
.handle_event(shard.id().number(), event.clone())
.await

View file

@ -1,5 +1,6 @@
use bytes::Bytes;
use fred::{clients::RedisPool, interfaces::HashesInterface};
use metrics::{counter, gauge};
use prost::Message;
use tracing::info;
use twilight_gateway::Event;
@ -59,6 +60,13 @@ impl ShardStateManager {
shard_id,
if resumed { "resumed" } else { "ready" }
);
counter!(
"pluralkit_gateway_shard_reconnect",
"shard_id" => shard_id.to_string(),
"resumed" => resumed.to_string(),
)
.increment(1);
gauge!("pluralkit_gateway_shard_up").increment(1);
let mut info = self.get_shard(shard_id).await?;
info.last_connection = chrono::offset::Utc::now().timestamp() as i32;
info.up = true;
@ -68,6 +76,7 @@ impl ShardStateManager {
async fn socket_closed(&self, shard_id: u32) -> anyhow::Result<()> {
info!("shard {} closed", shard_id);
gauge!("pluralkit_gateway_shard_up").decrement(1);
let mut info = self.get_shard(shard_id).await?;
info.up = false;
info.disconnection_count += 1;

View file

@ -1,6 +1,9 @@
use std::time::Instant;
use axum::{extract::MatchedPath, extract::Request, middleware::Next, response::Response};
use axum::{
extract::MatchedPath, extract::Request, http::StatusCode, middleware::Next, response::Response,
};
use metrics::{counter, histogram};
use tracing::{info, span, warn, Instrument, Level};
// log any requests that take longer than 2 seconds
@ -30,13 +33,30 @@ pub async fn logger(request: Request, next: Next) -> Response {
let response = next.run(request).instrument(request_id_span).await;
let elapsed = start.elapsed().as_millis();
info!(
"{} handled request for {} {} in {}ms",
response.status(),
method,
uri.path(),
elapsed
);
counter!(
"pluralkit_gateway_cache_api_requests",
"method" => method.to_string(),
"endpoint" => endpoint.clone(),
"status" => response.status().to_string(),
)
.increment(1);
histogram!(
"pluralkit_gateway_cache_api_requests_bucket",
"method" => method.to_string(),
"endpoint" => endpoint.clone(),
"status" => response.status().to_string(),
)
.record(elapsed as f64 / 1_000_f64);
if response.status() != StatusCode::FOUND {
info!(
"{} handled request for {} {} in {}ms",
response.status(),
method,
uri.path(),
elapsed
);
}
if elapsed > MIN_LOG_TIME {
warn!(