Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve RCON packet Parsing and Cleanup on connection close to prevent deadlocks #267

Closed
wants to merge 3 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 145 additions & 84 deletions core/rcon.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ export default class Rcon extends EventEmitter {

// bind methods
this.connect = this.connect.bind(this); // we bind this as we call it on the auto reconnect timeout
this.onData = this.onData.bind(this);
this.onPacket = this.onPacket.bind(this);
this.onClose = this.onClose.bind(this);
this.onError = this.onError.bind(this);
this.decodeData = this.decodeData.bind(this);
this.encodePacket = this.encodePacket.bind(this);

// setup socket
this.client = new net.Socket();
this.client.on('data', this.onData);
this.client.on('data', this.decodeData);
this.client.on('close', this.onClose);
this.client.on('error', this.onError);

Expand All @@ -48,85 +50,105 @@ export default class Rcon extends EventEmitter {

this.incomingData = Buffer.from([]);
this.incomingResponse = [];

this.responseCallbackQueue = [];
// Used For tracking Callbacks
this.callbackIds = [];
this.count = 1;
this.loggedin = false;
}

onData(data) {
Logger.verbose('RCON', 4, `Got data: ${this.bufToHexString(data)}`);

onPacket(decodedPacket) {
// the logic in this method simply splits data sent via the data event into packets regardless of how they're
// distributed in the event calls
const packets = this.decodeData(data);

for (const packet of packets) {
Logger.verbose('RCON', 4, `Processing packet: ${this.bufToHexString(packet)}`);
Logger.verbose(
'RCON',
2,
`Processing decoded packet: ${this.decodedPacketToString(decodedPacket)}`
);

switch (decodedPacket.type) {
case SERVERDATA_RESPONSE_VALUE:
case SERVERDATA_AUTH_RESPONSE:
switch (decodedPacket.id) {
case MID_PACKET_ID:
this.incomingResponse.push(decodedPacket);

break;
case END_PACKET_ID:
this.callbackIds = this.callbackIds.filter((p) => p.id !== decodedPacket.count);

this.responseCallbackQueue.shift()(
this.incomingResponse.map((packet) => packet.body).join()
);
this.incomingResponse = [];

break;
default:
Logger.verbose(
'RCON',
1,
`Unknown packet ID ${decodedPacket.id} in: ${this.decodedPacketToString(
decodedPacket
)}`
);
this.onClose('Unknown Packet');
}
break;

const decodedPacket = this.decodePacket(packet);
Logger.verbose(
'RCON',
3,
`Processing decoded packet: ${this.decodedPacketToString(decodedPacket)}`
);
case SERVERDATA_CHAT_VALUE:
this.processChatPacket(decodedPacket);
break;

switch (decodedPacket.type) {
case SERVERDATA_RESPONSE_VALUE:
case SERVERDATA_AUTH_RESPONSE:
switch (decodedPacket.id) {
case MID_PACKET_ID:
this.incomingResponse.push(decodedPacket);
break;
case END_PACKET_ID:
this.responseCallbackQueue.shift()(
this.incomingResponse.map((packet) => packet.body).join()
);
this.incomingResponse = [];
break;
default:
Logger.verbose(
'RCON',
1,
`Unknown packet ID ${decodedPacket.id} in: ${this.decodedPacketToString(
decodedPacket
)}`
);
}
break;

case SERVERDATA_CHAT_VALUE:
this.processChatPacket(decodedPacket);
break;

default:
Logger.verbose(
'RCON',
1,
`Unknown packet type ${decodedPacket.type} in: ${this.decodedPacketToString(
decodedPacket
)}`
);
}
default:
Logger.verbose(
'RCON',
1,
`Unknown packet type ${decodedPacket.type} in: ${this.decodedPacketToString(
decodedPacket
)}`
);
this.onClose('Unknown Packet');
}
}

decodeData(data) {
this.incomingData = Buffer.concat([this.incomingData, data]);
Logger.verbose('RCON', 4, `Got data: ${this.bufToHexString(data)}`);

const packets = [];
this.incomingData = Buffer.concat([this.incomingData, data]);

// we check that it's greater than 4 as if it's not then the length header is not fully present which breaks the
// rest of the code. We just need to wait for more data.
while (this.incomingData.byteLength >= 4) {
const size = this.incomingData.readInt32LE(0);
const packetSize = size + 4;

if (this.incomingData.byteLength < packetSize) {
Logger.verbose(
'RCON',
4,
`Waiting for more data... Have: ${this.incomingData.byteLength} Expected: ${packetSize}`
);
break;
}
const packet = this.incomingData.slice(0, packetSize);

Logger.verbose('RCON', 4, `Processing packet: ${this.bufToHexString(packet)}`);
const decodedPacket = this.decodePacket(packet);

const matchCount = this.callbackIds.filter((d) => d.id === decodedPacket.count);

if (
matchCount.length > 0 ||
[SERVERDATA_AUTH_RESPONSE, SERVERDATA_CHAT_VALUE].includes(decodedPacket.type)
) {
this.onPacket(decodedPacket);
this.incomingData = this.incomingData.slice(packetSize);
continue;
}
// The packet following an empty packet will report to be 10 long (14 including the size header bytes), but in
// it should report 17 long (21 including the size header bytes). Therefore, if the packet is 10 in size
// and there's enough data for it to be a longer packet then we need to probe to check it's this broken packet.
const probeSize = 17;
const probePacketSize = 21;

if (size === 10 && this.incomingData.byteLength >= probeSize) {
if (size === 10 && this.incomingData.byteLength >= 21) {
// copy the section of the incoming data of interest
const probeBuf = this.incomingData.slice(0, probePacketSize);
// decode it
Expand All @@ -141,25 +163,17 @@ export default class Rcon extends EventEmitter {
}
}

if (this.incomingData.byteLength < packetSize) {
Logger.verbose('RCON', 4, `Waiting for more data...`);
break;
}

const packet = this.incomingData.slice(0, packetSize);
packets.push(packet);

this.incomingData = this.incomingData.slice(packetSize);
// We should only get this far into the loop when we are done processing packets from this onData event.
break;
}

return packets;
}

decodePacket(packet) {
return {
size: packet.readInt32LE(0),
id: packet.readInt32LE(4),
type: packet.readInt32LE(8),
size: packet.readUInt32LE(0),
id: packet.readUInt8(4),
count: packet.readUInt16LE(6),
type: packet.readUInt32LE(8),
body: packet.toString('utf8', 12, packet.byteLength - 2)
};
}
Expand All @@ -168,8 +182,34 @@ export default class Rcon extends EventEmitter {

onClose(hadError) {
this.connected = false;
this.loggedin = false;
Logger.verbose(
'RCON',
1,
`Socket closed ${hadError ? 'with' : 'without'} an error. ${hadError}`
);

// Cleanup all local state onClose
if (this.incomingData.length > 0) {
Logger.verbose('RCON', 2, `Clearing Buffered Data`);
this.incomingData = Buffer.from([]);
}
if (this.incomingResponse.length > 0) {
Logger.verbose('RCON', 2, `Clearing Buffered Response Data`);
this.incomingResponse = [];
}
if (this.responseCallbackQueue.length > 0) {
Logger.verbose('RCON', 2, `Clearing Pending Callbacks`);

Logger.verbose('RCON', 1, `Socket closed ${hadError ? 'without' : 'with'} an error.`);
// Cleanup Pending Callbacks; We should maybe retry these on next connection
// However, depending on the reason we got disconnected it may be a while.
// IE, Squad server crash, Squad server shutdown for multiple minutes.

while (this.responseCallbackQueue.length > 0) {
this.responseCallbackQueue.shift()(new Error('RCON DISCONNECTED'));
}
this.callbackIds = [];
}

if (this.autoReconnect) {
Logger.verbose('RCON', 1, `Sleeping ${this.autoReconnectDelay}ms before reconnecting.`);
Expand Down Expand Up @@ -267,6 +307,11 @@ export default class Rcon extends EventEmitter {
return;
}

if (!this.loggedin && type !== SERVERDATA_AUTH) {
reject(new Error('RCON not Logged in'));
return;
}

Logger.verbose('RCON', 2, `Writing packet with type "${type}" and body "${body}".`);

const encodedPacket = this.encodePacket(
Expand All @@ -289,7 +334,9 @@ export default class Rcon extends EventEmitter {
};

// the auth packet also sends a normal response, so we add an extra empty action to ignore it

if (type === SERVERDATA_AUTH) {
this.callbackIds.push({ id: this.count, cmd: body });
this.responseCallbackQueue.push(() => {});
this.responseCallbackQueue.push((decodedPacket) => {
this.client.removeListener('error', onError);
Expand All @@ -298,25 +345,36 @@ export default class Rcon extends EventEmitter {
reject(new Error('Authentication failed.'));
} else {
Logger.verbose('RCON', 1, 'Authentication succeeded.');
this.loggedin = true;
resolve();
}
});
} else {
this.callbackIds.push({ id: this.count, cmd: body });
this.responseCallbackQueue.push((response) => {
this.client.removeListener('error', onError);

Logger.verbose(
'RCON',
2,
`Returning complete response: ${response.replace(/\r\n|\r|\n/g, '\\n')}`
);
if (response instanceof Error) {
// Called from onClose()
reject(response);
} else {
Logger.verbose(
'RCON',
2,
`Returning complete response: ${response.replace(/\r\n|\r|\n/g, '\\n')}`
);

resolve(response);
resolve(response);
}
});
}

this.client.once('error', onError);

if (this.count + 1 > 65535) {
this.count = 1;
}

Logger.verbose('RCON', 4, `Sending packet: ${this.bufToHexString(encodedPacket)}`);
this.client.write(encodedPacket);

Expand All @@ -327,6 +385,7 @@ export default class Rcon extends EventEmitter {
`Sending empty packet: ${this.bufToHexString(encodedEmptyPacket)}`
);
this.client.write(encodedEmptyPacket);
this.count++;
}
});
}
Expand All @@ -335,11 +394,13 @@ export default class Rcon extends EventEmitter {
const size = Buffer.byteLength(body) + 14;
const buf = Buffer.alloc(size);

buf.writeInt32LE(size - 4, 0);
buf.writeInt32LE(id, 4);
buf.writeInt32LE(type, 8);
buf.writeUInt32LE(size - 4, 0);
buf.writeUInt8(id, 4);
buf.writeUInt8(0, 5);
buf.writeUInt16LE(this.count, 6);
buf.writeUInt32LE(type, 8);
buf.write(body, 12, size - 2, encoding);
buf.writeInt16LE(0, size - 2);
buf.writeUInt16LE(0, size - 2);

return buf;
}
Expand Down