From 3720008d9dd69bedd36a9accaee9a084e925e842 Mon Sep 17 00:00:00 2001 From: ect0s <73128770+ect0s@users.noreply.github.com> Date: Sun, 15 Jan 2023 21:33:51 -0500 Subject: [PATCH 1/2] Packet Parsing changes, Clear pending callbacks/buffers onClose() to prevent deadlocks, call onClose on Unknown packets Restructure Packet Decode Add counter for packets/callbacks onClose() cleanups unknown packet should call decode. --- core/rcon.js | 220 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 136 insertions(+), 84 deletions(-) diff --git a/core/rcon.js b/core/rcon.js index 4cfa2e483..beffea51e 100644 --- a/core/rcon.js +++ b/core/rcon.js @@ -28,13 +28,15 @@ export default class Rcon extends EventEmitter { // bind methods this.connect = this.connect.bind(this); // we bind this as we call it on the auto reconnect timeout - this.onData = this.onData.bind(this); + this.onPacket = this.onPacket.bind(this); this.onClose = this.onClose.bind(this); this.onError = this.onError.bind(this); + this.decodeData = this.decodeData.bind(this); + this.encodePacket = this.encodePacket.bind(this); // setup socket this.client = new net.Socket(); - this.client.on('data', this.onData); + this.client.on('data', this.decodeData); this.client.on('close', this.onClose); this.client.on('error', this.onError); @@ -48,85 +50,104 @@ export default class Rcon extends EventEmitter { this.incomingData = Buffer.from([]); this.incomingResponse = []; - this.responseCallbackQueue = []; + // Used For tracking Callbacks + this.callbackIds = []; + this.count = 1; } - onData(data) { - Logger.verbose('RCON', 4, `Got data: ${this.bufToHexString(data)}`); - + onPacket(decodedPacket) { // the logic in this method simply splits data sent via the data event into packets regardless of how they're // distributed in the event calls - const packets = this.decodeData(data); - - for (const packet of packets) { - Logger.verbose('RCON', 4, `Processing packet: ${this.bufToHexString(packet)}`); + Logger.verbose( + 'RCON', + 2, + `Processing decoded packet: ${this.decodedPacketToString(decodedPacket)}` + ); + + switch (decodedPacket.type) { + case SERVERDATA_RESPONSE_VALUE: + case SERVERDATA_AUTH_RESPONSE: + switch (decodedPacket.id) { + case MID_PACKET_ID: + this.incomingResponse.push(decodedPacket); + + break; + case END_PACKET_ID: + this.callbackIds = this.callbackIds.filter((p) => p.id !== decodedPacket.count); + + this.responseCallbackQueue.shift()( + this.incomingResponse.map((packet) => packet.body).join() + ); + this.incomingResponse = []; + + break; + default: + Logger.verbose( + 'RCON', + 1, + `Unknown packet ID ${decodedPacket.id} in: ${this.decodedPacketToString( + decodedPacket + )}` + ); + this.onClose('Unknown Packet'); + } + break; - const decodedPacket = this.decodePacket(packet); - Logger.verbose( - 'RCON', - 3, - `Processing decoded packet: ${this.decodedPacketToString(decodedPacket)}` - ); + case SERVERDATA_CHAT_VALUE: + this.processChatPacket(decodedPacket); + break; - switch (decodedPacket.type) { - case SERVERDATA_RESPONSE_VALUE: - case SERVERDATA_AUTH_RESPONSE: - switch (decodedPacket.id) { - case MID_PACKET_ID: - this.incomingResponse.push(decodedPacket); - break; - case END_PACKET_ID: - this.responseCallbackQueue.shift()( - this.incomingResponse.map((packet) => packet.body).join() - ); - this.incomingResponse = []; - break; - default: - Logger.verbose( - 'RCON', - 1, - `Unknown packet ID ${decodedPacket.id} in: ${this.decodedPacketToString( - decodedPacket - )}` - ); - } - break; - - case SERVERDATA_CHAT_VALUE: - this.processChatPacket(decodedPacket); - break; - - default: - Logger.verbose( - 'RCON', - 1, - `Unknown packet type ${decodedPacket.type} in: ${this.decodedPacketToString( - decodedPacket - )}` - ); - } + default: + Logger.verbose( + 'RCON', + 1, + `Unknown packet type ${decodedPacket.type} in: ${this.decodedPacketToString( + decodedPacket + )}` + ); + this.onClose('Unknown Packet'); } } decodeData(data) { - this.incomingData = Buffer.concat([this.incomingData, data]); + Logger.verbose('RCON', 4, `Got data: ${this.bufToHexString(data)}`); - const packets = []; + this.incomingData = Buffer.concat([this.incomingData, data]); - // we check that it's greater than 4 as if it's not then the length header is not fully present which breaks the - // rest of the code. We just need to wait for more data. while (this.incomingData.byteLength >= 4) { const size = this.incomingData.readInt32LE(0); const packetSize = size + 4; + if (this.incomingData.byteLength < packetSize) { + Logger.verbose( + 'RCON', + 4, + `Waiting for more data... Have: ${this.incomingData.byteLength} Expected: ${packetSize}` + ); + break; + } + const packet = this.incomingData.slice(0, packetSize); + + Logger.verbose('RCON', 4, `Processing packet: ${this.bufToHexString(packet)}`); + const decodedPacket = this.decodePacket(packet); + + const matchCount = this.callbackIds.filter((d) => d.id === decodedPacket.count); + + if ( + matchCount.length > 0 || + [SERVERDATA_AUTH_RESPONSE, SERVERDATA_CHAT_VALUE].includes(decodedPacket.type) + ) { + this.onPacket(decodedPacket); + this.incomingData = this.incomingData.slice(packetSize); + continue; + } // The packet following an empty packet will report to be 10 long (14 including the size header bytes), but in // it should report 17 long (21 including the size header bytes). Therefore, if the packet is 10 in size // and there's enough data for it to be a longer packet then we need to probe to check it's this broken packet. - const probeSize = 17; const probePacketSize = 21; - if (size === 10 && this.incomingData.byteLength >= probeSize) { + if (size === 10 && this.incomingData.byteLength >= 21) { // copy the section of the incoming data of interest const probeBuf = this.incomingData.slice(0, probePacketSize); // decode it @@ -141,25 +162,17 @@ export default class Rcon extends EventEmitter { } } - if (this.incomingData.byteLength < packetSize) { - Logger.verbose('RCON', 4, `Waiting for more data...`); - break; - } - - const packet = this.incomingData.slice(0, packetSize); - packets.push(packet); - - this.incomingData = this.incomingData.slice(packetSize); + // We should only get this far into the loop when we are done processing packets from this onData event. + break; } - - return packets; } decodePacket(packet) { return { - size: packet.readInt32LE(0), - id: packet.readInt32LE(4), - type: packet.readInt32LE(8), + size: packet.readUInt32LE(0), + id: packet.readUInt8(4), + count: packet.readUInt16LE(6), + type: packet.readUInt32LE(8), body: packet.toString('utf8', 12, packet.byteLength - 2) }; } @@ -169,7 +182,33 @@ export default class Rcon extends EventEmitter { onClose(hadError) { this.connected = false; - Logger.verbose('RCON', 1, `Socket closed ${hadError ? 'without' : 'with'} an error.`); + Logger.verbose( + 'RCON', + 1, + `Socket closed ${hadError ? 'with' : 'without'} an error. ${hadError}` + ); + + // Cleanup all local state onClose + if (this.incomingData.length > 0) { + Logger.verbose('RCON', 2, `Clearing Buffered Data`); + this.incomingData = Buffer.from([]); + } + if (this.incomingResponse.length > 0) { + Logger.verbose('RCON', 2, `Clearing Buffered Response Data`); + this.incomingResponse = []; + } + if (this.responseCallbackQueue.length > 0) { + Logger.verbose('RCON', 2, `Clearing Pending Callbacks`); + + // Cleanup Pending Callbacks; We should maybe retry these on next connection + // However, depending on the reason we got disconnected it may be a while. + // IE, Squad server crash, Squad server shutdown for multiple minutes. + + while (this.responseCallbackQueue.length > 0) { + this.responseCallbackQueue.shift()(new Error('RCON DISCONNECTED')); + } + this.callbackIds = []; + } if (this.autoReconnect) { Logger.verbose('RCON', 1, `Sleeping ${this.autoReconnectDelay}ms before reconnecting.`); @@ -289,6 +328,7 @@ export default class Rcon extends EventEmitter { }; // the auth packet also sends a normal response, so we add an extra empty action to ignore it + this.callbackIds.push({ id: this.count, cmd: body }); if (type === SERVERDATA_AUTH) { this.responseCallbackQueue.push(() => {}); this.responseCallbackQueue.push((decodedPacket) => { @@ -305,18 +345,27 @@ export default class Rcon extends EventEmitter { this.responseCallbackQueue.push((response) => { this.client.removeListener('error', onError); - Logger.verbose( - 'RCON', - 2, - `Returning complete response: ${response.replace(/\r\n|\r|\n/g, '\\n')}` - ); + if (response instanceof Error) { + // Called from onClose() + reject(response); + } else { + Logger.verbose( + 'RCON', + 2, + `Returning complete response: ${response.replace(/\r\n|\r|\n/g, '\\n')}` + ); - resolve(response); + resolve(response); + } }); } this.client.once('error', onError); + if (this.count + 1 > 65535) { + this.count = 1; + } + Logger.verbose('RCON', 4, `Sending packet: ${this.bufToHexString(encodedPacket)}`); this.client.write(encodedPacket); @@ -327,6 +376,7 @@ export default class Rcon extends EventEmitter { `Sending empty packet: ${this.bufToHexString(encodedEmptyPacket)}` ); this.client.write(encodedEmptyPacket); + this.count++; } }); } @@ -335,11 +385,13 @@ export default class Rcon extends EventEmitter { const size = Buffer.byteLength(body) + 14; const buf = Buffer.alloc(size); - buf.writeInt32LE(size - 4, 0); - buf.writeInt32LE(id, 4); - buf.writeInt32LE(type, 8); + buf.writeUInt32LE(size - 4, 0); + buf.writeUInt8(id, 4); + buf.writeUInt8(0, 5); + buf.writeUInt16LE(this.count, 6); + buf.writeUInt32LE(type, 8); buf.write(body, 12, size - 2, encoding); - buf.writeInt16LE(0, size - 2); + buf.writeUInt16LE(0, size - 2); return buf; } From 643395843365c802d736c7d7fbda42dd335f6011 Mon Sep 17 00:00:00 2001 From: ect0s <73128770+ect0s@users.noreply.github.com> Date: Wed, 18 Jan 2023 16:01:09 -0500 Subject: [PATCH 2/2] Add check for "logged in", avoid race with timed admin commands --- core/rcon.js | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/core/rcon.js b/core/rcon.js index beffea51e..af6b8997b 100644 --- a/core/rcon.js +++ b/core/rcon.js @@ -54,6 +54,7 @@ export default class Rcon extends EventEmitter { // Used For tracking Callbacks this.callbackIds = []; this.count = 1; + this.loggedin = false; } onPacket(decodedPacket) { @@ -181,7 +182,7 @@ export default class Rcon extends EventEmitter { onClose(hadError) { this.connected = false; - + this.loggedin = false; Logger.verbose( 'RCON', 1, @@ -306,6 +307,11 @@ export default class Rcon extends EventEmitter { return; } + if (!this.loggedin && type !== SERVERDATA_AUTH) { + reject(new Error('RCON not Logged in')); + return; + } + Logger.verbose('RCON', 2, `Writing packet with type "${type}" and body "${body}".`); const encodedPacket = this.encodePacket( @@ -328,8 +334,9 @@ export default class Rcon extends EventEmitter { }; // the auth packet also sends a normal response, so we add an extra empty action to ignore it - this.callbackIds.push({ id: this.count, cmd: body }); + if (type === SERVERDATA_AUTH) { + this.callbackIds.push({ id: this.count, cmd: body }); this.responseCallbackQueue.push(() => {}); this.responseCallbackQueue.push((decodedPacket) => { this.client.removeListener('error', onError); @@ -338,10 +345,12 @@ export default class Rcon extends EventEmitter { reject(new Error('Authentication failed.')); } else { Logger.verbose('RCON', 1, 'Authentication succeeded.'); + this.loggedin = true; resolve(); } }); } else { + this.callbackIds.push({ id: this.count, cmd: body }); this.responseCallbackQueue.push((response) => { this.client.removeListener('error', onError);