mirror of
https://github.com/PluralKit/PluralKit.git
synced 2026-02-04 04:56:49 +00:00
feat(stats): query http gateway, wait until gateway up to collect discord stats
This commit is contained in:
parent
e4ed354536
commit
9ff824c37b
7 changed files with 130 additions and 17 deletions
3
.github/workflows/scheduled_tasks.yml
vendored
3
.github/workflows/scheduled_tasks.yml
vendored
|
|
@ -2,8 +2,9 @@ name: Build scheduled tasks runner Docker image
|
||||||
|
|
||||||
on:
|
on:
|
||||||
push:
|
push:
|
||||||
branches: [main]
|
branches: [main, gateway-service]
|
||||||
paths:
|
paths:
|
||||||
|
- .github/workflows/scheduled_tasks.yml
|
||||||
- 'services/scheduled_tasks/**'
|
- 'services/scheduled_tasks/**'
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
|
|
||||||
1
go.work
1
go.work
|
|
@ -2,5 +2,4 @@ go 1.19
|
||||||
|
|
||||||
use (
|
use (
|
||||||
./services/scheduled_tasks
|
./services/scheduled_tasks
|
||||||
./services/web-proxy
|
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -5,12 +5,15 @@ use axum::{
|
||||||
routing::get,
|
routing::get,
|
||||||
Router,
|
Router,
|
||||||
};
|
};
|
||||||
use serde_json::to_string;
|
use serde_json::{json, to_string};
|
||||||
use tracing::{error, info};
|
use tracing::{error, info};
|
||||||
use twilight_model::guild::Permissions;
|
use twilight_model::guild::Permissions;
|
||||||
use twilight_model::id::Id;
|
use twilight_model::id::Id;
|
||||||
|
|
||||||
use crate::discord::cache::{dm_channel, DiscordCache, DM_PERMISSIONS};
|
use crate::discord::{
|
||||||
|
cache::{dm_channel, DiscordCache, DM_PERMISSIONS},
|
||||||
|
gateway::cluster_config,
|
||||||
|
};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
fn status_code(code: StatusCode, body: String) -> Response {
|
fn status_code(code: StatusCode, body: String) -> Response {
|
||||||
|
|
@ -156,6 +159,17 @@ pub async fn run_server(cache: Arc<DiscordCache>) -> anyhow::Result<()> {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
|
|
||||||
|
.route("/stats", get(|State(cache): State<Arc<DiscordCache>>| async move {
|
||||||
|
let cluster = cluster_config();
|
||||||
|
let has_been_up = cache.2.read().await.len() as u32 == if cluster.total_shards > 16 {16} else {cluster.total_shards};
|
||||||
|
let stats = json!({
|
||||||
|
"guild_count": cache.0.stats().guilds(),
|
||||||
|
"channel_count": cache.0.stats().channels(),
|
||||||
|
"up": has_been_up,
|
||||||
|
});
|
||||||
|
status_code(StatusCode::FOUND, to_string(&stats).unwrap())
|
||||||
|
}))
|
||||||
|
|
||||||
.layer(axum::middleware::from_fn(crate::logger::logger))
|
.layer(axum::middleware::from_fn(crate::logger::logger))
|
||||||
.with_state(cache);
|
.with_state(cache);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
use anyhow::format_err;
|
use anyhow::format_err;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::RwLock;
|
||||||
use twilight_cache_inmemory::{
|
use twilight_cache_inmemory::{
|
||||||
model::CachedMember,
|
model::CachedMember,
|
||||||
permission::{MemberRoles, RootError},
|
permission::{MemberRoles, RootError},
|
||||||
|
|
@ -110,10 +111,14 @@ pub fn new() -> DiscordCache {
|
||||||
.build(),
|
.build(),
|
||||||
);
|
);
|
||||||
|
|
||||||
DiscordCache(cache, client)
|
DiscordCache(cache, client, RwLock::new(Vec::new()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct DiscordCache(pub Arc<InMemoryCache>, pub Arc<twilight_http::Client>);
|
pub struct DiscordCache(
|
||||||
|
pub Arc<InMemoryCache>,
|
||||||
|
pub Arc<twilight_http::Client>,
|
||||||
|
pub RwLock<Vec<u32>>,
|
||||||
|
);
|
||||||
|
|
||||||
impl DiscordCache {
|
impl DiscordCache {
|
||||||
pub async fn guild_permissions(
|
pub async fn guild_permissions(
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
use libpk::_config::ClusterSettings;
|
||||||
use std::sync::{mpsc::Sender, Arc};
|
use std::sync::{mpsc::Sender, Arc};
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
use twilight_gateway::{
|
use twilight_gateway::{
|
||||||
|
|
@ -13,6 +14,18 @@ use crate::discord::identify_queue::{self, RedisQueue};
|
||||||
|
|
||||||
use super::{cache::DiscordCache, shard_state::ShardStateManager};
|
use super::{cache::DiscordCache, shard_state::ShardStateManager};
|
||||||
|
|
||||||
|
pub fn cluster_config() -> ClusterSettings {
|
||||||
|
libpk::config
|
||||||
|
.discord
|
||||||
|
.cluster
|
||||||
|
.clone()
|
||||||
|
.unwrap_or(libpk::_config::ClusterSettings {
|
||||||
|
node_id: 0,
|
||||||
|
total_shards: 1,
|
||||||
|
total_nodes: 1,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn create_shards(redis: fred::pool::RedisPool) -> anyhow::Result<Vec<Shard<RedisQueue>>> {
|
pub fn create_shards(redis: fred::pool::RedisPool) -> anyhow::Result<Vec<Shard<RedisQueue>>> {
|
||||||
let intents = Intents::GUILDS
|
let intents = Intents::GUILDS
|
||||||
| Intents::DIRECT_MESSAGES
|
| Intents::DIRECT_MESSAGES
|
||||||
|
|
@ -23,16 +36,7 @@ pub fn create_shards(redis: fred::pool::RedisPool) -> anyhow::Result<Vec<Shard<R
|
||||||
|
|
||||||
let queue = identify_queue::new(redis);
|
let queue = identify_queue::new(redis);
|
||||||
|
|
||||||
let cluster_settings =
|
let cluster_settings = cluster_config();
|
||||||
libpk::config
|
|
||||||
.discord
|
|
||||||
.cluster
|
|
||||||
.clone()
|
|
||||||
.unwrap_or(libpk::_config::ClusterSettings {
|
|
||||||
node_id: 0,
|
|
||||||
total_shards: 1,
|
|
||||||
total_nodes: 1,
|
|
||||||
});
|
|
||||||
|
|
||||||
let (start_shard, end_shard): (u32, u32) = if cluster_settings.total_shards < 16 {
|
let (start_shard, end_shard): (u32, u32) = if cluster_settings.total_shards < 16 {
|
||||||
warn!("we have less than 16 shards, assuming single gateway process");
|
warn!("we have less than 16 shards, assuming single gateway process");
|
||||||
|
|
@ -77,6 +81,11 @@ pub async fn runner(
|
||||||
{
|
{
|
||||||
tracing::warn!(?error, "error updating redis state")
|
tracing::warn!(?error, "error updating redis state")
|
||||||
}
|
}
|
||||||
|
if let Event::Ready(_) = event {
|
||||||
|
if !cache.2.read().await.contains(&shard.id().number()) {
|
||||||
|
cache.2.write().await.push(shard.id().number());
|
||||||
|
}
|
||||||
|
}
|
||||||
cache.0.update(&event);
|
cache.0.update(&event);
|
||||||
//if let Err(error) = tx.send((shard.id(), event)) {
|
//if let Err(error) = tx.send((shard.id(), event)) {
|
||||||
// tracing::warn!(?error, "error sending event to global handler: {error}",);
|
// tracing::warn!(?error, "error sending event to global handler: {error}",);
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,93 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type httpstats struct {
|
||||||
|
Up bool `json:"up"`
|
||||||
|
GuildCount int `json:"guild_count"`
|
||||||
|
ChannelCount int `json:"channel_count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func query_http_cache() []httpstats {
|
||||||
|
var values []httpstats
|
||||||
|
|
||||||
|
url := os.Getenv("CONSUL_URL")
|
||||||
|
if url == "" {
|
||||||
|
panic("missing CONSUL_URL in environment")
|
||||||
|
}
|
||||||
|
|
||||||
|
expected_gateway_count, err := strconv.Atoi(os.Getenv("EXPECTED_GATEWAY_COUNT"))
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Sprintf("missing or invalid EXPECTED_GATEWAY_COUNT in environment"))
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.Get(fmt.Sprintf("%v/v1/health/service/pluralkit-gateway", url))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
panic(fmt.Sprintf("got status %v trying to query consul for all_gateway_instances", resp.Status))
|
||||||
|
}
|
||||||
|
|
||||||
|
var ips []string
|
||||||
|
|
||||||
|
data, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
var cs []any
|
||||||
|
err = json.Unmarshal(data, &cs)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(cs) != expected_gateway_count {
|
||||||
|
panic(fmt.Sprintf("got unexpected number of gateway instances from consul (expected %v, got %v)", expected_gateway_count, len(cs)))
|
||||||
|
}
|
||||||
|
|
||||||
|
for idx, itm := range cs {
|
||||||
|
if ip, ok := itm.(map[string]any)["Service"].(map[string]any)["Address"].(string); ok {
|
||||||
|
ips = append(ips, ip)
|
||||||
|
} else {
|
||||||
|
panic(fmt.Sprintf("got bad data from consul for all_gateway_instances, at index %v", idx))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("querying %v gateway clusters for discord stats\n", len(ips))
|
||||||
|
|
||||||
|
for _, ip := range ips {
|
||||||
|
resp, err := http.Get("http://"+ip+":5000/stats")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
if resp.StatusCode != http.StatusFound {
|
||||||
|
panic(fmt.Sprintf("got status %v trying to query %v:5000", resp.Status, ip))
|
||||||
|
}
|
||||||
|
var s httpstats
|
||||||
|
data, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
err = json.Unmarshal(data, &s)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
if s.Up == false {
|
||||||
|
panic("gateway is not up yet, skipping stats collection")
|
||||||
|
}
|
||||||
|
values = append(values, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
return values
|
||||||
|
}
|
||||||
|
|
||||||
type rstatval struct {
|
type rstatval struct {
|
||||||
GuildCount int `json:"GuildCount"`
|
GuildCount int `json:"GuildCount"`
|
||||||
ChannelCount int `json:"ChannelCount"`
|
ChannelCount int `json:"ChannelCount"`
|
||||||
|
|
|
||||||
|
|
@ -49,7 +49,7 @@ func update_db_message_meta() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func get_discord_counts() (int, int) {
|
func get_discord_counts() (int, int) {
|
||||||
redisStats := run_redis_query()
|
redisStats := query_http_cache()
|
||||||
|
|
||||||
guild_count := 0
|
guild_count := 0
|
||||||
channel_count := 0
|
channel_count := 0
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue