From d127ab3f29a9a1a48c1a3d84c8c2fc50d98bccc7 Mon Sep 17 00:00:00 2001 From: Hoang Pham Date: Mon, 9 Dec 2024 16:34:19 +0700 Subject: [PATCH] Fix server crashed, regular cleanups Signed-off-by: Hoang Pham --- tests/integration/configMock.js | 42 +++ tests/integration/metrics.spec.mjs | 35 +- tests/integration/socket.spec.mjs | 46 +-- websocket_server/ApiService.js | 12 +- websocket_server/AppManager.js | 13 +- websocket_server/BackupManager.js | 43 +-- websocket_server/CleanupManager.js | 79 +++++ websocket_server/Config.js | 70 ++++ websocket_server/Constants.js | 33 ++ websocket_server/InMemoryStrategy.js | 35 ++ websocket_server/LRUCacheStrategy.js | 5 +- websocket_server/PrometheusDataManager.js | 5 +- websocket_server/RedisStrategy.js | 3 +- websocket_server/RoomDataManager.js | 35 +- websocket_server/ServerManager.js | 66 +++- websocket_server/SharedTokenGenerator.js | 10 +- websocket_server/SocketDataManager.js | 53 ++- websocket_server/SocketManager.js | 372 ++++++++++++++++------ websocket_server/StorageManager.js | 4 + websocket_server/Utils.js | 21 +- websocket_server/main.js | 26 +- 21 files changed, 737 insertions(+), 271 deletions(-) create mode 100644 tests/integration/configMock.js create mode 100644 websocket_server/CleanupManager.js create mode 100644 websocket_server/Config.js create mode 100644 websocket_server/Constants.js create mode 100644 websocket_server/InMemoryStrategy.js diff --git a/tests/integration/configMock.js b/tests/integration/configMock.js new file mode 100644 index 0000000..f0e218f --- /dev/null +++ b/tests/integration/configMock.js @@ -0,0 +1,42 @@ +const defaultMockValues = { + IS_TEST_ENV: true, + BYPASS_SSL_VALIDATION: false, + USE_TLS: false, + TLS_KEY_PATH: null, + TLS_CERT_PATH: null, + STORAGE_STRATEGY: 'lru', + REDIS_URL: null, + FORCE_CLOSE_TIMEOUT: 60 * 1000, + METRICS_TOKEN: null, + JWT_SECRET_KEY: null, + BACKUP_DIR: './backup', + ROOM_CLEANUP_INTERVAL: 1000, + LOCK_TIMEOUT: 1000, + LOCK_RETRY_INTERVAL: 1000, + MAX_BACKUPS_PER_ROOM: 10, + ROOM_MAX_AGE: 1000, + MAX_ROOMS_IN_STORAGE: 1000, +} + +export function createConfigMock(customValues = {}) { + const mockValues = { ...defaultMockValues, ...customValues } + + const computedProperties = { + get JWT_SECRET_KEY() { + return mockValues.JWT_SECRET_KEY + }, + get NEXTCLOUD_WEBSOCKET_URL() { + return mockValues.NEXTCLOUD_WEBSOCKET_URL + }, + get NEXTCLOUD_URL() { + return mockValues.NEXTCLOUD_URL + }, + } + + const mockConfig = { + ...mockValues, + ...computedProperties, + } + + return mockConfig +} diff --git a/tests/integration/metrics.spec.mjs b/tests/integration/metrics.spec.mjs index ef18cea..6fe37c9 100644 --- a/tests/integration/metrics.spec.mjs +++ b/tests/integration/metrics.spec.mjs @@ -1,32 +1,37 @@ import { beforeAll, afterAll, describe, it, expect, vi } from 'vitest' import axios from 'axios' -import ServerManager from '../../websocket_server/ServerManager.js' +import { createConfigMock } from './configMock.js' +import ServerManagerModule from '../../websocket_server/ServerManager.js' +import ConfigModule from '../../websocket_server/Config.js' -const SERVER_URL = 'http://localhost:3008' -const SECRET = 'secret' +vi.mock('../../websocket_server/Config.js', () => ({ + default: createConfigMock({ + NEXTCLOUD_URL: 'http://localhost:3008', + NEXTCLOUD_WEBSOCKET_URL: 'http://localhost:3008', + PORT: '3008', + METRICS_TOKEN: 'secret', + }), +})) -vi.stubEnv('METRICS_TOKEN', SECRET) +const Config = ConfigModule +const ServerManager = ServerManagerModule describe('Metrics endpoint', () => { let serverManager beforeAll(async () => { - serverManager = new ServerManager({ - port: 3008, - storageStrategy: 'lru', - }) - - serverManager.start() + serverManager = new ServerManager() + await serverManager.start() }) afterAll(async () => { - await serverManager.server.close() + await serverManager.gracefulShutdown() }) it('should work with bearer auth', async () => { - const response = await axios.get(`${SERVER_URL}/metrics`, { + const response = await axios.get(`${Config.NEXTCLOUD_URL}/metrics`, { headers: { - Authorization: `Bearer ${SECRET}`, + Authorization: `Bearer ${Config.METRICS_TOKEN}`, }, }) expect(response.status).toBe(200) @@ -39,14 +44,14 @@ describe('Metrics endpoint', () => { }) it('should work with token param', async () => { - const response = await axios.get(`${SERVER_URL}/metrics?token=${SECRET}`) + const response = await axios.get(`${Config.NEXTCLOUD_URL}/metrics?token=${Config.METRICS_TOKEN}`) expect(response.status).toBe(200) expect(response.data).toContain('whiteboard_room_stats{stat="activeRooms"}') }) it('Not return on invalid auth', async () => { try { - await axios.get(`${SERVER_URL}/metrics`, { + await axios.get(`${Config.NEXTCLOUD_URL}/metrics`, { headers: { Authorization: 'Bearer wrongtoken', }, diff --git a/tests/integration/socket.spec.mjs b/tests/integration/socket.spec.mjs index 292f9cc..2d87ee6 100644 --- a/tests/integration/socket.spec.mjs +++ b/tests/integration/socket.spec.mjs @@ -1,13 +1,23 @@ import { beforeAll, afterAll, describe, it, expect, vi } from 'vitest' -import ServerManager from '../../websocket_server/ServerManager.js' -import io from 'socket.io-client' +import { io } from 'socket.io-client' import jwt from 'jsonwebtoken' -import Utils from '../../websocket_server/Utils.js' +import { createConfigMock } from './configMock.js' +import ServerManagerModule from '../../websocket_server/ServerManager.js' +import UtilsModule from '../../websocket_server/Utils.js' +import ConfigModule from '../../websocket_server/Config.js' -const SERVER_URL = 'http://localhost:3009' -const SECRET = 'secret' +vi.mock('../../websocket_server/Config.js', () => ({ + default: createConfigMock({ + NEXTCLOUD_URL: 'http://localhost:3009', + NEXTCLOUD_WEBSOCKET_URL: 'http://localhost:3009', + PORT: '3009', + JWT_SECRET_KEY: 'secret', + }), +})) -vi.stubEnv('JWT_SECRET_KEY', SECRET) +const Config = ConfigModule +const ServerManager = ServerManagerModule +const Utils = UtilsModule function waitFor(socket, event) { return new Promise((resolve) => { @@ -19,16 +29,12 @@ describe('Socket handling', () => { let serverManager, socket beforeAll(async () => { - serverManager = new ServerManager({ - port: 3009, - storageStrategy: 'lru', - }) - - serverManager.start() + serverManager = new ServerManager() + await serverManager.start() - socket = io(SERVER_URL, { + socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { - token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, SECRET), + token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, Config.JWT_SECRET_KEY), }, }) @@ -39,11 +45,11 @@ describe('Socket handling', () => { afterAll(async () => { await socket.disconnect() - await serverManager.server.close() + await serverManager.gracefulShutdown() }) it('socket invalid jwt', async () => { - const socket = io(SERVER_URL, { + const socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, 'wrongsecret'), }, @@ -56,9 +62,9 @@ describe('Socket handling', () => { }) it('socket valid jwt', async () => { - const socket = io(SERVER_URL, { + const socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { - token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, SECRET), + token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, Config.JWT_SECRET_KEY), }, }) return new Promise((resolve) => { @@ -78,9 +84,9 @@ describe('Socket handling', () => { }) it('read only socket', async () => { - const socket = io(SERVER_URL, { + const socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { - token: jwt.sign({ roomID: 123, user: { name: 'Admin' }, isFileReadOnly: true }, SECRET), + token: jwt.sign({ roomID: 123, user: { name: 'Admin' }, isFileReadOnly: true }, Config.JWT_SECRET_KEY), }, }) return new Promise((resolve) => { diff --git a/websocket_server/ApiService.js b/websocket_server/ApiService.js index 383e781..1587bb4 100644 --- a/websocket_server/ApiService.js +++ b/websocket_server/ApiService.js @@ -7,16 +7,12 @@ import fetch from 'node-fetch' import https from 'https' -import dotenv from 'dotenv' -import Utils from './Utils.js' -dotenv.config() +import Config from './Config.js' export default class ApiService { constructor(tokenGenerator) { - this.NEXTCLOUD_URL = process.env.NEXTCLOUD_URL - this.IS_DEV = Utils.parseBooleanFromEnv(process.env.IS_DEV) - this.agent = this.IS_DEV ? new https.Agent({ rejectUnauthorized: false }) : null + this.agent = (Config.USE_TLS && Config.BYPASS_SSL_VALIDATION) ? new https.Agent({ rejectUnauthorized: false }) : null this.tokenGenerator = tokenGenerator } @@ -50,7 +46,7 @@ export default class ApiService { } async getRoomDataFromServer(roomID, jwtToken) { - const url = `${this.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` + const url = `${Config.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` const options = this.fetchOptions('GET', jwtToken) return this.fetchData(url, options) } @@ -58,7 +54,7 @@ export default class ApiService { async saveRoomDataToServer(roomID, roomData, lastEditedUser, files) { console.log(`[${roomID}] Saving room data to server: ${roomData.length} elements, ${Object.keys(files).length} files`) - const url = `${this.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` + const url = `${Config.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` const body = { data: { diff --git a/websocket_server/AppManager.js b/websocket_server/AppManager.js index e9e95f3..45d669d 100644 --- a/websocket_server/AppManager.js +++ b/websocket_server/AppManager.js @@ -3,19 +3,14 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import dotenv from 'dotenv' import express from 'express' -import PrometheusDataManager from './PrometheusDataManager.js' - -dotenv.config() +import Config from './Config.js' export default class AppManager { - constructor(storageManager) { + constructor(metricsManager) { this.app = express() - this.storageManager = storageManager - this.metricsManager = new PrometheusDataManager(storageManager) - this.METRICS_TOKEN = process.env.METRICS_TOKEN + this.metricsManager = metricsManager this.setupRoutes() } @@ -30,7 +25,7 @@ export default class AppManager { async metricsHandler(req, res) { const token = req.headers.authorization?.split(' ')[1] || req.query.token - if (!this.METRICS_TOKEN || token !== this.METRICS_TOKEN) { + if (!Config.METRICS_TOKEN || token !== Config.METRICS_TOKEN) { return res.status(403).send('Unauthorized') } this.metricsManager.updateMetrics() diff --git a/websocket_server/BackupManager.js b/websocket_server/BackupManager.js index 95ffb98..5695e3c 100644 --- a/websocket_server/BackupManager.js +++ b/websocket_server/BackupManager.js @@ -10,18 +10,11 @@ import path from 'path' import crypto from 'crypto' import zlib from 'zlib' import { promisify } from 'util' +import Config from './Config.js' const gzip = promisify(zlib.gzip) const gunzip = promisify(zlib.gunzip) -/** - * @typedef {object} BackupOptions - * @property {string} [backupDir='./backup'] - Directory to store backups - * @property {number} [maxBackupsPerRoom=5] - Maximum number of backups to keep per room - * @property {number} [lockTimeout=5000] - Maximum time in ms to wait for a lock - * @property {number} [lockRetryInterval=50] - Time in ms between lock retry attempts - */ - /** * @typedef {object} BackupData * @property {string} id - Unique identifier for the backup @@ -39,15 +32,9 @@ export default class BackupManager { /** * Creates a new BackupManager instance - * @param {BackupOptions} [options] - Configuration options */ - constructor(options = {}) { - const { backupDir = './backup', maxBackupsPerRoom = 5 } = options - this.backupDir = backupDir - this.maxBackupsPerRoom = maxBackupsPerRoom + constructor() { this.locks = new Map() - this.lockTimeout = options.lockTimeout || 5000 // 5 seconds - this.lockRetryInterval = options.lockRetryInterval || 50 // 50ms this.init() } @@ -57,7 +44,7 @@ export default class BackupManager { */ async init() { try { - await fs.mkdir(this.backupDir, { recursive: true }) + await fs.mkdir(Config.BACKUP_DIR, { recursive: true }) await this.cleanupTemporaryFiles() } catch (error) { console.error('Failed to initialize BackupManager:', error) @@ -70,12 +57,12 @@ export default class BackupManager { */ async cleanupTemporaryFiles() { try { - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) const tmpFiles = files.filter((f) => f.endsWith('.tmp')) await Promise.all( tmpFiles.map((file) => fs - .unlink(path.join(this.backupDir, file)) + .unlink(path.join(Config.BACKUP_DIR, file)) .catch(console.error), ), ) @@ -92,11 +79,11 @@ export default class BackupManager { async acquireLock(roomId) { const startTime = Date.now() while (this.locks.get(roomId)) { - if (Date.now() - startTime > this.lockTimeout) { + if (Date.now() - startTime > Config.LOCK_TIMEOUT) { throw new Error(`Lock acquisition timeout for room ${roomId}`) } await new Promise((resolve) => - setTimeout(resolve, this.lockRetryInterval), + setTimeout(resolve, Config.LOCK_RETRY_INTERVAL), ) } this.locks.set(roomId, Date.now()) @@ -187,7 +174,7 @@ export default class BackupManager { */ async writeBackupFile(roomId, backupData) { const backupFile = path.join( - this.backupDir, + Config.BACKUP_DIR, `${roomId}_${backupData.timestamp}.bak`, ) const tempFile = `${backupFile}.tmp` @@ -205,7 +192,7 @@ export default class BackupManager { */ async getLatestBackup(roomId) { const sanitizedRoomId = this.sanitizeRoomId(roomId) - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) const roomBackups = files .filter( (f) => @@ -218,7 +205,7 @@ export default class BackupManager { try { const compressed = await fs.readFile( - path.join(this.backupDir, roomBackups[0]), + path.join(Config.BACKUP_DIR, roomBackups[0]), ) const decompressed = await gunzip(compressed) const backup = JSON.parse(decompressed.toString()) @@ -246,7 +233,7 @@ export default class BackupManager { const sanitizedRoomId = this.sanitizeRoomId(roomId) try { - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) const roomBackups = files .filter( (f) => @@ -256,15 +243,15 @@ export default class BackupManager { .sort() .reverse() - if (roomBackups.length <= this.maxBackupsPerRoom) { + if (roomBackups.length <= Config.MAX_BACKUPS_PER_ROOM) { return } - const filesToDelete = roomBackups.slice(this.maxBackupsPerRoom) + const filesToDelete = roomBackups.slice(Config.MAX_BACKUPS_PER_ROOM) await Promise.all( filesToDelete.map((file) => fs - .unlink(path.join(this.backupDir, file)) + .unlink(path.join(Config.BACKUP_DIR, file)) .catch((error) => { console.error( `Failed to delete backup ${file}:`, @@ -285,7 +272,7 @@ export default class BackupManager { */ async getAllBackups(roomId) { const sanitizedRoomId = this.sanitizeRoomId(roomId) - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) return files .filter( (f) => diff --git a/websocket_server/CleanupManager.js b/websocket_server/CleanupManager.js new file mode 100644 index 0000000..c3cf47f --- /dev/null +++ b/websocket_server/CleanupManager.js @@ -0,0 +1,79 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import Utils from './Utils.js' +import RoomDataManager from './RoomDataManager.js' +import Config from './Config.js' + +/** + * Manages cleanup operations for the whiteboard server + */ +export default class CleanupManager { + + /** + * Creates a new CleanupManager instance + * @param {RoomDataManager} roomDataManager - Manager for room data + */ + constructor(roomDataManager) { + this.roomDataManager = roomDataManager + this.storageManager = roomDataManager.storageManager + + this.cleanupIntervals = new Set() + } + + /** + * Starts periodic cleanup tasks + */ + startPeriodicTasks() { + Utils.logOperation('SYSTEM', 'Starting periodic cleanup tasks...') + + const roomCleanup = setInterval(() => { + this.cleanupRooms() + .catch(error => Utils.logError('SYSTEM', 'Room cleanup failed:', error)) + }, Config.ROOM_CLEANUP_INTERVAL) + + this.cleanupIntervals.add(roomCleanup) + } + + /** + * Performs cleanup of rooms + * @return {Promise} + */ + async cleanupRooms() { + Utils.logOperation('SYSTEM', 'Running room cleanup...') + const rooms = await this.storageManager.getRooms() + + for (const [roomId, room] of rooms.entries()) { + try { + await this.storageManager.delete(roomId) + Utils.logOperation(roomId, 'Auto-saved and cleaned up room data') + } catch (error) { + Utils.logError(roomId, 'Failed to cleanup room:', error) + // Try to restore room in case of error during the cleanup + try { + await this.storageManager.set(roomId, room) + } catch (restoreError) { + Utils.logError( + roomId, + 'Failed to restore room after failed cleanup:', + restoreError, + ) + } + } + } + } + + /** + * Stops all periodic cleanup tasks + */ + stopPeriodicTasks() { + Utils.logOperation('SYSTEM', 'Stopping periodic cleanup tasks...') + for (const interval of this.cleanupIntervals) { + clearInterval(interval) + } + this.cleanupIntervals.clear() + } + +} diff --git a/websocket_server/Config.js b/websocket_server/Config.js new file mode 100644 index 0000000..69c1612 --- /dev/null +++ b/websocket_server/Config.js @@ -0,0 +1,70 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +/* eslint-disable no-console */ + +import dotenv from 'dotenv' +import crypto from 'crypto' +import { DEFAULT_NEXTCLOUD_URL, DEFAULT_PORT, DEFAULT_STORAGE_STRATEGY, DEFAULT_FORCE_CLOSE_TIMEOUT, DEFAULT_REDIS_URL, DEFAULT_ROOM_CLEANUP_INTERVAL, DEFAULT_LOCK_TIMEOUT, DEFAULT_LOCK_RETRY_INTERVAL, DEFAULT_MAX_BACKUPS_PER_ROOM, DEFAULT_BACKUP_DIR, DEFAULT_ROOM_MAX_AGE, DEFAULT_MAX_ROOMS_IN_STORAGE } from './Constants.js' +import Utils from './Utils.js' + +dotenv.config() + +const Config = { + IS_TEST_ENV: process.env.NODE_ENV === 'test', + + BYPASS_SSL_VALIDATION: Utils.parseBooleanFromEnv(process.env.BYPASS_SSL_VALIDATION), + + PORT: process.env.PORT || DEFAULT_PORT, + + USE_TLS: Utils.parseBooleanFromEnv(process.env.TLS), + + TLS_KEY_PATH: process.env.TLS_KEY || null, + + TLS_CERT_PATH: process.env.TLS_CERT || null, + + STORAGE_STRATEGY: process.env.STORAGE_STRATEGY || DEFAULT_STORAGE_STRATEGY, + + REDIS_URL: process.env.REDIS_URL || DEFAULT_REDIS_URL, + + FORCE_CLOSE_TIMEOUT: process.env.FORCE_CLOSE_TIMEOUT || DEFAULT_FORCE_CLOSE_TIMEOUT, + + METRICS_TOKEN: process.env.METRICS_TOKEN || null, + + BACKUP_DIR: process.env.BACKUP_DIR || DEFAULT_BACKUP_DIR, + + MAX_BACKUPS_PER_ROOM: process.env.MAX_BACKUPS_PER_ROOM || DEFAULT_MAX_BACKUPS_PER_ROOM, + + LOCK_TIMEOUT: process.env.LOCK_TIMEOUT || DEFAULT_LOCK_TIMEOUT, + + LOCK_RETRY_INTERVAL: process.env.LOCK_RETRY_INTERVAL || DEFAULT_LOCK_RETRY_INTERVAL, + + ROOM_CLEANUP_INTERVAL: process.env.ROOM_CLEANUP_INTERVAL || DEFAULT_ROOM_CLEANUP_INTERVAL, + + ROOM_MAX_AGE: process.env.ROOM_MAX_AGE || DEFAULT_ROOM_MAX_AGE, + + MAX_ROOMS_IN_STORAGE: process.env.MAX_ROOMS_IN_STORAGE || DEFAULT_MAX_ROOMS_IN_STORAGE, + + get JWT_SECRET_KEY() { + if (!process.env.JWT_SECRET_KEY) { + const newSecret = crypto.randomBytes(32).toString('hex') + process.env.JWT_SECRET_KEY = newSecret + console.log('Generated new JWT_SECRET_KEY:', newSecret) + } else { + console.log('Using existing JWT_SECRET_KEY from environment') + } + return process.env.JWT_SECRET_KEY + }, + + get NEXTCLOUD_WEBSOCKET_URL() { + return Utils.getOriginFromUrl(process.env.NEXTCLOUD_URL || DEFAULT_NEXTCLOUD_URL) + }, + + get NEXTCLOUD_URL() { + return this.NEXTCLOUD_WEBSOCKET_URL + }, +} + +export default Config diff --git a/websocket_server/Constants.js b/websocket_server/Constants.js new file mode 100644 index 0000000..e95016b --- /dev/null +++ b/websocket_server/Constants.js @@ -0,0 +1,33 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +export const DEFAULT_NEXTCLOUD_URL = 'http://nextcloud.local' + +export const DEFAULT_PORT = 3002 + +export const DEFAULT_STORAGE_STRATEGY = 'lru' + +export const DEFAULT_FORCE_CLOSE_TIMEOUT = 60 * 1000 + +export const DEFAULT_REDIS_URL = 'redis://localhost:6379' + +export const DEFAULT_BACKUP_DIR = './backup' + +export const DEFAULT_MAX_BACKUPS_PER_ROOM = 5 + +export const DEFAULT_LOCK_TIMEOUT = 5000 + +export const DEFAULT_LOCK_RETRY_INTERVAL = 50 + +export const DEFAULT_ROOM_CLEANUP_INTERVAL = 5 * 60 * 1000 + +export const DEFAULT_ROOM_MAX_AGE = 30 * 60 * 1000 + +export const DEFAULT_MAX_ROOMS_IN_STORAGE = 1000 + +export const DEFAULT_EMPTY_ROOM_DATA = Object.freeze({ + elements: [], + files: {}, +}) diff --git a/websocket_server/InMemoryStrategy.js b/websocket_server/InMemoryStrategy.js new file mode 100644 index 0000000..955d11c --- /dev/null +++ b/websocket_server/InMemoryStrategy.js @@ -0,0 +1,35 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import StorageStrategy from './StorageStrategy.js' + +export default class InMemoryStrategy extends StorageStrategy { + + constructor() { + super() + this.store = new Map() + } + + async get(key) { + return this.store.get(key) + } + + async set(key, value) { + this.store.set(key, value) + } + + async delete(key) { + this.store.delete(key) + } + + async clear() { + this.store.clear() + } + + getRooms() { + throw new Error('Method not implemented.') + } + +} diff --git a/websocket_server/LRUCacheStrategy.js b/websocket_server/LRUCacheStrategy.js index 73d6610..66249d5 100644 --- a/websocket_server/LRUCacheStrategy.js +++ b/websocket_server/LRUCacheStrategy.js @@ -8,6 +8,7 @@ import StorageStrategy from './StorageStrategy.js' import { LRUCache } from 'lru-cache' import Room from './Room.js' +import Config from './Config.js' export default class LRUCacheStrategy extends StorageStrategy { @@ -15,8 +16,8 @@ export default class LRUCacheStrategy extends StorageStrategy { super() this.apiService = apiService this.cache = new LRUCache({ - max: 1000, - ttl: 30 * 60 * 1000, + max: Config.MAX_ROOMS_IN_STORAGE, + ttl: Config.ROOM_MAX_AGE, ttlAutopurge: true, dispose: async (value, key) => { console.log(`[${key}] Disposing room`) diff --git a/websocket_server/PrometheusDataManager.js b/websocket_server/PrometheusDataManager.js index 1671074..7dffb82 100644 --- a/websocket_server/PrometheusDataManager.js +++ b/websocket_server/PrometheusDataManager.js @@ -4,12 +4,11 @@ */ import { register, Gauge } from 'prom-client' -import SystemMonitor from './SystemMonitor.js' export default class PrometheusDataManager { - constructor(storageManager) { - this.systemMonitor = new SystemMonitor(storageManager) + constructor(systemMonitor) { + this.systemMonitor = systemMonitor this.initializeGauges() } diff --git a/websocket_server/RedisStrategy.js b/websocket_server/RedisStrategy.js index d7c37cf..a00ca07 100644 --- a/websocket_server/RedisStrategy.js +++ b/websocket_server/RedisStrategy.js @@ -8,6 +8,7 @@ import StorageStrategy from './StorageStrategy.js' import { createClient } from 'redis' import Room from './Room.js' +import Config from './Config.js' export default class RedisStrategy extends StorageStrategy { @@ -15,7 +16,7 @@ export default class RedisStrategy extends StorageStrategy { super() this.apiService = apiService this.client = createClient({ - url: process.env.REDIS_URL || 'redis://localhost:6379', + url: Config.REDIS_URL, retry_strategy: (options) => { if (options.error && options.error.code === 'ECONNREFUSED') { return new Error('The server refused the connection') diff --git a/websocket_server/RoomDataManager.js b/websocket_server/RoomDataManager.js index 96bbf99..35039bd 100644 --- a/websocket_server/RoomDataManager.js +++ b/websocket_server/RoomDataManager.js @@ -8,6 +8,7 @@ import Utils from './Utils.js' import ApiService from './ApiService.js' import BackupManager from './BackupManager.js' import StorageManager from './StorageManager.js' +import { DEFAULT_EMPTY_ROOM_DATA } from './Constants.js' /** * @typedef {object} RoomData @@ -30,18 +31,6 @@ import StorageManager from './StorageManager.js' */ export default class RoomDataManager { - /** - * Default configuration for room data - * @static - * @readonly - */ - static CONFIG = Object.freeze({ - defaultData: { - elements: [], - files: {}, - }, - }) - /** * @param {StorageManager} storageManager - Manager for room storage operations * @param {ApiService} apiService - Service for API communications @@ -133,7 +122,7 @@ export default class RoomDataManager { normalizeRoomData(data) { // Always return default data structure if input is null/undefined if (!data) { - return RoomDataManager.CONFIG.defaultData + return DEFAULT_EMPTY_ROOM_DATA } const normalized = { @@ -299,24 +288,4 @@ export default class RoomDataManager { return backupData } - /** - * Handles empty room cleanup - * @param {string} roomId - Room identifier - * @return {Promise} - */ - async handleEmptyRoom(roomId) { - await this.cleanupEmptyRoom(roomId) - return null - } - - /** - * Removes empty room from storage - * @param {string} roomId - Room identifier - * @return {Promise} - */ - async cleanupEmptyRoom(roomId) { - await this.storageManager.delete(roomId) - Utils.logOperation(roomId, 'Empty room removed from cache') - } - } diff --git a/websocket_server/ServerManager.js b/websocket_server/ServerManager.js index 20c008f..ae86168 100644 --- a/websocket_server/ServerManager.js +++ b/websocket_server/ServerManager.js @@ -15,22 +15,48 @@ import StorageManager from './StorageManager.js' import RoomDataManager from './RoomDataManager.js' import AppManager from './AppManager.js' import SocketManager from './SocketManager.js' -import Utils from './Utils.js' import BackupManager from './BackupManager.js' - +import CleanupManager from './CleanupManager.js' +import PrometheusDataManager from './PrometheusDataManager.js' +import SystemMonitor from './SystemMonitor.js' +import SocketDataManager from './SocketDataManager.js' +import Config from './Config.js' export default class ServerManager { - constructor(config) { - this.config = config + constructor() { this.closing = false + this.tokenGenerator = new SharedTokenGenerator() + this.apiService = new ApiService(this.tokenGenerator) - this.backupManager = new BackupManager({}) - this.storageManager = StorageManager.create(this.config.storageStrategy, this.apiService) - this.roomDataManager = new RoomDataManager(this.storageManager, this.apiService, this.backupManager) - this.appManager = new AppManager(this.storageManager) + + this.backupManager = new BackupManager() + + this.roomStorage = StorageManager.create(Config.STORAGE_STRATEGY, this.apiService) + + this.roomDataManager = new RoomDataManager(this.roomStorage, this.apiService, this.backupManager) + + this.systemMonitor = new SystemMonitor(this.roomStorage) + + this.metricsManager = new PrometheusDataManager(this.systemMonitor) + + this.appManager = new AppManager(this.metricsManager) + this.server = this.createConfiguredServer(this.appManager.getApp()) - this.socketManager = new SocketManager(this.server, this.roomDataManager, this.storageManager) + + this.sessionStorage = Config.STORAGE_STRATEGY === 'redis' + ? StorageManager.create('redis', this.apiService) + : StorageManager.create('in-mem') + + this.socketDataManager = new SocketDataManager(this.sessionStorage) + + this.socketManager = new SocketManager(this.server, this.roomDataManager, this.sessionStorage, this.socketDataManager) + + this.cleanupManager = new CleanupManager( + this.roomDataManager, + ) + + this.cleanupManager.startPeriodicTasks() } readTlsCredentials(keyPath, certPath) { @@ -40,18 +66,17 @@ export default class ServerManager { } } - createConfiguredServer(app) { - const useTls = Utils.parseBooleanFromEnv(this.config.tls) + createConfiguredServer(app, useTls = false) { const serverType = useTls ? https : http - const serverOptions = useTls ? this.readTlsCredentials(this.config.keyPath, this.config.certPath) : {} + const serverOptions = useTls ? this.readTlsCredentials(Config.TLS_KEY_PATH, Config.TLS_CERT_PATH) : {} return serverType.createServer(serverOptions, app) } start() { return new Promise((resolve, reject) => { - this.server.listen(this.config.port, () => { - console.log(`Listening on port: ${this.config.port}`) + this.server.listen(Config.PORT, () => { + console.log(`Listening on port: ${Config.PORT}`) resolve() }) @@ -68,9 +93,16 @@ export default class ServerManager { async gracefulShutdown() { if (this.closing) return this.closing = true - console.log('Received shutdown signal, saving all data...') + console.log('Received shutdown signal, performing cleanup...') + try { - await this.roomDataManager.removeAllRoomData() + // Stop periodic cleanup tasks + this.cleanupManager.stopPeriodicTasks() + + // Run one final cleanup + await this.cleanupManager.cleanupRooms() + + // Continue with existing shutdown logic this.socketManager.io.close() console.log('Closing server...') this.server.close(() => { @@ -81,7 +113,7 @@ export default class ServerManager { setTimeout(() => { console.error('Force closing server after timeout') process.exit(1) - }, this.config.forceCloseTimeout) + }, Config.FORCE_CLOSE_TIMEOUT) } catch (error) { console.error('Error during graceful shutdown:', error) process.exit(1) diff --git a/websocket_server/SharedTokenGenerator.js b/websocket_server/SharedTokenGenerator.js index b3a63b6..678b162 100644 --- a/websocket_server/SharedTokenGenerator.js +++ b/websocket_server/SharedTokenGenerator.js @@ -4,20 +4,14 @@ */ import crypto from 'crypto' -import dotenv from 'dotenv' - -dotenv.config() +import Config from './Config.js' export default class SharedTokenGenerator { - constructor() { - this.SHARED_SECRET = process.env.JWT_SECRET_KEY - } - handle(roomId) { const timestamp = Date.now() const payload = `${roomId}:${timestamp}` - const hmac = crypto.createHmac('sha256', this.SHARED_SECRET) + const hmac = crypto.createHmac('sha256', Config.JWT_SECRET_KEY) hmac.update(payload) const signature = hmac.digest('hex') return `${payload}:${signature}` diff --git a/websocket_server/SocketDataManager.js b/websocket_server/SocketDataManager.js index 4b0cad1..74e4ae3 100644 --- a/websocket_server/SocketDataManager.js +++ b/websocket_server/SocketDataManager.js @@ -2,29 +2,80 @@ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors * SPDX-License-Identifier: AGPL-3.0-or-later */ - export default class SocketDataManager { + /** + * @typedef {object} UserData + * @property {string} id - User identifier + * @property {string} name - User display name + */ + + /** + * @typedef {object} DecodedTokenData + * @property {string} userid - User identifier + * @property {number} fileId - File identifier + * @property {boolean} isFileReadOnly - Whether the file is read-only + * @property {UserData} user - User information + * @property {number} iat - Token issued at timestamp + * @property {number} exp - Token expiration timestamp + * @property {string} [jwtToken] - Original JWT token string + */ + + /** + * @param {object} storageManager - Storage manager instance for data persistence + */ constructor(storageManager) { this.storageManager = storageManager } + /** + * Caches decoded token data + * @param {string} token - JWT token + * @param {DecodedTokenData} decodedData - Decoded token payload + */ async setCachedToken(token, decodedData) { await this.storageManager.set(`token:${token}`, decodedData) } + /** + * Removes token from cache + * @param {string} token - JWT token to invalidate + */ + async invalidateToken(token) { + await this.storageManager.delete(`token:${token}`) + } + + /** + * Retrieves cached token data + * @param {string} token - JWT token + * @return {Promise} Decoded token data if exists + */ async getCachedToken(token) { return this.storageManager.get(`token:${token}`) } + /** + * Stores socket session data + * @param {string} socketId - Socket identifier + * @param {object} data - Socket session data + */ async setSocketData(socketId, data) { await this.storageManager.set(`socket:${socketId}`, data) } + /** + * Retrieves socket session data + * @param {string} socketId - Socket identifier + * @return {Promise} Socket session data if exists + */ async getSocketData(socketId) { return this.storageManager.get(`socket:${socketId}`) } + /** + * Removes socket session data + * @param {string} socketId - Socket identifier + */ async deleteSocketData(socketId) { await this.storageManager.delete(`socket:${socketId}`) } diff --git a/websocket_server/SocketManager.js b/websocket_server/SocketManager.js index 7ede85e..5188437 100644 --- a/websocket_server/SocketManager.js +++ b/websocket_server/SocketManager.js @@ -5,61 +5,85 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import { Server as SocketIO } from 'socket.io' +import { Server as SocketIO, Socket } from 'socket.io' import prometheusMetrics from 'socket.io-prometheus' import jwt from 'jsonwebtoken' -import dotenv from 'dotenv' import Utils from './Utils.js' import { createAdapter } from '@socket.io/redis-streams-adapter' import SocketDataManager from './SocketDataManager.js' +import RoomDataManager from './RoomDataManager.js' +import StorageManager from './StorageManager.js' +import { Server } from 'http' +import { Server as HttpsServer } from 'https' +import Config from './Config.js' -dotenv.config() - +/** + * Manages WebSocket connections and room interactions + */ export default class SocketManager { - constructor(server, roomDataManager, storageManager) { + /** + * Creates a new SocketManager instance + * @param {Server|HttpsServer} server - HTTP/HTTPS server instance + * @param {RoomDataManager} roomDataManager - Manager for room data + * @param {StorageManager} storageManager - Manager for storage operations + * @param {SocketDataManager} socketDataManager - Manager for socket data + */ + constructor(server, roomDataManager, storageManager, socketDataManager) { this.roomDataManager = roomDataManager this.storageManager = storageManager - this.socketDataManager = new SocketDataManager(storageManager) + this.socketDataManager = socketDataManager + this.io = this.createSocketServer(server) + this.init() + } - this.io = new SocketIO(server, { + // SERVER SETUP METHODS + /** + * Creates and configures the Socket.IO server + * @param {Server|HttpsServer} server - HTTP/HTTPS server instance + * @return {SocketIO.Server} Configured Socket.IO server instance + */ + createSocketServer(server) { + return new SocketIO(server, { transports: ['websocket', 'polling'], cors: { - origin: process.env.NEXTCLOUD_URL || 'http://nextcloud.local', + origin: Config.NEXTCLOUD_WEBSOCKET_URL, methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], credentials: true, }, }) - - this.init() } + /** + * Initializes the socket server and sets up necessary configurations + * @return {Promise} + */ async init() { + await this.setupAdapter() + this.setupEventHandlers() + } + + /** + * Sets up the appropriate adapter (Redis or in-memory) + * @return {Promise} + */ + async setupAdapter() { if (this.shouldUseRedis()) { await this.setupRedisStreamsAdapter() } else { console.log('Using default in-memory adapter') } - - this.io.use(this.socketAuthenticateHandler.bind(this)) - prometheusMetrics(this.io) - this.io.on('connection', this.handleConnection.bind(this)) - } - - shouldUseRedis() { - return this.storageManager.strategy.constructor.name === 'RedisStrategy' } + /** + * Configures Redis Streams adapter for Socket.IO + * @return {Promise} + */ async setupRedisStreamsAdapter() { console.log('Setting up Redis Streams adapter') try { const redisClient = this.storageManager.strategy.client - this.io.adapter( - createAdapter(redisClient, { - maxLen: 10000, - }), - ) - + this.io.adapter(createAdapter(redisClient, { maxLen: 10000 })) console.log('Redis Streams adapter set up successfully') } catch (error) { console.error('Failed to set up Redis Streams adapter:', error) @@ -67,13 +91,27 @@ export default class SocketManager { } } + /** + * Determines if Redis should be used as the adapter + * @return {boolean} + */ + shouldUseRedis() { + return this.storageManager.strategy.constructor.name === 'RedisStrategy' + } + + // AUTHENTICATION METHODS + /** + * Handles socket authentication + * @param {Socket} socket - Socket.IO socket instance + * @param {Function} next - Next middleware function + * @return {Promise} + */ async socketAuthenticateHandler(socket, next) { try { const { token } = socket.handshake.auth if (!token) throw new Error('No token provided') const decodedData = await this.verifyToken(token) - console.log('decodedData', decodedData) await this.socketDataManager.setSocketData(socket.id, decodedData) if (decodedData.isFileReadOnly) { @@ -81,47 +119,30 @@ export default class SocketManager { } next() } catch (error) { - const { secret } = socket.handshake.auth - - try { - jwt.verify( - secret, - process.env.JWT_SECRET_KEY, - { - algorithm: 'HS256', - }, - ) - next(new Error('Connection verified')) - } catch (e) {} - - next(new Error('Authentication error')) + await this.handleAuthError(socket, next) } } - handleConnection(socket) { - socket.emit('init-room') - socket.on('join-room', (roomID) => this.joinRoomHandler(socket, roomID)) - socket.on('server-broadcast', (roomID, encryptedData, iv) => - this.serverBroadcastHandler(socket, roomID, encryptedData, iv), - ) - socket.on('server-volatile-broadcast', (roomID, encryptedData) => - this.serverVolatileBroadcastHandler(socket, roomID, encryptedData), - ) - socket.on('image-add', (roomID, id, data) => this.imageAddHandler(socket, roomID, id, data)) - socket.on('image-remove', (roomID, id, data) => this.imageRemoveHandler(socket, roomID, id, data)) - socket.on('image-get', (roomID, id, data) => this.imageGetHandler(socket, roomID, id, data)) - socket.on('disconnecting', () => { - const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id) - this.disconnectingHandler(socket, rooms) - }) - socket.on('disconnect', () => this.handleDisconnect(socket)) - } - - async handleDisconnect(socket) { - await this.socketDataManager.deleteSocketData(socket.id) - socket.removeAllListeners() + /** + * Handles authentication errors + * @param {Socket} socket - Socket.IO socket instance + * @param {Function} next - Next middleware function + */ + async handleAuthError(socket, next) { + const { secret } = socket.handshake.auth + try { + jwt.verify(secret, Config.JWT_SECRET_KEY, { algorithm: 'HS256' }) + next(new Error('Connection verified')) + } catch (e) { + next(new Error('Authentication error')) + } } + /** + * Verifies JWT token + * @param {string} token - JWT token to verify + * @return {Promise} Decoded token data + */ async verifyToken(token) { const cachedToken = await this.socketDataManager.getCachedToken(token) if (cachedToken) return cachedToken @@ -129,7 +150,7 @@ export default class SocketManager { return new Promise((resolve, reject) => { jwt.verify( token, - process.env.JWT_SECRET_KEY, + Config.JWT_SECRET_KEY, async (err, decoded) => { if (err) { console.log( @@ -146,25 +167,64 @@ export default class SocketManager { }) } - async isSocketReadOnly(socketId) { - const socketData = await this.socketDataManager.getSocketData(socketId) - return socketData ? !!socketData.isFileReadOnly : false + // EVENT SETUP METHODS + /** + * Sets up all event handlers for the socket server + */ + setupEventHandlers() { + this.io.use(this.socketAuthenticateHandler.bind(this)) + prometheusMetrics(this.io) + this.io.on('connection', this.handleConnection.bind(this)) } - async getUserSocketsAndIds(roomID) { - const sockets = await this.io.in(roomID).fetchSockets() - return Promise.all(sockets.map(async (s) => { - const data = await this.socketDataManager.getSocketData(s.id) - return { - socketId: s.id, - user: data.user, - userId: data.user.id, - } - })) + /** + * Handles new socket connections + * @param {Socket} socket - Socket.IO socket instance + */ + handleConnection(socket) { + socket.emit('init-room') + this.setupSocketEventListeners(socket) } + /** + * Sets up event listeners for a specific socket + * @param {Socket} socket - Socket.IO socket instance + */ + setupSocketEventListeners(socket) { + const events = { + 'join-room': this.joinRoomHandler, + 'server-broadcast': this.serverBroadcastHandler, + 'server-volatile-broadcast': this.serverVolatileBroadcastHandler, + 'image-add': this.imageAddHandler, + 'image-remove': this.imageRemoveHandler, + 'image-get': this.imageGetHandler, + disconnect: this.disconnectHandler, + } + + // Handle regular events + Object.entries(events).forEach(([event, handler]) => { + socket.on(event, (...args) => + this.safeSocketHandler(socket, () => handler.apply(this, [socket, ...args])), + ) + }) + + // Handle disconnecting separately to ensure correct room capture + socket.on('disconnecting', () => { + const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id) + this.safeSocketHandler(socket, () => this.disconnectingHandler(socket, rooms)) + }) + } + + // ROOM EVENT HANDLERS + /** + * Handles room join requests + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @return {Promise} + */ async joinRoomHandler(socket, roomID) { - console.log(`[${roomID}] ${socket.id} has joined ${roomID}`) + const socketData = await this.socketDataManager.getSocketData(socket.id) + console.log(`[${roomID}] ${socketData.user.name} has joined ${roomID}`) await socket.join(roomID) const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) @@ -195,6 +255,14 @@ export default class SocketManager { } } + /** + * Handles broadcast messages to room + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {ArrayBuffer} encryptedData - Encrypted message data + * @param {string} iv - Initialization vector + * @return {Promise} + */ async serverBroadcastHandler(socket, roomID, encryptedData, iv) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomID) || isReadOnly) return @@ -208,26 +276,12 @@ export default class SocketManager { }, socket.id) } - async processRoomDataUpdate(roomID, updateData, socketId) { - const socketData = await this.socketDataManager.getSocketData(socketId) - if (!socketData) return - - const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) - const currentRoom = await this.storageManager.get(roomID) - - const roomData = { - elements: updateData.elements || currentRoom?.data || [], - files: updateData.files || currentRoom?.files || {}, - } - - await this.roomDataManager.syncRoomData( - roomID, - roomData, - userSocketsAndIds.map(u => u.userId), - socketData.user.id, - ) - } - + /** + * Handles volatile broadcasts (e.g., mouse movements) + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {ArrayBuffer} encryptedData - Encrypted message data + */ async serverVolatileBroadcastHandler(socket, roomID, encryptedData) { const payload = JSON.parse( Utils.convertArrayBufferToString(encryptedData), @@ -255,6 +309,15 @@ export default class SocketManager { } } + // IMAGE HANDLING METHODS + /** + * Handles image addition to room + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {string} id - Image identifier + * @param {object} data - Image data + * @return {Promise} + */ async imageAddHandler(socket, roomID, id, data) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomID) || isReadOnly) return @@ -270,6 +333,12 @@ export default class SocketManager { }, socket.id) } + /** + * Handles image removal from room + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {string} id - Image identifier + */ async imageRemoveHandler(socket, roomID, id) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomID) || isReadOnly) return @@ -286,6 +355,12 @@ export default class SocketManager { }, socket.id) } + /** + * Handles image retrieval requests + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomId - Room identifier + * @param {string} id - Image identifier + */ async imageGetHandler(socket, roomId, id) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomId) || isReadOnly) return @@ -302,6 +377,21 @@ export default class SocketManager { } } + // DISCONNECTION HANDLERS + /** + * Handles socket disconnection + * @param {Socket} socket - Socket.IO socket instance + */ + async disconnectHandler(socket) { + await this.socketDataManager.deleteSocketData(socket.id) + socket.removeAllListeners() + } + + /** + * Handles socket disconnecting event + * @param {Socket} socket - Socket.IO socket instance + * @param {string[]} rooms - Array of room IDs + */ async disconnectingHandler(socket, rooms) { const socketData = await this.socketDataManager.getSocketData(socket.id) if (!socketData) return @@ -316,17 +406,105 @@ export default class SocketManager { if (otherUserSockets.length > 0) { this.io.to(roomID).emit('room-user-change', otherUserSockets) } else { - this.roomDataManager.cleanupEmptyRoom(roomID) + await this.storageManager.delete(roomID) } this.queueRoomUpdate(roomID, {}, socket.id) } } + // ROOM DATA MANAGEMENT + /** + * Processes room data updates + * @param {string} roomID - Room identifier + * @param {object} updateData - Data to update + * @param {string} socketId - Socket identifier + * @return {Promise} + */ + async processRoomDataUpdate(roomID, updateData, socketId) { + const socketData = await this.socketDataManager.getSocketData(socketId) + if (!socketData) return + + const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) + const currentRoom = await this.storageManager.get(roomID) + + const roomData = { + elements: updateData.elements || currentRoom?.data || [], + files: updateData.files || currentRoom?.files || {}, + } + + await this.roomDataManager.syncRoomData( + roomID, + roomData, + userSocketsAndIds.map(u => u.userId), + socketData.user.id, + ) + } + + /** + * Queues room updates for processing + * @param {string} roomID - Room identifier + * @param {object} updateData - Data to update + * @param {string} socketId - Socket identifier + */ async queueRoomUpdate(roomID, updateData, socketId) { this.processRoomDataUpdate(roomID, updateData, socketId).catch(error => { console.error(`Failed to process room update for ${roomID}:`, error) }) } + // UTILITY METHODS + /** + * Safely executes socket handlers with error handling + * @param {Socket} socket - Socket.IO socket instance + * @param {Function} handler - Handler function to execute + * @return {Promise} Success status + */ + async safeSocketHandler(socket, handler) { + try { + const socketData = await this.socketDataManager.getSocketData(socket.id) + if (!socketData?.user) { + socket.emit('error', 'Invalid session') + socket.disconnect() + return false + } + return await handler() + } catch (error) { + console.error('Socket handler error:', error) + socket.emit('error', 'Internal server error') + return false + } + } + + /** + * Checks if a socket is in read-only mode + * @param {string} socketId - Socket identifier + * @return {Promise} Read-only status + */ + async isSocketReadOnly(socketId) { + const socketData = await this.socketDataManager.getSocketData(socketId) + return socketData ? !!socketData.isFileReadOnly : false + } + + /** + * Gets user sockets and IDs for a room + * @param {string} roomID - Room identifier + * @return {Promise>} + */ + async getUserSocketsAndIds(roomID) { + const sockets = await this.io.in(roomID).fetchSockets() + return Promise.all(sockets.map(async (s) => { + const data = await this.socketDataManager.getSocketData(s.id) + if (!data?.user?.id) { + console.warn(`Invalid socket data for socket ${s.id}`) + return null + } + return { + socketId: s.id, + user: data.user, + userId: data.user.id, + } + })).then(results => results.filter(Boolean)) + } + } diff --git a/websocket_server/StorageManager.js b/websocket_server/StorageManager.js index e58255c..b3a9d29 100644 --- a/websocket_server/StorageManager.js +++ b/websocket_server/StorageManager.js @@ -8,6 +8,7 @@ import StorageStrategy from './StorageStrategy.js' import LRUCacheStrategy from './LRUCacheStrategy.js' import RedisStrategy from './RedisStrategy.js' +import InMemoryStrategy from './InMemoryStrategy.js' export default class StorageManager { @@ -54,6 +55,9 @@ export default class StorageManager { case 'redis': strategy = new RedisStrategy(apiService) break + case 'in-mem': + strategy = new InMemoryStrategy() + break default: throw new Error('Invalid storage strategy type') } diff --git a/websocket_server/Utils.js b/websocket_server/Utils.js index d230806..b042611 100644 --- a/websocket_server/Utils.js +++ b/websocket_server/Utils.js @@ -19,24 +19,33 @@ export default class Utils { return value === 'true' } + static getOriginFromUrl(url) { + try { + return new URL(url).origin + } catch (error) { + console.error('Invalid URL:', url) + return null + } + } + /** * Logs operation details - * @param {string} roomId - Room identifier + * @param {string} context - Context identifier * @param {string} message - Log message * @param {object} [data] - Additional data to log */ - static logOperation(roomId, message, data = {}) { - console.log(`[${roomId}] ${message}:`, data) + static logOperation(context, message, data = {}) { + console.log(`[${context}] ${message}:`, data) } /** * Logs error details - * @param {string} roomId - Room identifier + * @param {string} context - Context identifier * @param {string} message - Error message * @param {Error} error - Error object */ - static logError(roomId, message, error) { - console.error(`[${roomId}] ${message}:`, error) + static logError(context, message, error) { + console.error(`[${context}] ${message}:`, error) } } diff --git a/websocket_server/main.js b/websocket_server/main.js index 2014c27..01a96e7 100644 --- a/websocket_server/main.js +++ b/websocket_server/main.js @@ -5,36 +5,16 @@ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors * SPDX-License-Identifier: AGPL-3.0-or-later */ - -import dotenv from 'dotenv' import ServerManager from './ServerManager.js' - -dotenv.config() - -const { - PORT = 3002, - TLS, - TLS_KEY: keyPath, - TLS_CERT: certPath, - STORAGE_STRATEGY = 'lru', -} = process.env - -const FORCE_CLOSE_TIMEOUT = 60 * 1000 +import Config from './Config.js' async function main() { try { - const serverManager = new ServerManager({ - port: PORT, - tls: TLS, - keyPath, - certPath, - storageStrategy: STORAGE_STRATEGY, - forceCloseTimeout: FORCE_CLOSE_TIMEOUT, - }) + const serverManager = new ServerManager() await serverManager.start() - console.log(`Server started successfully on port ${PORT}`) + console.log(`Server started successfully on port ${Config.PORT}`) process.on('SIGTERM', () => serverManager.gracefulShutdown()) process.on('SIGINT', () => serverManager.gracefulShutdown())