queue system to fix data malformation with rapid events

This commit is contained in:
bee! 2022-05-09 08:07:44 -07:00
parent e696fe9684
commit 4728714d72
No known key found for this signature in database
GPG key ID: A350C9117C864EB7
5 changed files with 139 additions and 81 deletions

View file

@ -1,10 +1,10 @@
# Compatiplural # Compatiplural
url_override="https://api.apparyllis.com" url_override="https://v2.apparyllis.com"
api_version="v1" api_version="v1"
socket="wss://api.apparyllis.com/v1/socket" socket="wss://v2.apparyllis.com/v1/socket"
pk_url="https://api.pluralkit.me/v2" pk_url="https://api.pluralkit.me/v2"
token="AAAAAAAAAAAAAAAAAAAA" token="SIMPLYPLURAL_TOKEN"
userId="AAAAAAAAAAAAAAAAAAA" userId="abcd1234"
pk_token= "AAAAAAAAAAAAAAAA" pk_token= "PLURALKIT_TOKEN"
pk_system="AAAAAAAAAAAAAAAA" heartbeat=4500000
heartbeat=4500000 max_workers=1

View file

@ -9,12 +9,12 @@ This project already has a Procfile set up, so it's super easy to get started. O
These can be set either in the .env file, in terminal, or in the config vars section of Heroku. These can be set either in the .env file, in terminal, or in the config vars section of Heroku.
| Setting | Default | Description | | Setting | Default | Description |
| ---------| ------- | ------------------ | | ---------| ------- | ------------------ |
| url_override | https://api.apparyllis.com | The base URL for all SimplyPlural API requests. Unless you are running your own fork of Simply Plural, you shouldn't change this. | | url_override | https://v2.apparyllis.com | The base URL for all SimplyPlural API requests. Unless you are running your own fork of Simply Plural, you shouldn't change this. |
| api_version | v1 | The target SimplyPlural API version. Unless you are running your own fork of Simply Plural, you shouldn't change this. | | api_version | v1 | The target SimplyPlural API version. Unless you are running your own fork of Simply Plural, you shouldn't change this. |
| socket | wss://api.apparyllis.com/v1/socket | The socket URL for SimplyPlural. Unless you are running your own fork of Simply Plural, you shouldn't change this. | | socket | wss://v2.apparyllis.com/v1/socket | The socket URL for SimplyPlural. Unless you are running your own fork of Simply Plural, you shouldn't change this. |
| pk_url | https://api.pluralkit.me/v2 | The base URL for all PluralKit API requests. Unless you are running your own fork of PluralKit, you shouldn't change this. | | pk_url | https://api.pluralkit.me/v2 | The base URL for all PluralKit API requests. Unless you are running your own fork of PluralKit, you shouldn't change this. |
| token | token_here | Your SimplyPlural account token. As of now, the only permission necessary is the Read permission. | | token | token_here | Your SimplyPlural account token. As of now, the only permission necessary is the Read permission. |
| userId | user_id | Your SimplyPlural account/system ID. You can find it in account info near the bottom. | | userId | user_id | Your SimplyPlural account/system ID. You can find it in account info near the bottom. |
| pk_token | pluralkit_token_here | Your PluralKit token. Get it by using `pk;token`. | | pk_token | pluralkit_token_here | Your PluralKit token. Get it by using `pk;token`. |
| pk_system | pluralkit_system_id | Your Pluralkit system ID. This can be either your Discord account ID or your 5 letter ID shown by using pk;system. |
| heartbeat | 4500000 | The time in miliseconds before the websocket client reconnects to the websocket server. | | heartbeat | 4500000 | The time in miliseconds before the websocket client reconnects to the websocket server. |
| max_workers | 1 | Max number of workers for processing enqueued tasks. This probably shouldn't be changed. |

View file

