From 2ab70b3a6a7a84af16e11f4c0b36df4ea325383a Mon Sep 17 00:00:00 2001 From: CombatPug Date: Mon, 13 Jan 2025 10:50:40 +0100 Subject: [PATCH] SHARD-1234: use rate limiter for websocket messages (#102) * use rate limiter for websocket messages * close ws on non-ok request and disallow opening new connections from banned IPs --- src/middlewares/rateLimit.ts | 2 +- src/websocket/index.ts | 37 +++++++++++++++++++++++++++++++++++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/middlewares/rateLimit.ts b/src/middlewares/rateLimit.ts index 5a3dcc20..3f355c19 100644 --- a/src/middlewares/rateLimit.ts +++ b/src/middlewares/rateLimit.ts @@ -23,7 +23,7 @@ async function handleRejection(res: Response, softReject: boolean): Promise { +export async function checkRequest(ip: string, request: RpcRequest): Promise { return await requestersList.isRequestOkay(ip, request.method, request.params) } diff --git a/src/websocket/index.ts b/src/websocket/index.ts index cd74e256..d595940d 100644 --- a/src/websocket/index.ts +++ b/src/websocket/index.ts @@ -9,6 +9,7 @@ import { evmLogProvider_ConnectionStream } from './log_server' import { SubscriptionDetails } from './clients' import { nestedCountersInstance } from '../utils/nestedCounters' import { IncomingMessage } from 'http' +import {checkRequest, requestersList} from "../middlewares/rateLimit"; interface Params { address?: string | string[] @@ -62,6 +63,13 @@ export const onConnection = async (socket: WebSocket.WebSocket, req: IncomingMes ) return } + if (requestersList.isIpBanned(ip)) { + socket.close( + 1008, + 'Connection closed: IP banned from opening new connections.' + ) + return + } const currentIPConnections = connectionsByIP.get(ip) || new Set() @@ -92,7 +100,34 @@ export const onConnection = async (socket: WebSocket.WebSocket, req: IncomingMes const eth_methods = Object.freeze(wrappedMethods) - socket.on('message', (message: string) => { + socket.on('message', async (message: string) => { + if (CONFIG.rateLimit) { + let request + try { + request = JSON.parse(message) + } catch (e) { + socket.send('Invalid message format') + return + } + + try { + const isRequestOkay = await checkRequest(ip, request) + + if (!isRequestOkay) { + socket.close( + 1008, + JSON.stringify({ jsonrpc: '2.0', error: { code: -1, message: 'Rate limit exceeded' } }) + ) + return + } + + } catch (error) { + console.error('Rate limiting error:', error) + socket.close(1008, 'Internal server error') + return + } + } + // Update last activity time on message received socketActivityMap.set(socket, Date.now())