diff --git a/packages/client/src/app/jobs/executeJobModal.tsx b/packages/client/src/app/jobs/executeJobModal.tsx index a221580..e3008b3 100644 --- a/packages/client/src/app/jobs/executeJobModal.tsx +++ b/packages/client/src/app/jobs/executeJobModal.tsx @@ -1,12 +1,12 @@ import { Button, Modal, Table, Text } from '@nextui-org/react'; -import { MitmControlDTO } from '@rotom/connections'; +import { DeviceControlDTO } from '@rotom/connections'; import { Selection } from '@react-types/shared/src/selection'; import { toast } from 'react-toastify'; import { useCallback, useState } from 'react'; interface ExecuteJobModalProps { closeModal: () => void; - devices?: MitmControlDTO[]; + devices?: DeviceControlDTO[]; jobId: string; } diff --git a/packages/client/src/app/jobs/jobsPage.tsx b/packages/client/src/app/jobs/jobsPage.tsx index ac8d70a..98be557 100644 --- a/packages/client/src/app/jobs/jobsPage.tsx +++ b/packages/client/src/app/jobs/jobsPage.tsx @@ -1,5 +1,5 @@ import { Text } from '@nextui-org/react'; -import { JobsDTO, JobsStatusDTO, MitmControlDTO, StatusDTO } from '@rotom/types'; +import { JobsDTO, JobsStatusDTO, DeviceControlDTO, StatusDTO } from '@rotom/types'; import { useQuery } from '@tanstack/react-query'; import { JobsTable } from './jobsTable'; import { JobsStatusesTable } from './jobsStatusesTable'; @@ -19,7 +19,7 @@ export const JobsPage = (): JSX.Element => { refetchInterval: 5000, }); - const { data: devices, refetch: refetchDevices } = useQuery( + const { data: devices, refetch: refetchDevices } = useQuery( ['status'], fetchStatus, { diff --git a/packages/client/src/app/jobs/jobsTable.tsx b/packages/client/src/app/jobs/jobsTable.tsx index bddc60d..aa568f5 100644 --- a/packages/client/src/app/jobs/jobsTable.tsx +++ b/packages/client/src/app/jobs/jobsTable.tsx @@ -1,11 +1,11 @@ import { Button, Loading, Modal, Table, useModal } from '@nextui-org/react'; -import { JobsDTO, MitmControlDTO } from '@rotom/types'; +import { JobsDTO, DeviceControlDTO } from '@rotom/types'; import { useState } from 'react'; import { ExecuteJobModal } from './executeJobModal'; interface JobsTableProps { - devices?: MitmControlDTO[]; + devices?: DeviceControlDTO[]; isLoading: boolean; jobs: JobsDTO; refetchDevices: () => void; diff --git a/packages/client/src/app/server.ts b/packages/client/src/app/server.ts index cdb89cd..0f960b9 100644 --- a/packages/client/src/app/server.ts +++ b/packages/client/src/app/server.ts @@ -1,6 +1,6 @@ import { createServer, Model, Factory } from 'miragejs'; import { faker } from '@faker-js/faker'; -import { StatusDTO, WorkerDTO, MitmControlDTO } from '@rotom/types'; +import { StatusDTO, WorkerDTO, DeviceControlDTO } from '@rotom/types'; const getRecentTimestamp = (): number => { const thirtyMinutesAgo = new Date(); @@ -14,7 +14,7 @@ export const makeServer = ({ environment = 'test' } = {}) => { environment, models: { - device: Model.extend>({ version: '15' }), + device: Model.extend>({ version: '15' }), worker: Model.extend>({}), }, @@ -24,7 +24,7 @@ export const makeServer = ({ environment = 'test' } = {}) => { }, factories: { - device: Factory.extend>({ + device: Factory.extend>({ dateLastMessageReceived() { return getRecentTimestamp(); }, @@ -61,7 +61,7 @@ export const makeServer = ({ environment = 'test' } = {}) => { workerId(i) { return faker.vehicle.vrm() + `${i}`; }, - mitm(i) { + worker(i) { return { dateLastMessageReceived: getRecentTimestamp(), dateLastMessageSent: getRecentTimestamp(), diff --git a/packages/client/src/app/status/devicesTable.tsx b/packages/client/src/app/status/devicesTable.tsx index a549266..cd3a1e7 100644 --- a/packages/client/src/app/status/devicesTable.tsx +++ b/packages/client/src/app/status/devicesTable.tsx @@ -1,5 +1,5 @@ import { memo, useCallback, useMemo, useState } from 'react'; -import { StatusDTO, MitmControlDTO } from '@rotom/types'; +import { StatusDTO, DeviceControlDTO } from '@rotom/types'; import { Table, Dropdown, SortDescriptor } from '@nextui-org/react'; import { toast } from 'react-toastify'; @@ -57,7 +57,7 @@ export const DevicesTable = ({ devices, workers }: StatusDTO): JSX.Element => { [], ); - const list = useTableSort({ + const list = useTableSort({ items: devices, initialSortDescriptor, }); diff --git a/packages/client/src/app/status/workersTable.tsx b/packages/client/src/app/status/workersTable.tsx index 8ebb587..9934467 100644 --- a/packages/client/src/app/status/workersTable.tsx +++ b/packages/client/src/app/status/workersTable.tsx @@ -21,9 +21,9 @@ export const WorkersTable = ({ workers }: { workers: StatusDTO['workers'] }): JS return list.items.filter( (worker) => !lowercaseSearch || - worker.mitm.origin?.toLowerCase().includes(lowercaseSearch) || + worker.worker.origin?.toLowerCase().includes(lowercaseSearch) || worker.workerId.toLowerCase().includes(lowercaseSearch) || - worker.scanner?.workerName.toLowerCase().includes(lowercaseSearch), + worker.controller?.workerName.toLowerCase().includes(lowercaseSearch), ); }, [search, list.items]); @@ -60,15 +60,15 @@ export const WorkersTable = ({ workers }: { workers: StatusDTO['workers'] }): JS {filteredItems.map((worker, index) => ( - {worker.mitm.origin} + {worker.worker.origin} {worker.workerId} {worker.isAllocated ? '✅' : '❌'} - {worker.scanner?.workerName} + {worker.controller?.workerName} - + - + ))} diff --git a/packages/connections/README.md b/packages/connections/README.md index 4412124..ec6d529 100644 --- a/packages/connections/README.md +++ b/packages/connections/README.md @@ -1,3 +1,3 @@ # connections -This library is a node library to handle connections with MITM and Scanner +This library is a node library to handle connections with MITM and Controller diff --git a/packages/connections/src/index.ts b/packages/connections/src/index.ts index aa9435f..9ab1c88 100644 --- a/packages/connections/src/index.ts +++ b/packages/connections/src/index.ts @@ -1,3 +1,3 @@ -export * from './lib/scannerConnection'; -export * from './lib/mitmControlConnection'; -export * from './lib/mitmWorkerConnection'; +export * from './lib/controllerConnection'; +export * from './lib/deviceControlConnection'; +export * from './lib/deviceWorkerConnection'; diff --git a/packages/connections/src/lib/scannerConnection.ts b/packages/connections/src/lib/controllerConnection.ts similarity index 71% rename from packages/connections/src/lib/scannerConnection.ts rename to packages/connections/src/lib/controllerConnection.ts index 4ae1fac..edc4241 100644 --- a/packages/connections/src/lib/scannerConnection.ts +++ b/packages/connections/src/lib/controllerConnection.ts @@ -1,22 +1,25 @@ import { EventEmitter } from 'events'; -import { WebSocket } from 'ws'; import { Logger } from 'winston'; -import { MitmWorkerConnection } from './mitmWorkerConnection'; -import { DTO } from './utils/type'; +import { WebSocket } from 'ws'; +import { DeviceWorkerConnection } from './deviceWorkerConnection'; import { RotomProtos } from './utils/mitmProto'; +import { DTO } from './utils/type'; import MitmRequest = RotomProtos.MitmRequest; //import MitmCommand = RotomProtos.MitmCommand; let instanceNo = 0; -export type ScannerConnectionDTO = Omit, 'ws' | 'log' | 'heartbeatHandle' | 'mitmConnection'>; +export type ControllerConnectionDTO = Omit< + DTO, + 'ws' | 'log' | 'heartbeatHandle' | 'deviceWorkerConnection' +>; -export class ScannerConnection extends EventEmitter { - _mitm_disconnect_handler: () => void; - _mitm_message_handler: (data: ArrayBuffer) => void; +export class ControllerConnection extends EventEmitter { + _device_disconnect_handler: () => void; + _device_message_handler: (data: ArrayBuffer) => void; _ws_close_handler: () => void; _ws_message_handler: (data: ArrayBuffer) => void; - mitmConnection: MitmWorkerConnection; + deviceWorkerConnection: DeviceWorkerConnection; ws: WebSocket; log: Logger; heartbeatHandle: NodeJS.Timer; @@ -26,7 +29,7 @@ export class ScannerConnection extends EventEmitter { isAlive: boolean; loginListener: number; - constructor(log: Logger, ws: WebSocket, mitmConnection: MitmWorkerConnection) { + constructor(log: Logger, ws: WebSocket, deviceWorkerConnection: DeviceWorkerConnection) { super(); this.ws = ws; this.log = log; @@ -38,16 +41,16 @@ export class ScannerConnection extends EventEmitter { this.instanceNo = instanceNo++; this._ws_message_handler = (data: ArrayBuffer) => this.#handleScannerMessage(data); - this._ws_close_handler = () => this.#handleScannerDisconnection(); + this._ws_close_handler = () => this.#handleControllerDisconnection(); ws.on('message', this._ws_message_handler); ws.on('close', this._ws_close_handler); ws.on('pong', () => this.heartbeat()); - this.mitmConnection = mitmConnection; - this._mitm_message_handler = (data: ArrayBuffer) => this.#handleMitmMessage(data); - this._mitm_disconnect_handler = () => this.#handleMitmDisconnection(); - this.mitmConnection.on('received', this._mitm_message_handler); - this.mitmConnection.on('disconnected', this._mitm_disconnect_handler); + this.deviceWorkerConnection = deviceWorkerConnection; + this._device_message_handler = (data: ArrayBuffer) => this.#handleMitmMessage(data); + this._device_disconnect_handler = () => this.#handleMitmDisconnection(); + this.deviceWorkerConnection.on('received', this._device_message_handler); + this.deviceWorkerConnection.on('disconnected', this._device_disconnect_handler); this.heartbeatHandle = setInterval(() => this.checkHeartbeat(), 30000); } @@ -73,7 +76,7 @@ export class ScannerConnection extends EventEmitter { * @returns {string} */ get workerId(): string { - return this.mitmConnection.workerId as string; + return this.deviceWorkerConnection.workerId as string; } /** @@ -81,7 +84,7 @@ export class ScannerConnection extends EventEmitter { * @returns {string} */ get origin() { - return this.mitmConnection.origin; + return this.deviceWorkerConnection.origin; } /** @@ -110,7 +113,7 @@ export class ScannerConnection extends EventEmitter { } } - this.mitmConnection.send(message); + this.deviceWorkerConnection.send(message); } /** @@ -124,33 +127,33 @@ export class ScannerConnection extends EventEmitter { this.ws.send(message, { binary: true }); } - #handleScannerDisconnection() { + #handleControllerDisconnection() { this.isAlive = false; clearInterval(this.heartbeatHandle); // Tell users scanner is disconnected this.emit('disconnected', this); - this.#disconnectFromMitmWebsocket(); + this.#disconnectFromDeviceWebsocket(); } /** * Disconnect scanner */ disconnect() { - this.#disconnectFromMitmWebsocket(); + this.#disconnectFromDeviceWebsocket(); this.ws.close(3000, 'Device has been disconnected'); } /** - * Disconnect associated mitm + * Disconnect associated device */ - disconnectMitm() { - this.mitmConnection.ws.close(3005, 'Scanner disconnected'); + disconnectDevice() { + this.deviceWorkerConnection.ws.close(3005, 'Controller disconnected'); } - #disconnectFromMitmWebsocket() { - this.mitmConnection.removeListener('received', this._mitm_message_handler); - this.mitmConnection.removeListener('disconnected', this._mitm_disconnect_handler); + #disconnectFromDeviceWebsocket() { + this.deviceWorkerConnection.removeListener('received', this._device_message_handler); + this.deviceWorkerConnection.removeListener('disconnected', this._device_disconnect_handler); } heartbeat() { @@ -183,7 +186,7 @@ export class ScannerConnection extends EventEmitter { this.ws.ping(); } - serialize(): ScannerConnectionDTO { + serialize(): ControllerConnectionDTO { return { dateLastMessageSent: this.dateLastMessageSent, instanceNo: this.instanceNo, diff --git a/packages/connections/src/lib/mitmControlConnection.ts b/packages/connections/src/lib/deviceControlConnection.ts similarity index 91% rename from packages/connections/src/lib/mitmControlConnection.ts rename to packages/connections/src/lib/deviceControlConnection.ts index 61e1d42..b049ccc 100644 --- a/packages/connections/src/lib/mitmControlConnection.ts +++ b/packages/connections/src/lib/deviceControlConnection.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; -import { WebSocket } from 'ws'; import { Logger } from 'winston'; +import { WebSocket } from 'ws'; import { DTO } from './utils/type'; @@ -19,7 +19,7 @@ class Deferred { } } -export type MitmControlDTO = Omit, 'ws' | 'log' | 'heartbeatHandle'>; +export type DeviceControlDTO = Omit, 'ws' | 'log' | 'heartbeatHandle'>; interface MemoryStatus { memFree: number; @@ -27,7 +27,7 @@ interface MemoryStatus { memStart: number; } -export class MitmControlConnection extends EventEmitter { +export class DeviceControlConnection extends EventEmitter { deviceId?: string; log: Logger; init: boolean; @@ -91,7 +91,7 @@ export class MitmControlConnection extends EventEmitter { this.noMessagesReceived = 0; this.responses = {}; } catch (e) { - this.log.error(`MITM /control - error decoding welcome message, disconnecting`); + this.log.error(`Device /control - error decoding welcome message, disconnecting`); this.ws.close(); return; } @@ -112,13 +112,13 @@ export class MitmControlConnection extends EventEmitter { promise.reject(`Status ${response.status} ${response.body?.errorReason ?? ''}`); } } else { - this.log.warn(`${this.deviceId}: ${message}`); + this.log.debug(`${this.deviceId}: DEVICEC> ${message}`); this.noMessagesSent++; this.dateLastMessageSent = Date.now(); this.ws.send(message.toString()); @@ -131,7 +131,7 @@ export class MitmControlConnection extends EventEmitter { checkHeartbeat() { if (!this.isAlive) { // Pong has not been received in last interval seconds - this.log.warn(`${this.deviceId}/${this.instanceNo}: MITM - No response to ping - forcing disconnect`); + this.log.warn(`${this.deviceId}/${this.instanceNo}: DEVICE - No response to ping - forcing disconnect`); clearInterval(this.heartbeatHandle); this.ws.terminate(); @@ -209,7 +209,7 @@ export class MitmControlConnection extends EventEmitter { return this.responses[id].promise; } - serialize(): MitmControlDTO { + serialize(): DeviceControlDTO { return { dateLastMessageReceived: this.dateLastMessageReceived, dateLastMessageSent: this.dateLastMessageSent, diff --git a/packages/connections/src/lib/mitmWorkerConnection.ts b/packages/connections/src/lib/deviceWorkerConnection.ts similarity index 95% rename from packages/connections/src/lib/mitmWorkerConnection.ts rename to packages/connections/src/lib/deviceWorkerConnection.ts index 72c7a9f..9f946a1 100644 --- a/packages/connections/src/lib/mitmWorkerConnection.ts +++ b/packages/connections/src/lib/deviceWorkerConnection.ts @@ -1,16 +1,16 @@ import { EventEmitter } from 'events'; -import { WebSocket } from 'ws'; import { Logger } from 'winston'; +import { WebSocket } from 'ws'; -import { DTO } from './utils/type'; import { RotomProtos } from './utils/mitmProto'; +import { DTO } from './utils/type'; import WelcomeMessage = RotomProtos.WelcomeMessage; let instanceNo = 0; -export type MitmWorkerDTO = Omit, 'ws' | 'log' | 'heartbeatHandle'>; +export type DeviceWorkerDTO = Omit, 'ws' | 'log' | 'heartbeatHandle'>; -export class MitmWorkerConnection extends EventEmitter { +export class DeviceWorkerConnection extends EventEmitter { workerId?: string; deviceId?: string; userAgent?: string; @@ -125,7 +125,7 @@ export class MitmWorkerConnection extends EventEmitter { this.emit('disconnected', this); } - serialize(): MitmWorkerDTO { + serialize(): DeviceWorkerDTO { return { dateLastMessageReceived: this.dateLastMessageReceived, dateLastMessageSent: this.dateLastMessageSent, diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index da0ae2b..70ef5de 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -1,34 +1,38 @@ process.title = 'Rotom'; import { config } from '@rotom/config'; +import { ControllerConnection, DeviceControlConnection, DeviceWorkerConnection } from '@rotom/connections'; +import { JobsDTO, JobsStatusDTO, StatusDTO, WorkerDTO } from '@rotom/types'; import { FastifyInstance } from 'fastify'; import { inspect } from 'util'; -import { MitmWorkerConnection, ScannerConnection, MitmControlConnection } from '@rotom/connections'; import { WebSocketServer } from 'ws'; -import { StatusDTO, WorkerDTO, JobsDTO, JobsStatusDTO } from '@rotom/types'; import { - promRegistry, - workersTotalGauge, - workersActiveGauge, - devicesTotalGauge, - devicesAliveGauge, deviceMemoryFree, deviceMemoryMitm, deviceMemoryStart, + devicesAliveGauge, + devicesTotalGauge, + promRegistry, + valueOrZero, + workersActiveGauge, + workersTotalGauge, } from './utils'; import { JobExecutor } from './jobExecutor'; -import { jobs, JobLoader } from './jobLoader'; +import { JobLoader, jobs } from './jobLoader'; import { log } from './logger'; -import { startWebserver, fastify } from './webserver'; +import { fastify, startWebserver } from './webserver'; //import fa from '@faker-js/faker/locales/fa'; /* Initialise websocket server from Mitm */ -const wssMitm = new WebSocketServer({ port: config.deviceListener.port, perMessageDeflate: false }); +const wssDevice = new WebSocketServer({ port: config.deviceListener.port, perMessageDeflate: false }); -const controlConnections: Record = {}; -const currentConnections: Record = {}; +const controlConnections: Record = {}; +const currentConnections: Record< + string, + { deviceWorker: DeviceWorkerConnection; controller: ControllerConnection | null } +> = {}; const unallocatedConnections: string[] = []; -const deviceInformation: Record = {}; +const deviceInformation: Record = {}; process .on('unhandledRejection', (reason, p) => { @@ -46,47 +50,47 @@ process process.exit(); }); -wssMitm.on('connection', (ws, req) => { +wssDevice.on('connection', (ws, req) => { if (config.deviceListener.secret) { if (config.deviceListener.secret != req.headers['x-rotom-secret']) { - log.info(`MITM: New connection from ${req.socket.remoteAddress} url ${req.url} - incorrect secret, rejecting`); + log.info(`Device: New connection from ${req.socket.remoteAddress} url ${req.url} - incorrect secret, rejecting`); ws.close(3401, 'Invalid secret presented'); return; } } - log.info(`MITM: New connection from ${req.socket.remoteAddress} url ${req.url}`); + log.info(`Device: New connection from ${req.socket.remoteAddress} url ${req.url}`); if (req.url === '/control') { - const mitmControlConnection = new MitmControlConnection(log, ws); - mitmControlConnection.on('init', (mitm: MitmControlConnection) => { + const deviceControlConnection = new DeviceControlConnection(log, ws); + deviceControlConnection.on('init', (device: DeviceControlConnection) => { log.info( - `${mitm.deviceId}/${mitm.instanceNo}: Control Channel received id packet origin ${mitm.origin} - version ${mitm.version}`, + `${device.deviceId}/${device.instanceNo}: Control Channel received id packet origin ${device.origin} - version ${device.version}`, ); - const deviceId = mitm.deviceId as string; + const deviceId = device.deviceId as string; - controlConnections[deviceId] = mitm; + controlConnections[deviceId] = device; deviceInformation[deviceId] = { - lastScannerConnection: Date.now() / 1000, + lastControllerConnection: Date.now() / 1000, }; - const mitmTestIntervalHandle = setInterval(async () => { - // MITM current internal reboot logic: + const deviceTestIntervalHandle = setInterval(async () => { + // Device current internal reboot logic: // (((currentMemory/memoryUsageStart) > 2f && memFree < 80000) || memFree < 50000 || (currentMemory/memoryUsageStart) < 4f) && (Settings.memoryDetection && Settings.scanmode == 0) try { - const memoryStatus = await mitm.getMemoryUsage(); - log.info(`${mitm.deviceId}/${mitm.instanceNo}:Memory = ${JSON.stringify(memoryStatus)}`); + const memoryStatus = await device.getMemoryUsage(); + log.info(`${device.deviceId}/${device.instanceNo}:Memory = ${JSON.stringify(memoryStatus)}`); let restartRequired = false; if (memoryStatus.memFree && memoryStatus.memFree < config.monitor.minMemory) { log.warn( - `${mitm.deviceId}/${mitm.instanceNo}: ${memoryStatus.memFree} < ${config.monitor.minMemory} - RESTART REQUIRED`, + `${device.deviceId}/${device.instanceNo}: ${memoryStatus.memFree} < ${config.monitor.minMemory} - RESTART REQUIRED`, ); restartRequired = true; } if (memoryStatus.memStart) { const prefix = Object.keys(config.monitor.maxMemStartMultipleOverwrite).find((key) => - mitm.origin?.startsWith(key), + device.origin?.startsWith(key), ); const value = prefix @@ -95,7 +99,7 @@ wssMitm.on('connection', (ws, req) => { if (memoryStatus.memMitm > memoryStatus.memStart * value) { log.warn( - `${mitm.deviceId}/${mitm.instanceNo}: ${memoryStatus.memMitm} > ${memoryStatus.memStart} * ${value} - RESTART REQUIRED`, + `${device.deviceId}/${device.instanceNo}: ${memoryStatus.memMitm} > ${memoryStatus.memStart} * ${value} - RESTART REQUIRED`, ); restartRequired = true; } @@ -103,13 +107,13 @@ wssMitm.on('connection', (ws, req) => { if (restartRequired) { if (config.monitor.reboot) { - log.warn(`${mitm.deviceId}/${mitm.instanceNo}: Asking for reboot`); + log.warn(`${device.deviceId}/${device.instanceNo}: Asking for reboot`); // eslint-disable-next-line @typescript-eslint/no-empty-function - mitm.reboot().catch(() => {}); + device.reboot().catch(() => {}); } else { - log.warn(`${mitm.deviceId}/${mitm.instanceNo}: Asking for restart`); + log.warn(`${device.deviceId}/${device.instanceNo}: Asking for restart`); // eslint-disable-next-line @typescript-eslint/no-empty-function - mitm.restartApp().catch(() => {}); + device.restartApp().catch(() => {}); } } } catch { @@ -117,59 +121,59 @@ wssMitm.on('connection', (ws, req) => { } }, 60000); - mitm.on('disconnected', (/* mitmControl: MitmControlConnection */) => { + device.on('disconnected', (/* mitmControl: MitmControlConnection */) => { // This would remove disconnected entries immediately // if (controlConnections[deviceId] && controlConnections[deviceId] == mitmControl) { // delete controlConnections[deviceId]; // } - clearInterval(mitmTestIntervalHandle); + clearInterval(deviceTestIntervalHandle); }); }); return; } - const mitmConnection = new MitmWorkerConnection(log, ws); - mitmConnection.on('init', (mitmWorker: MitmWorkerConnection) => { + const deviceConnection = new DeviceWorkerConnection(log, ws); + deviceConnection.on('init', (deviceWorker: DeviceWorkerConnection) => { log.info( - `${mitmWorker.workerId}/${mitmWorker.instanceNo}: Received id packet origin ${mitmWorker.origin} - version ${mitmWorker.version}`, + `${deviceWorker.workerId}/${deviceWorker.instanceNo}: Received id packet origin ${deviceWorker.origin} - version ${deviceWorker.version}`, ); - const workerId = mitmWorker.workerId as string; + const workerId = deviceWorker.workerId as string; const currentConnection = currentConnections[workerId]; if (currentConnection) { - log.info(`${mitmWorker.workerId}/${mitmWorker.instanceNo}: This is a reconnection, making this current`); - if (currentConnection.scanner) { - log.info(`${mitmWorker.workerId}/${mitmWorker.instanceNo}: Scanner was connected - closing it`); - currentConnection.scanner.disconnect(); + log.info(`${deviceWorker.workerId}/${deviceWorker.instanceNo}: This is a reconnection, making this current`); + if (currentConnection.controller) { + log.info(`${deviceWorker.workerId}/${deviceWorker.instanceNo}: Controller was connected - closing it`); + currentConnection.controller.disconnect(); } } currentConnections[workerId] = { - mitm: mitmWorker, - scanner: null, + deviceWorker: deviceWorker, + controller: null, }; if (!unallocatedConnections.includes(workerId)) unallocatedConnections.push(workerId); log.info(`${workerId}: unallocated connections = ${unallocatedConnections.join(',')}`); }); - mitmConnection.on('disconnected', (mitmWorker: MitmWorkerConnection) => { - const workerId = mitmWorker.workerId as string; - const instanceNo = mitmWorker.instanceNo; + deviceConnection.on('disconnected', (deviceWorker: DeviceWorkerConnection) => { + const workerId = deviceWorker.workerId as string; + const instanceNo = deviceWorker.instanceNo; console.log(`${workerId}/${instanceNo}: Disconnected; performing disconnection activities`); if (workerId) { const currentConnection = currentConnections[workerId]; if (currentConnection) { - if (currentConnection.mitm !== mitmWorker) { + if (currentConnection.deviceWorker !== deviceWorker) { log.info(`${workerId}/${instanceNo}: Disconnection of non-current connection, ignoring`); return; } - if (currentConnection.scanner) { - log.info(`${workerId}: Disconnect: There was a Scanner connected, disconnecting`); - currentConnection.scanner.disconnect(); + if (currentConnection.controller) { + log.info(`${workerId}: Disconnect: There was a Controller connected, disconnecting`); + currentConnection.controller.disconnect(); } } @@ -184,16 +188,16 @@ wssMitm.on('connection', (ws, req) => { }); }); -/* Initialise websocket server from Scanner */ +/* Initialize websocket server from Controller */ -const wssScanner = new WebSocketServer({ port: config.controllerListener.port }); +const wssController = new WebSocketServer({ port: config.controllerListener.port }); function identifyControlChannelFromWorkerId(workerId: string): string | null { // Try to look up connected worker id and see if it presented us with a device id const connection = currentConnections[workerId]; if (connection) { - const deviceId = connection.mitm?.deviceId; + const deviceId = connection.deviceWorker?.deviceId; if (deviceId) { return deviceId; } @@ -207,19 +211,19 @@ function identifyControlChannelFromWorkerId(workerId: string): string | null { return null; } -wssScanner.on('connection', (ws, req) => { +wssController.on('connection', (ws, req) => { if (config.controllerListener.secret) { if (config.controllerListener.secret != req.headers['x-rotom-secret']) { - log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - incorrect secret, rejecting`); + log.info(`CONTROLLER: New connection from ${req.socket.remoteAddress} - incorrect secret, rejecting`); ws.close(3401, 'Invalid secret presented'); return; } } if (!unallocatedConnections.length) { - log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - no spare MITMs, rejecting`); + log.info(`CONTROLLER: New connection from ${req.socket.remoteAddress} - no spare Workers, rejecting`); // error! - ws.close(3001, 'No devices available'); + ws.close(3001, 'No workers available'); return; } @@ -228,19 +232,21 @@ wssScanner.on('connection', (ws, req) => { const firstSpareWorkerId = nextSpareWorkerId; do { const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId); - log.info(`SCANNER: Found ${mainDeviceId} connects to workerId ${nextSpareWorkerId}`); + log.info(`CONTROLLER: Found ${mainDeviceId} connects to workerId ${nextSpareWorkerId}`); if (mainDeviceId == null) { - log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device`); + log.info(`CONTROLLER: Warning - found ${nextSpareWorkerId} in pool with no record of main device`); unallocatedConnections.push(nextSpareWorkerId); nextSpareWorkerId = unallocatedConnections.shift() as string; } else { const mainDeviceInfo = deviceInformation[mainDeviceId]; if (!mainDeviceInfo) { - log.info(`SCANNER: Warning - found ${nextSpareWorkerId} in pool with no record of main device ${mainDeviceId}`); + log.info( + `CONTROLLER: Warning - found ${nextSpareWorkerId} in pool with no record of main device ${mainDeviceId}`, + ); unallocatedConnections.push(nextSpareWorkerId); nextSpareWorkerId = unallocatedConnections.shift() as string; } else { - if (mainDeviceInfo.lastScannerConnection + config.monitor.deviceCooldown > Date.now() / 1000) { + if (mainDeviceInfo.lastControllerConnection + config.monitor.deviceCooldown > Date.now() / 1000) { // device was allocated to someone else too recently, find another unallocatedConnections.push(nextSpareWorkerId); nextSpareWorkerId = unallocatedConnections.shift() as string; @@ -255,7 +261,7 @@ wssScanner.on('connection', (ws, req) => { // no devices found, return the original one back to pool unallocatedConnections.push(nextSpareWorkerId); log.info( - `SCANNER: New connection from ${req.socket.remoteAddress} - no MITMs available outside cooldown, rejecting`, + `CONTROLLER: New connection from ${req.socket.remoteAddress} - no Devices available outside cooldown, rejecting`, ); // error! ws.close(3001, 'All devices are in cooldown'); @@ -264,31 +270,31 @@ wssScanner.on('connection', (ws, req) => { // Set last connection time on device const mainDeviceId = identifyControlChannelFromWorkerId(nextSpareWorkerId) as string; - deviceInformation[mainDeviceId].lastScannerConnection = Date.now() / 1000; + deviceInformation[mainDeviceId].lastControllerConnection = Date.now() / 1000; - log.info(`SCANNER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareWorkerId}`); + log.info(`CONTROLLER: New connection from ${req.socket.remoteAddress} - will allocate ${nextSpareWorkerId}`); const currentConnection = currentConnections[nextSpareWorkerId]; - const scannerConnection = new ScannerConnection(log, ws, currentConnection.mitm); - currentConnection.scanner = scannerConnection; + const controllerConnection = new ControllerConnection(log, ws, currentConnection.deviceWorker); + currentConnection.controller = controllerConnection; - scannerConnection.on('disconnected', (con: ScannerConnection) => { + controllerConnection.on('disconnected', (con: ControllerConnection) => { // Replace webservice connection as available const workerId = con.workerId; log.info( - `SCANNER: Disconnected worker ${con.workerName}/${con.instanceNo} device - disconnecting from mitm to trigger cleanup`, + `CONTROLLER: Disconnected worker ${con.workerName}/${con.instanceNo} device - disconnecting from device to trigger cleanup`, ); - // Mark this Scanner as not in use + // Mark this Controller as not in use const currentConnection = currentConnections[workerId]; if (currentConnection) { - currentConnection.scanner = null; + currentConnection.controller = null; } else { - log.info(`SCANNER: did not find a connection from this MITM`); + log.info(`CONTROLLER: did not find a connection from this device`); } - // Now disconnect mitm - con.disconnectMitm(); + // Now disconnect device + con.disconnectDevice(); }); }); @@ -302,15 +308,15 @@ if (config.logging.consoleStatus) { const dateNow = Date.now(); for (const connections of Object.values(currentConnections)) { - if (connections && connections.mitm && connections.mitm.origin) { - const mitm = connections.mitm; - const SCANNER = connections.scanner; + if (connections && connections.deviceWorker && connections.deviceWorker.origin) { + const deviceWorker = connections.deviceWorker; + const CONTROLLER = connections.controller; connectionCounts.push( - `${mitm.origin}[${mitm.workerId}]: ${mitm.noMessagesSent}${ - dateNow - mitm.dateLastMessageSent > 10000 ? '*' : '' - }/${mitm.noMessagesReceived}${dateNow - mitm.dateLastMessageReceived > 10000 ? '*' : ''} ${ - SCANNER ? `SCANNER ${SCANNER.workerName}/${SCANNER.instanceNo}` : 'Unused' + `${deviceWorker.origin}[${deviceWorker.workerId}]: ${deviceWorker.noMessagesSent}${ + dateNow - deviceWorker.dateLastMessageSent > 10000 ? '*' : '' + }/${deviceWorker.noMessagesReceived}${dateNow - deviceWorker.dateLastMessageReceived > 10000 ? '*' : ''} ${ + CONTROLLER ? `CONTROLLER ${CONTROLLER.workerName}/${CONTROLLER.instanceNo}` : 'Unused' }`, ); } @@ -335,13 +341,11 @@ setInterval(() => { .forEach(([, connection]) => { const origin = connection?.origin || 'Unknown'; connectedDevices += 1; + const { memMitm, memFree, memStart } = connection.lastMemory; - const validMemFree = Number.isFinite(connection.lastMemory.memFree) ? connection.lastMemory.memFree : 0; - deviceMemoryFree.labels(origin).set(validMemFree); - const validMemMitm = Number.isFinite(connection.lastMemory.memMitm) ? connection.lastMemory.memMitm : 0; - deviceMemoryMitm.labels(origin).set(validMemMitm); - const validMemStart = Number.isFinite(connection.lastMemory.memStart) ? connection.lastMemory.memStart : 0; - deviceMemoryStart.labels(origin).set(validMemStart); + deviceMemoryFree.labels(origin).set(valueOrZero(memFree)); + deviceMemoryMitm.labels(origin).set(valueOrZero(memMitm)); + deviceMemoryStart.labels(origin).set(valueOrZero(memStart)); }); // Set device counts @@ -352,7 +356,7 @@ setInterval(() => { const originActiveWorkers: Record = {}; const originTotalWorkers: Record = {}; Object.entries(currentConnections).forEach(([, connection]) => { - const origin = connection.mitm?.origin || 'Unknown'; + const origin = connection.deviceWorker?.origin || 'Unknown'; if (!(origin in originActiveWorkers)) { originActiveWorkers[origin] = 0; } @@ -361,7 +365,7 @@ setInterval(() => { } originTotalWorkers[origin] += 1; - if (connection.scanner && connection.scanner.isAlive && connection.mitm.isAlive) { + if (connection.controller && connection.controller.isAlive && connection.deviceWorker.isAlive) { originActiveWorkers[origin] += 1; } }); @@ -370,12 +374,10 @@ setInterval(() => { workersTotalGauge.reset(); workersActiveGauge.reset(); Object.entries(originTotalWorkers).forEach(([name, number]) => { - const validNumber = Number.isFinite(number) ? number : 0; - workersTotalGauge.labels(name).set(validNumber); + workersTotalGauge.labels(name).set(valueOrZero(number)); }); Object.entries(originActiveWorkers).forEach(([name, number]) => { - const validNumber = Number.isFinite(number) ? number : 0; - workersActiveGauge.labels(name).set(validNumber); + workersActiveGauge.labels(name).set(valueOrZero(number)); }); }, 5000); @@ -439,14 +441,14 @@ const routes = async (fastifyInstance: FastifyInstance) => { const isAllocated = !unallocatedConnections.includes(workerId); const deviceId = - connection.mitm?.deviceId ?? + connection.deviceWorker?.deviceId ?? Object.keys(controlConnections).find((deviceId) => workerId.startsWith(deviceId)); return { deviceId, - scanner: connection.scanner?.serialize(), + controller: connection.controller?.serialize(), isAllocated, - mitm: connection.mitm.serialize(), + worker: connection.deviceWorker.serialize(), workerId, }; }), @@ -532,7 +534,7 @@ const routes = async (fastifyInstance: FastifyInstance) => { return; } - const devices: MitmControlConnection[] = deviceIdsOrOrigins + const devices: DeviceControlConnection[] = deviceIdsOrOrigins .map((deviceIdOrOrigin) => { if (deviceIdOrOrigin in controlConnections) { return controlConnections[deviceIdOrOrigin]; @@ -540,7 +542,7 @@ const routes = async (fastifyInstance: FastifyInstance) => { return Object.values(controlConnections).find((device) => device.origin === deviceIdOrOrigin); }) - .filter((device): device is MitmControlConnection => !!device); + .filter((device): device is DeviceControlConnection => !!device); if (devices.length === 0) { reply.code(404).send({ status: 'error', error: `No device found with IDS or origins ${deviceIdsOrOrigins}` }); diff --git a/packages/server/src/jobExecutor.ts b/packages/server/src/jobExecutor.ts index 0b4be35..d6d7460 100644 --- a/packages/server/src/jobExecutor.ts +++ b/packages/server/src/jobExecutor.ts @@ -1,4 +1,4 @@ -import { MitmControlConnection } from '@rotom/connections'; +import { DeviceControlConnection } from '@rotom/connections'; import { Job, JobStatus } from '@rotom/types'; let jobExecutionNo = 1; @@ -10,7 +10,7 @@ export class JobExecutor { this.jobStatus = {}; } - runJob(device: MitmControlConnection, job: Job): number { + runJob(device: DeviceControlConnection, job: Job): number { const jobNo = jobExecutionNo++; this.jobStatus[jobNo] = { diff --git a/packages/server/src/utils.ts b/packages/server/src/utils.ts index 82fd7ad..033b776 100644 --- a/packages/server/src/utils.ts +++ b/packages/server/src/utils.ts @@ -51,3 +51,10 @@ export const deviceMemoryStart = new Gauge({ labelNames: ['origin'], registers: [promRegistry], }); + +export function valueOrZero(value?: number): number { + if (value === undefined) { + return 0; + } + return Number.isNaN(value) ? 0 : value; +} diff --git a/packages/types/src/status.ts b/packages/types/src/status.ts index cbbe4ef..bf9ce65 100644 --- a/packages/types/src/status.ts +++ b/packages/types/src/status.ts @@ -1,16 +1,16 @@ -import type { ScannerConnectionDTO, MitmControlDTO, MitmWorkerDTO } from '@rotom/connections'; +import type { ControllerConnectionDTO, DeviceControlDTO, DeviceWorkerDTO } from '@rotom/connections'; -export { ScannerConnectionDTO, MitmControlDTO, MitmWorkerDTO }; +export { ControllerConnectionDTO, DeviceControlDTO, DeviceWorkerDTO }; export type WorkerDTO = { + workerId: string; deviceId?: string; - scanner?: ScannerConnectionDTO; + controller?: ControllerConnectionDTO; isAllocated: boolean; - mitm: MitmWorkerDTO; - workerId: string; + worker: DeviceWorkerDTO; }; export interface StatusDTO { workers: WorkerDTO[]; - devices: MitmControlDTO[]; + devices: DeviceControlDTO[]; }