Fix performance of streaming by parsing message JSON once (#25278)

This commit is contained in:
Emelia Smith 2023-06-09 19:29:16 +02:00 committed by Claire
parent a197fc094f
commit e78ee582f7

View file

@ -92,19 +92,32 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => {
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1)); const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
/** /**
* Attempts to safely parse a string as JSON, used when both receiving a message
* from redis and when receiving a message from a client over a websocket
* connection, this is why it accepts a `req` argument.
* @param {string} json * @param {string} json
* @param {any} req * @param {any?} req
* @return {Object.<string, any>|null} * @returns {Object.<string, any>|null}
*/ */
const parseJSON = (json, req) => { const parseJSON = (json, req) => {
try { try {
return JSON.parse(json); return JSON.parse(json);
} catch (err) { } catch (err) {
/* FIXME: This logging isn't great, and should probably be done at the
* call-site of parseJSON, not in the method, but this would require changing
* the signature of parseJSON to return something akin to a Result type:
* [Error|null, null|Object<string,any}], and then handling the error
* scenarios.
*/
if (req) {
if (req.accountId) { if (req.accountId) {
log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`); log.warn(req.requestId, `Error parsing message from user ${req.accountId}: ${err}`);
} else { } else {
log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`); log.silly(req.requestId, `Error parsing message from ${req.remoteAddress}: ${err}`);
} }
} else {
log.warn(`Error parsing message from redis: ${err}`);
}
return null; return null;
} }
}; };
@ -167,7 +180,7 @@ const startWorker = async (workerId) => {
const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
/** /**
* @type {Object.<string, Array.<function(string): void>>} * @type {Object.<string, Array.<function(Object<string, any>): void>>}
*/ */
const subs = {}; const subs = {};
@ -207,7 +220,10 @@ const startWorker = async (workerId) => {
return; return;
} }
callbacks.forEach(callback => callback(message)); const json = parseJSON(message, null);
if (!json) return;
callbacks.forEach(callback => callback(json));
}; };
/** /**
@ -229,6 +245,7 @@ const startWorker = async (workerId) => {
/** /**
* @param {string} channel * @param {string} channel
* @param {function(Object<string, any>): void} callback
*/ */
const unsubscribe = (channel, callback) => { const unsubscribe = (channel, callback) => {
log.silly(`Removing listener for ${channel}`); log.silly(`Removing listener for ${channel}`);
@ -378,7 +395,7 @@ const startWorker = async (workerId) => {
/** /**
* @param {any} req * @param {any} req
* @return {string} * @returns {string|undefined}
*/ */
const channelNameFromPath = req => { const channelNameFromPath = req => {
const { path, query } = req; const { path, query } = req;
@ -487,15 +504,11 @@ const startWorker = async (workerId) => {
/** /**
* @param {any} req * @param {any} req
* @param {SystemMessageHandlers} eventHandlers * @param {SystemMessageHandlers} eventHandlers
* @return {function(string): void} * @returns {function(object): void}
*/ */
const createSystemMessageListener = (req, eventHandlers) => { const createSystemMessageListener = (req, eventHandlers) => {
return message => { return message => {
const json = parseJSON(message, req); const { event } = message;
if (!json) return;
const { event } = json;
log.silly(req.requestId, `System message for ${req.accountId}: ${event}`); log.silly(req.requestId, `System message for ${req.accountId}: ${event}`);
@ -612,19 +625,16 @@ const startWorker = async (workerId) => {
* @param {function(string, string): void} output * @param {function(string, string): void} output
* @param {function(string[], function(string): void): void} attachCloseHandler * @param {function(string[], function(string): void): void} attachCloseHandler
* @param {boolean=} needsFiltering * @param {boolean=} needsFiltering
* @return {function(string): void} * @returns {function(object): void}
*/ */
const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => {
const accountId = req.accountId || req.remoteAddress; const accountId = req.accountId || req.remoteAddress;
log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`);
// Currently message is of type string, soon it'll be Record<string, any>
const listener = message => { const listener = message => {
const json = parseJSON(message, req); const { event, payload, queued_at } = message;
if (!json) return;
const { event, payload, queued_at } = json;
const transmit = () => { const transmit = () => {
const now = new Date().getTime(); const now = new Date().getTime();
@ -1207,8 +1217,15 @@ const startWorker = async (workerId) => {
ws.on('close', onEnd); ws.on('close', onEnd);
ws.on('error', onEnd); ws.on('error', onEnd);
ws.on('message', data => { ws.on('message', (data, isBinary) => {
const json = parseJSON(data, session.request); if (isBinary) {
log.debug('Received binary data, closing connection');
ws.close(1003, 'The mastodon streaming server does not support binary messages');
return;
}
const message = data.toString('utf8');
const json = parseJSON(message, session.request);
if (!json) return; if (!json) return;