@ -3,7 +3,7 @@ const { Config, System, Util } = require('simplyapi')
const pkUrl = Config.pk_url const pkUrl = Config.pk_url
const pkHeader = { const pkHeader = {
'Content-Type': 'application/json', 'Content-Type': 'application/json; charset=UTF-8',
'Authorization': Config.pk_token 'Authorization': Config.pk_token
} }
@ -13,10 +13,6 @@ async function initializeCache() {
cache.frontHistory = await system.getFronters() cache.frontHistory = await system.getFronters()
} }
function unknownError400() {
return
}
function unknownTarget(target) { function unknownTarget(target) {
console.log('::SimplyWS:: Unknown update target: ' + target + '\n::SimplyWS:: Full message: ' + e) console.log('::SimplyWS:: Unknown update target: ' + target + '\n::SimplyWS:: Full message: ' + e)
} }
@ -25,15 +21,9 @@ function unrecognizedMessage(msg) {
console.log('::SimplyWS:: Unrecognized message: ' + msg + '\n::SimplyWS:: Full message: ' + e) console.log('::SimplyWS:: Unrecognized message: ' + msg + '\n::SimplyWS:: Full message: ' + e)
} }
// async function asyncForEach(array, callback) {
// for (let index = 0; index < array.length; index++) {
// await callback(array[index], index, array)
// }
// }
async function getPKFronters() { async function getPKFronters() {
let members = [] let members = []
let fronters = await axios.get(`${pkUrl}/systems/${Config.pk_system}/fronters`, { let fronters = await axios.get(`${pkUrl}/systems/@me/fronters`, {
headers: pkHeader headers: pkHeader
}) })
.catch((err) => { .catch((err) => {
@ -118,9 +108,13 @@ async function determineAction(eventData, frontData = []) {
async function insertFront(member) { async function insertFront(member) {
// get current fronters and add new fronter // get current fronters and add new fronter
let system = new System(Config)
let fronters = await getPKFronters() let fronters = await getPKFronters()
fronters.push(member.content.pkId) if (!fronters.includes(member.content.pkId)) {
fronters.push(member.content.pkId)
} else {
console.warn('::SimplyWS:: Member already in fronters: ' + member.content.pkId)
return
}
// find the "primary" fronter to move to the first element in the list // find the "primary" fronter to move to the first element in the list
let primary = await findPrimary() let primary = await findPrimary()
@ -131,38 +125,57 @@ async function insertFront(member) {
} }
} }
// cache front // post the new switch
cache.frontHistory = await system.getFronters() let url = `${pkUrl}/systems/@me/switches`
await axios.post(url, JSON.stringify({ "members": fronters }), {
// post the new switch
axios.post(`${pkUrl}/systems/${Config.pk_system}/switches`, JSON.stringify({ "members": fronters }), {
headers: pkHeader headers: pkHeader
}) })
.catch(err => { .then(async (res) => {
if (err.toJSON().status == 400) let front = await getPKFronters()
unknownError400() if (!front.includes(member.content.pkId)) {
else if (err.toJSON().status == 429) console.log('::SimplyWS:: Failed to insert fronter: ' + member.content.pkId)
await insertFront(member)
return
} else {
console.log('::SimplyWS:: ' + member.content.name + ' was added to the front.')
}
})
.catch(async err => {
let status = err.status || err.toJSON().status
if (status == 400) {
// if the fronter is already in the front, do nothing
return
}
else if (status == 404) {
// member not found
console.error("::SimplyWS:: Could not find member: " + member.content.pkId)
let index = fronters.indexOf(member.content.pkId)
fronters.splice(index, 1)
return
}
else if (status == 429) {
// Too many requests // Too many requests
setTimeout(function () { console.warn("::SimplyWS:: Too many requests, waiting to try again.")
insertFront(member) let index = fronters.indexOf(member.content.pkId)
fronters.splice(index, 1)
setTimeout(async function () {
await insertFront(member)
}, 1000) }, 1000)
return return
}
}) })
let checkFront = await getPKFronters()
if (!checkFront.includes(member.content.pkId)) {
await insertFront(member)
return
} else {
console.log('::SimplyWS:: ' + member.content.name + ' was added to the front.')
}
} }
async function removeFront(member) { async function removeFront(member) {
let system = new System(Config)
let fronters = await getPKFronters() let fronters = await getPKFronters()
let index = fronters.indexOf(member.content.pkId)
fronters.splice(index, 1) if (fronters.includes(member.content.pkId)) {
let index = fronters.indexOf(member.content.pkId)
fronters.splice(index, 1)
} else {
console.warn('::SimplyWS:: Member is not in front: ' + member.content.pkId)
return
}
// find the "primary" fronter to move to the first element in the list // find the "primary" fronter to move to the first element in the list
let p = await findPrimary() let p = await findPrimary()
@ -173,31 +186,37 @@ async function removeFront(member) {
} }
} }
// cache front let url = `${pkUrl}/systems/@me/switches`
cache.frontHistory = await system.getFronters() await axios.post(url, JSON.stringify({ "members": fronters }), {
// post the new switch
axios.post(`${pkUrl}/systems/${Config.pk_system}/switches`, JSON.stringify({ "members": fronters }), {
headers: pkHeader headers: pkHeader
}) })
.catch(err => { .then(async (res) => {
if (err.toJSON().status == 400) let front = await getPKFronters()
unknownError400() if (front.includes(member.content.pkId)) {
else if (err.toJSON().status == 429) console.log('::SimplyWS:: Failed to remove fronter: ' + member.content.pkId)
await removeFront(member)
return
} else {
console.log('::SimplyWS:: ' + member.content.name + ' was removed from the front.')
}
})
.catch(async err => {
let status = err.status || err.toJSON().status
if (status == 400) {
// fronter is already not in front
console.warn("::SimplyWS:: " + member.content.name + " is not in the front.")
return
}
else if (status == 429) {
// Too many requests // Too many requests
setTimeout(function () { console.warn("::SimplyWS:: Too many requests, waiting to try again.")
removeFront(member) fronters.push(member.content.pkId)
setTimeout(async function () {
await removeFront(member)
}, 1000) }, 1000)
return return
}
}) })
let checkFront = await getPKFronters()
if (checkFront.includes(member.content.pkId)) {
await removeFront(member)
return
} else {
console.log('::SimplyWS:: ' + member.content.name + ' was removed from the front.')
}
} }
async function updateCustomStatus(member) { async function updateCustomStatus(member) {
@ -210,20 +229,18 @@ async function updateCustomStatus(member) {
fronters.splice(fronters.indexOf(primary), 1) fronters.splice(fronters.indexOf(primary), 1)
fronters.unshift(primary) fronters.unshift(primary)
// cache front
cache.frontHistory = await system.getFronters()
// post the new switch // post the new switch
axios.post(`${pkUrl}/systems/${Config.pk_system}/switches`, JSON.stringify({ "members": fronters }), { axios.post(`${pkUrl}/systems/@me/switches`, JSON.stringify({ "members": fronters }), {
headers: pkHeader headers: pkHeader
}) })
.catch(err => { .catch(async err => {
if (err.toJSON().status == 400) if (err.toJSON().status == 400)
unknownError400() unknownError400()
else if (err.toJSON().status == 429) else if (err.toJSON().status == 429)
// Too many requests //Too many requests
console.warn("::SimplyWS:: Too many requests, waiting to try again.")
setTimeout(function () { setTimeout(function () {
updateCustomStatus(member) await updateCustomStatus(member)
}, 1000) }, 1000)
return return
}) })
@ -236,12 +253,10 @@ async function updateCustomStatus(member) {
} }
} }
const { inspect } = require('util')
const transform = require('lodash.transform') const transform = require('lodash.transform')
const isEqual = require('lodash.isequal') const isEqual = require('lodash.isequal')
const isArray = require('lodash.isarray') const isArray = require('lodash.isarray')
const isObject = require('lodash.isobject') const isObject = require('lodash.isobject')
const { PassThrough } = require('stream')
async function calculateDiff(origObj, newObj) { async function calculateDiff(origObj, newObj) {
return new Promise(function (resolve) { return new Promise(function (resolve) {
changes = (newObj, origObj) => { changes = (newObj, origObj) => {
@ -259,7 +274,6 @@ async function calculateDiff(origObj, newObj) {
module.exports = { module.exports = {
initializeCache, initializeCache,
unknownError400,
unknownTarget, unknownTarget,
unrecognizedMessage, unrecognizedMessage,
getPKFronters, getPKFronters,

View file

@ -5,19 +5,60 @@ const { Config, System } = require('simplyapi')
const { Util } = require('simplyapi') const { Util } = require('simplyapi')
const { initializeCache, determineAction, insertFront, removeFront, updateCustomStatus } = require('./dataManager') const { initializeCache, determineAction, insertFront, removeFront, updateCustomStatus } = require('./dataManager')
const {
isMainThread,
BroadcastChannel,
Worker
} = require('node:worker_threads')
let e let e
main = async () => { main = () => {
openWebSocket() initiateWorkerPool()
} }
openWebSocket = async () => { // Queue
const async = require('async')
const queue = async.queue((task, completed) => {
let error = { status: false, message: '' }
update(task.data)
.catch(err => {
error.status = true
error.message = err
})
completed(error, task)
}, Config.max_workers)
initiateWorkerPool = () => {
// Worker Pool
const bc = new BroadcastChannel('plural')
if (isMainThread) {
openWebSocket()
bc.onmessage = (event) => {
//console.log('::SimplyWS:: received message from worker')
queue.push(event.data, (error, task) => {
if (error.status) {
console.log(`An error occurred while processing task ${error.message}`)
}
})
}
for (let n = 0; n < Config.max_workers; n++)
new Worker(__filename)
}
}
// Socket
openWebSocket = () => {
const WebSocketClient = require('./WebsocketClient') const WebSocketClient = require('./WebsocketClient')
const wss = new WebSocketClient(Config.socket); const wss = new WebSocketClient(Config.socket)
let initialPacket = { "op": "authenticate", "token": Config.token } let initialPacket = { "op": "authenticate", "token": Config.token }
wss.onOpen = (_) => { wss.send(JSON.stringify(initialPacket)); } wss.onOpen = (_) => { wss.send(JSON.stringify(initialPacket)); }
wss.onClose = (e) => { console.log('SimplyWS/onClose :: %s', e); e = '' } wss.onClose = (e) => { console.log('SimplyWS/onClose :: %s', e); e = '' }
wss.onError = (e) => { console.log('SimplyWS/onError :: %s', e) } wss.onError = (e) => { console.log('SimplyWS/onError :: %s', e) }
const bc = new BroadcastChannel('plural')
wss.onMessage = (raw) => { wss.onMessage = (raw) => {
e = raw e = raw
let data = JSON.parse(e) let data = JSON.parse(e)
@ -33,7 +74,8 @@ openWebSocket = async () => {
console.log('::SimplyWS:: invalid token, exiting..') console.log('::SimplyWS:: invalid token, exiting..')
process.exit(1) process.exit(1)
case "update": case "update":
update(data) initializeCache()
bc.postMessage({data: data})
break break
default: default:
//unrecognizedMessage(data.msg) //unrecognizedMessage(data.msg)
@ -42,6 +84,7 @@ openWebSocket = async () => {
} }
} }
// Data Processing
update = async (data) => { update = async (data) => {
let target = data.target let target = data.target
switch (target) { switch (target) {

View file

@ -12,13 +12,14 @@
"repository": "github:padlocks/Compatiplural", "repository": "github:padlocks/Compatiplural",
"dependencies": { "dependencies": {
"ajv": "^8.10.0", "ajv": "^8.10.0",
"async": "^3.2.3",
"axios": "^0.26.0", "axios": "^0.26.0",
"dotenv": "^16.0.0", "dotenv": "^16.0.0",
"lodash.isarray": "^4.0.0", "lodash.isarray": "^4.0.0",
"lodash.isequal": "^4.5.0", "lodash.isequal": "^4.5.0",
"lodash.isobject": "^3.0.2", "lodash.isobject": "^3.0.2",
"lodash.transform": "^4.6.0", "lodash.transform": "^4.6.0",
"simplyapi": "^0.1.2", "simplyapi": "^0.1.3",
"ws": "^8.5.0" "ws": "^8.5.0"
}, },
"optionalDependencies": { "optionalDependencies": {