diff --git a/.env_example b/.env_example index decde22..c83e345 100644 --- a/.env_example +++ b/.env_example @@ -3,8 +3,8 @@ url_override="https://v2.apparyllis.com" api_version="v1" socket="wss://v2.apparyllis.com/v1/socket" pk_url="https://api.pluralkit.me/v2" -token="AAAAAAAAAAAAAAAAAAAA" -userId="AAAAAAAAAAAAAAAAAAA" -pk_token= "AAAAAAAAAAAAAAAA" -pk_system="AAAAAAAAAAAAAAAA" -heartbeat=4500000 \ No newline at end of file +token="SIMPLYPLURAL_TOKEN" +userId="abcd1234" +pk_token= "PLURALKIT_TOKEN" +heartbeat=4500000 +max_workers=1 \ No newline at end of file diff --git a/README.md b/README.md index 537fac0..aed59b5 100644 --- a/README.md +++ b/README.md @@ -16,5 +16,5 @@ These can be set either in the .env file, in terminal, or in the config vars sec | token | token_here | Your SimplyPlural account token. As of now, the only permission necessary is the Read permission. | | userId | user_id | Your SimplyPlural account/system ID. You can find it in account info near the bottom. | | pk_token | pluralkit_token_here | Your PluralKit token. Get it by using `pk;token`. | -| pk_system | pluralkit_system_id | Your Pluralkit system ID. This can be either your Discord account ID or your 5 letter ID shown by using pk;system. | | heartbeat | 4500000 | The time in miliseconds before the websocket client reconnects to the websocket server. | +| max_workers | 1 | Max number of workers for processing enqueued tasks. This probably shouldn't be changed. | diff --git a/dataManager.js b/dataManager.js index 8901da7..0b8ba3a 100644 --- a/dataManager.js +++ b/dataManager.js @@ -3,7 +3,7 @@ const { Config, System, Util } = require('simplyapi') const pkUrl = Config.pk_url const pkHeader = { - 'Content-Type': 'application/json', + 'Content-Type': 'application/json; charset=UTF-8', 'Authorization': Config.pk_token } @@ -13,10 +13,6 @@ async function initializeCache() { cache.frontHistory = await system.getFronters() } -function unknownError400() { - return -} - function unknownTarget(target) { console.log('::SimplyWS:: Unknown update target: ' + target + '\n::SimplyWS:: Full message: ' + e) } @@ -25,15 +21,9 @@ function unrecognizedMessage(msg) { console.log('::SimplyWS:: Unrecognized message: ' + msg + '\n::SimplyWS:: Full message: ' + e) } -// async function asyncForEach(array, callback) { -// for (let index = 0; index < array.length; index++) { -// await callback(array[index], index, array) -// } -// } - async function getPKFronters() { let members = [] - let fronters = await axios.get(`${pkUrl}/systems/${Config.pk_system}/fronters`, { + let fronters = await axios.get(`${pkUrl}/systems/@me/fronters`, { headers: pkHeader }) .catch((err) => { @@ -118,9 +108,13 @@ async function determineAction(eventData, frontData = []) { async function insertFront(member) { // get current fronters and add new fronter - let system = new System(Config) let fronters = await getPKFronters() - fronters.push(member.content.pkId) + if (!fronters.includes(member.content.pkId)) { + fronters.push(member.content.pkId) + } else { + console.warn('::SimplyWS:: Member already in fronters: ' + member.content.pkId) + return + } // find the "primary" fronter to move to the first element in the list let primary = await findPrimary() @@ -131,38 +125,57 @@ async function insertFront(member) { } } - // cache front - cache.frontHistory = await system.getFronters() - - // post the new switch - axios.post(`${pkUrl}/systems/${Config.pk_system}/switches`, JSON.stringify({ "members": fronters }), { + // post the new switch + let url = `${pkUrl}/systems/@me/switches` + await axios.post(url, JSON.stringify({ "members": fronters }), { headers: pkHeader }) - .catch(err => { - if (err.toJSON().status == 400) - unknownError400() - else if (err.toJSON().status == 429) + .then(async (res) => { + let front = await getPKFronters() + if (!front.includes(member.content.pkId)) { + console.log('::SimplyWS:: Failed to insert fronter: ' + member.content.pkId) + await insertFront(member) + return + } else { + console.log('::SimplyWS:: ' + member.content.name + ' was added to the front.') + } + }) + .catch(async err => { + let status = err.status || err.toJSON().status + if (status == 400) { + // if the fronter is already in the front, do nothing + return + } + else if (status == 404) { + // member not found + console.error("::SimplyWS:: Could not find member: " + member.content.pkId) + let index = fronters.indexOf(member.content.pkId) + fronters.splice(index, 1) + return + } + else if (status == 429) { // Too many requests - setTimeout(function () { - insertFront(member) + console.warn("::SimplyWS:: Too many requests, waiting to try again.") + let index = fronters.indexOf(member.content.pkId) + fronters.splice(index, 1) + setTimeout(async function () { + await insertFront(member) }, 1000) return + } }) - - let checkFront = await getPKFronters() - if (!checkFront.includes(member.content.pkId)) { - await insertFront(member) - return - } else { - console.log('::SimplyWS:: ' + member.content.name + ' was added to the front.') - } } async function removeFront(member) { - let system = new System(Config) let fronters = await getPKFronters() - let index = fronters.indexOf(member.content.pkId) - fronters.splice(index, 1) + + if (fronters.includes(member.content.pkId)) { + let index = fronters.indexOf(member.content.pkId) + fronters.splice(index, 1) + } else { + console.warn('::SimplyWS:: Member is not in front: ' + member.content.pkId) + return + } // find the "primary" fronter to move to the first element in the list let p = await findPrimary() @@ -173,31 +186,37 @@ async function removeFront(member) { } } - // cache front - cache.frontHistory = await system.getFronters() - - // post the new switch - axios.post(`${pkUrl}/systems/${Config.pk_system}/switches`, JSON.stringify({ "members": fronters }), { + let url = `${pkUrl}/systems/@me/switches` + await axios.post(url, JSON.stringify({ "members": fronters }), { headers: pkHeader }) - .catch(err => { - if (err.toJSON().status == 400) - unknownError400() - else if (err.toJSON().status == 429) + .then(async (res) => { + let front = await getPKFronters() + if (front.includes(member.content.pkId)) { + console.log('::SimplyWS:: Failed to remove fronter: ' + member.content.pkId) + await removeFront(member) + return + } else { + console.log('::SimplyWS:: ' + member.content.name + ' was removed from the front.') + } + }) + .catch(async err => { + let status = err.status || err.toJSON().status + if (status == 400) { + // fronter is already not in front + console.warn("::SimplyWS:: " + member.content.name + " is not in the front.") + return + } + else if (status == 429) { // Too many requests - setTimeout(function () { - removeFront(member) + console.warn("::SimplyWS:: Too many requests, waiting to try again.") + fronters.push(member.content.pkId) + setTimeout(async function () { + await removeFront(member) }, 1000) return + } }) - - let checkFront = await getPKFronters() - if (checkFront.includes(member.content.pkId)) { - await removeFront(member) - return - } else { - console.log('::SimplyWS:: ' + member.content.name + ' was removed from the front.') - } } async function updateCustomStatus(member) { @@ -210,18 +229,16 @@ async function updateCustomStatus(member) { fronters.splice(fronters.indexOf(primary), 1) fronters.unshift(primary) - // cache front - cache.frontHistory = await system.getFronters() - // post the new switch - axios.post(`${pkUrl}/systems/${Config.pk_system}/switches`, JSON.stringify({ "members": fronters }), { + axios.post(`${pkUrl}/systems/@me/switches`, JSON.stringify({ "members": fronters }), { headers: pkHeader }) - .catch(err => { + .catch(async err => { if (err.toJSON().status == 400) unknownError400() else if (err.toJSON().status == 429) - // Too many requests + //Too many requests + console.warn("::SimplyWS:: Too many requests, waiting to try again.") setTimeout(function () { updateCustomStatus(member) }, 1000) @@ -236,19 +253,16 @@ async function updateCustomStatus(member) { } } -const { inspect } = require('util') const transform = require('lodash.transform') const isEqual = require('lodash.isequal') -const isArray = require('lodash.isarray') const isObject = require('lodash.isobject') -const { PassThrough } = require('stream') async function calculateDiff(origObj, newObj) { return new Promise(function (resolve) { changes = (newObj, origObj) => { let arrayIndexCounter = 0 return transform(newObj, function (result, value, key) { if (!isEqual(value, origObj[key])) { - let resultKey = isArray(origObj) ? arrayIndexCounter++ : key + let resultKey = Array.isArray(origObj) ? arrayIndexCounter++ : key result[resultKey] = (isObject(value) && isObject(origObj[key])) ? changes(value, origObj[key]) : value } }) @@ -259,7 +273,6 @@ async function calculateDiff(origObj, newObj) { module.exports = { initializeCache, - unknownError400, unknownTarget, unrecognizedMessage, getPKFronters, diff --git a/index.js b/index.js index 60a0c78..21887d2 100644 --- a/index.js +++ b/index.js @@ -5,19 +5,60 @@ const { Config, System } = require('simplyapi') const { Util } = require('simplyapi') const { initializeCache, determineAction, insertFront, removeFront, updateCustomStatus } = require('./dataManager') +const { + isMainThread, + BroadcastChannel, + Worker +} = require('node:worker_threads') + let e -main = async () => { - openWebSocket() +main = () => { + initiateWorkerPool() } -openWebSocket = async () => { +// Queue +const async = require('async') +const queue = async.queue((task, completed) => { + let error = { status: false, message: '' } + update(task.data) + .catch(err => { + error.status = true + error.message = err + }) + completed(error, task) + +}, Config.max_workers) + +initiateWorkerPool = () => { + // Worker Pool + const bc = new BroadcastChannel('plural') + + if (isMainThread) { + openWebSocket() + + bc.onmessage = (event) => { + //console.log('::SimplyWS:: received message from worker') + queue.push(event.data, (error, task) => { + if (error.status) { + console.log(`An error occurred while processing task ${error.message}`) + } + }) + } + for (let n = 0; n < Config.max_workers; n++) + new Worker(__filename) + } +} + +// Socket +openWebSocket = () => { const WebSocketClient = require('./WebsocketClient') - const wss = new WebSocketClient(Config.socket); + const wss = new WebSocketClient(Config.socket) let initialPacket = { "op": "authenticate", "token": Config.token } wss.onOpen = (_) => { wss.send(JSON.stringify(initialPacket)); } wss.onClose = (e) => { console.log('SimplyWS/onClose :: %s', e); e = '' } wss.onError = (e) => { console.log('SimplyWS/onError :: %s', e) } + const bc = new BroadcastChannel('plural') wss.onMessage = (raw) => { e = raw let data = JSON.parse(e) @@ -33,7 +74,8 @@ openWebSocket = async () => { console.log('::SimplyWS:: invalid token, exiting..') process.exit(1) case "update": - update(data) + initializeCache() + bc.postMessage({data: data}) break default: //unrecognizedMessage(data.msg) @@ -42,6 +84,7 @@ openWebSocket = async () => { } } +// Data Processing update = async (data) => { let target = data.target switch (target) { diff --git a/package.json b/package.json index f3ac2a4..1b81ed4 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "Compatiplural", - "version": "1.0.0", + "version": "1.1.0", "description": "SimplyPlural -> PluralKit Connectivity", "main": "index.js", "scripts": { @@ -12,13 +12,13 @@ "repository": "github:padlocks/Compatiplural", "dependencies": { "ajv": "^8.10.0", + "async": "^3.2.3", "axios": "^0.26.0", "dotenv": "^16.0.0", - "lodash.isarray": "^4.0.0", "lodash.isequal": "^4.5.0", "lodash.isobject": "^3.0.2", "lodash.transform": "^4.6.0", - "simplyapi": "^0.1.2", + "simplyapi": "^0.1.4", "ws": "^8.5.0" }, "optionalDependencies": {