Skip to content

Commit

Permalink
Network: Remove built in nagle algo and fully go async
Browse files Browse the repository at this point in the history
  • Loading branch information
killerwife committed Jan 19, 2024
1 parent 49d61a3 commit cd6d946
Show file tree
Hide file tree
Showing 24 changed files with 895 additions and 1,551 deletions.
4 changes: 2 additions & 2 deletions src/game/Server/WorldSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ bool WorldSessionFilter::Process(WorldPacket const& packet) const
/// WorldSession constructor
WorldSession::WorldSession(uint32 id, WorldSocket* sock, AccountTypes sec, uint8 expansion, time_t mute_time, LocaleConstant locale, std::string accountName, uint32 accountFlags, uint32 recruitingFriend, bool isARecruiter) :
LookingForGroup_auto_join(false), LookingForGroup_auto_add(true), m_muteTime(mute_time),
_player(nullptr), m_Socket(sock ? sock->shared<WorldSocket>() : nullptr), m_requestSocket(nullptr), m_localAddress("127.0.0.1"), m_sessionState(WORLD_SESSION_STATE_CREATED),
_player(nullptr), m_Socket(sock ? sock->shared_from_this() : nullptr), m_requestSocket(nullptr), m_localAddress("127.0.0.1"), m_sessionState(WORLD_SESSION_STATE_CREATED),
_security(sec), _accountId(id), m_expansion(expansion), m_accountName(accountName), m_accountFlags(accountFlags),
m_clientOS(CLIENT_OS_UNKNOWN), m_clientPlatform(CLIENT_PLATFORM_UNKNOWN), m_gameBuild(0), m_accountMaxLevel(0), m_orderCounter(0), m_lastAnticheatUpdate(0), m_anticheat(nullptr),
_logoutTime(0), m_playerSave(true), m_inQueue(false), m_playerLoading(false), m_kickSession(false), m_playerLogout(false), m_playerRecentlyLogout(false),
Expand Down Expand Up @@ -166,7 +166,7 @@ bool WorldSession::RequestNewSocket(WorldSocket* socket)
if (m_requestSocket)
return false;

m_requestSocket = socket->shared<WorldSocket>();
m_requestSocket = socket->shared_from_this();
m_sessionState = WORLD_SESSION_STATE_CREATED;
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/game/Server/WorldSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -931,8 +931,8 @@ class WorldSession

uint32 m_GUIDLow; // set logged or recently logout player (while m_playerRecentlyLogout set)
Player* _player;
std::shared_ptr<WorldSocket> m_Socket; // socket pointer is owned by the network thread which created it
std::shared_ptr<WorldSocket> m_requestSocket; // a new socket for this session is requested (double connection)
boost::shared_ptr<WorldSocket> m_Socket; // socket pointer is owned by the network thread which created it
boost::shared_ptr<WorldSocket> m_requestSocket; // a new socket for this session is requested (double connection)
std::string m_localAddress;
WorldSessionState m_sessionState; // this session state

Expand Down
234 changes: 101 additions & 133 deletions src/game/Server/WorldSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

#include <boost/asio.hpp>
#include <utility>
#include <vector>

#if defined( __GNUC__ )
#pragma pack(1)
Expand Down Expand Up @@ -84,8 +85,8 @@ std::deque<uint32> WorldSocket::GetIncOpcodeHistory()
return m_opcodeHistoryInc;
}

WorldSocket::WorldSocket(boost::asio::io_service& service, std::function<void (Socket*)> closeHandler) : Socket(service, std::move(closeHandler)), m_lastPingTime(std::chrono::system_clock::time_point::min()), m_overSpeedPings(0), m_existingHeader(),
m_useExistingHeader(false), m_session(nullptr), m_seed(urand()), m_loggingPackets(false)
WorldSocket::WorldSocket(boost::asio::io_service& service) : AsyncSocket(service), m_lastPingTime(std::chrono::system_clock::time_point::min()), m_overSpeedPings(0),
m_session(nullptr), m_seed(urand()), m_loggingPackets(false)
{
}

Expand Down Expand Up @@ -113,24 +114,32 @@ void WorldSocket::SendPacket(const WorldPacket& pct, bool immediate)

m_crypt.EncryptSend(reinterpret_cast<uint8*>(&header), sizeof(header));

if (pct.size() > 0)
Write(reinterpret_cast<const char*>(&header), sizeof(header), reinterpret_cast<const char*>(pct.contents()), pct.size());
else
Write(reinterpret_cast<const char*>(&header), sizeof(header));

if (immediate)
ForceFlushOut();
uint32 opcode = pct.GetOpcode();

m_opcodeHistoryOut.push_front(uint32(pct.GetOpcode()));
m_opcodeHistoryOut.push_front(uint32(opcode));
if (m_opcodeHistoryOut.size() > 50)
m_opcodeHistoryOut.resize(30);

if (pct.size() > 0)
{
std::shared_ptr<std::vector<char>> fullMessage = std::make_shared<std::vector<char>>();
fullMessage->resize(sizeof(header) + pct.size()); // allocate array for full message
std::memcpy(fullMessage->data(), reinterpret_cast<const char*>(&header), sizeof(header)); // copy header
std::memcpy((fullMessage->data() + sizeof(header)), reinterpret_cast<const char*>(pct.contents()), pct.size()); // copy packet
auto self(shared_from_this());
Write(fullMessage->data(), fullMessage->size(), [self, fullMessage](const boost::system::error_code& error, std::size_t read) {});
}
else
{
std::shared_ptr<ServerPktHeader> sharedHeader = std::make_shared<ServerPktHeader>();
*sharedHeader = header;
auto self(shared_from_this());
Write(reinterpret_cast<const char*>(sharedHeader.get()), sizeof(header), [self, sharedHeader](const boost::system::error_code& error, std::size_t read) {});
}
}

bool WorldSocket::Open()
bool WorldSocket::OnOpen()
{
if (!Socket::Open())
return false;

// Send startup packet.
WorldPacket packet(SMSG_AUTH_CHALLENGE, 40);
packet << m_seed;
Expand All @@ -150,143 +159,102 @@ bool WorldSocket::Open()

bool WorldSocket::ProcessIncomingData()
{
ClientPktHeader header;
std::shared_ptr<ClientPktHeader> header = std::make_shared<ClientPktHeader>();

if (m_useExistingHeader)
auto self(shared_from_this());
Read((char*)header.get(), sizeof(ClientPktHeader), [self, header](const boost::system::error_code& error, std::size_t read) -> void
{
m_useExistingHeader = false;
header = m_existingHeader;

ReadSkip(sizeof(ClientPktHeader));
}
else
{
if (!Read((char*)&header, sizeof(ClientPktHeader)))
{
errno = EBADMSG;
return false;
}

if (error) return;
// thread safe due to always being called from service context
m_crypt.DecryptRecv((uint8*)&header, sizeof(ClientPktHeader));

EndianConvertReverse(header.size);
EndianConvert(header.cmd);
}

// there must always be at least four bytes for the opcode,
// and 0x2800 is the largest supported buffer in the client
if ((header.size < 4) || (header.size > 0x2800) || (header.cmd >= NUM_MSG_TYPES))
{
sLog.outError("WorldSocket::ProcessIncomingData: client sent malformed packet size = %u , cmd = %u", header.size, header.cmd);

errno = EINVAL;
return false;
}

// the minus four is because we've already read the four byte opcode value
const uint16 validBytesRemaining = header.size - 4;

// check if the client has told us that there is more data than there is
if (validBytesRemaining > ReadLengthRemaining())
{
// we must preserve the decrypted header so as not to corrupt the crypto state, and to prevent duplicating work
m_useExistingHeader = true;
m_existingHeader = header;

// we move the read pointer backward because it will be skipped again later. this is a slight kludge, but to solve
// it more elegantly would require introducing protocol awareness into the socket library, which we want to avoid
ReadSkip(-static_cast<int>(sizeof(ClientPktHeader)));

errno = EBADMSG;
return false;
}

const Opcodes opcode = static_cast<Opcodes>(header.cmd);
self->m_crypt.DecryptRecv((uint8*)header.get(), sizeof(ClientPktHeader));

if (IsClosed())
return false;
EndianConvertReverse(header->size);
EndianConvert(header->cmd);

std::unique_ptr<WorldPacket> pct(new WorldPacket(opcode, validBytesRemaining));
const Opcodes opcode = static_cast<Opcodes>(header->cmd);

if (validBytesRemaining)
{
pct->append(InPeak(), validBytesRemaining);
ReadSkip(validBytesRemaining);
}
size_t packetSize = header->size - 4;
std::shared_ptr<std::vector<uint8>> packetBuffer = std::make_shared<std::vector<uint8>>(packetSize);

if (sPacketLog->CanLogPacket() && IsLoggingPackets())
sPacketLog->LogPacket(*pct, CLIENT_TO_SERVER, GetRemoteIpAddress(), GetRemotePort());
self->Read(reinterpret_cast<char*>(packetBuffer->data()), packetBuffer->size(), [self, packetBuffer, opcode, packetSize](const boost::system::error_code& error, std::size_t read) -> void
{
std::unique_ptr<WorldPacket> pct(new WorldPacket(opcode, packetSize));
pct->append(*packetBuffer.get());
if (sPacketLog->CanLogPacket() && self->IsLoggingPackets())
sPacketLog->LogPacket(*pct, CLIENT_TO_SERVER, self->GetRemoteIpAddress(), self->GetRemotePort());

sLog.outWorldPacketDump(GetRemoteEndpoint().c_str(), pct->GetOpcode(), pct->GetOpcodeName(), *pct, true);
sLog.outWorldPacketDump(self->GetRemoteEndpoint().c_str(), pct->GetOpcode(), pct->GetOpcodeName(), *pct, true);

if (WorldSocket::m_packetCooldowns[opcode])
{
auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(Clock::now());
if (now < m_lastPacket[opcode]) // packet on cooldown
return true;
else // start cooldown and allow execution
m_lastPacket[opcode] = now + std::chrono::milliseconds(WorldSocket::m_packetCooldowns[opcode]);
}
if (WorldSocket::m_packetCooldowns[opcode])
{
auto now = std::chrono::time_point_cast<std::chrono::milliseconds>(Clock::now());
if (now < self->m_lastPacket[opcode]) // packet on cooldown
return;
else // start cooldown and allow execution
self->m_lastPacket[opcode] = now + std::chrono::milliseconds(WorldSocket::m_packetCooldowns[opcode]);
}

try
{
switch (opcode)
{
case CMSG_AUTH_SESSION:
if (m_session)
try
{
switch (opcode)
{
sLog.outError("WorldSocket::ProcessIncomingData: Player send CMSG_AUTH_SESSION again");
return false;
case CMSG_AUTH_SESSION:
if (self->m_session)
{
sLog.outError("WorldSocket::ProcessIncomingData: Player send CMSG_AUTH_SESSION again");
return;
}

if (!self->HandleAuthSession(*pct))
return;
break;
case CMSG_PING:
if (!self->HandlePing(*pct))
return;
break;
case CMSG_KEEP_ALIVE:
DEBUG_LOG("CMSG_KEEP_ALIVE, size: " SIZEFMTD " ", pct->size());
break;
case CMSG_TIME_SYNC_RESP:
pct->SetReceivedTime(std::chrono::steady_clock::now());
default:
{
self->m_opcodeHistoryInc.push_front(uint32(pct->GetOpcode()));
if (self->m_opcodeHistoryInc.size() > 50)
self->m_opcodeHistoryInc.resize(30);

if (!self->m_session)
{
sLog.outError("WorldSocket::ProcessIncomingData: Client not authed opcode = %u", uint32(opcode));
return;
}

self->m_session->QueuePacket(std::move(pct));
break;
}
}

return HandleAuthSession(*pct);

case CMSG_PING:
return HandlePing(*pct);

case CMSG_KEEP_ALIVE:
DEBUG_LOG("CMSG_KEEP_ALIVE, size: " SIZEFMTD " ", pct->size());
return true;

case CMSG_TIME_SYNC_RESP:
pct->SetReceivedTime(std::chrono::steady_clock::now());
default:
}
catch (ByteBufferException&)
{
m_opcodeHistoryInc.push_front(uint32(pct->GetOpcode()));
if (m_opcodeHistoryInc.size() > 50)
m_opcodeHistoryInc.resize(30);
sLog.outError("WorldSocket::ProcessIncomingData ByteBufferException occured while parsing an instant handled packet (opcode: %u) from client %s, accountid=%i.",
opcode, self->GetRemoteAddress().c_str(), self->m_session ? self->m_session->GetAccountId() : -1);

if (!m_session)
if (sLog.HasLogLevelOrHigher(LOG_LVL_DEBUG))
{
sLog.outError("WorldSocket::ProcessIncomingData: Client not authed opcode = %u", uint32(opcode));
return false;
DEBUG_LOG("Dumping error-causing packet:");
pct->hexlike();
}

m_session->QueuePacket(std::move(pct));

return true;
if (sWorld.getConfig(CONFIG_BOOL_KICK_PLAYER_ON_BAD_PACKET))
{
DETAIL_LOG("Disconnecting session [account id %i / address %s] for badly formatted packet.",
self->m_session ? self->m_session->GetAccountId() : -1, self->GetRemoteAddress().c_str());
return;
}
}
}
}
catch (ByteBufferException&)
{
sLog.outError("WorldSocket::ProcessIncomingData ByteBufferException occured while parsing an instant handled packet (opcode: %u) from client %s, accountid=%i.",
opcode, GetRemoteAddress().c_str(), m_session ? m_session->GetAccountId() : -1);

if (sLog.HasLogLevelOrHigher(LOG_LVL_DEBUG))
{
DEBUG_LOG("Dumping error-causing packet:");
pct->hexlike();
}

if (sWorld.getConfig(CONFIG_BOOL_KICK_PLAYER_ON_BAD_PACKET))
{
DETAIL_LOG("Disconnecting session [account id %i / address %s] for badly formatted packet.",
m_session ? m_session->GetAccountId() : -1, GetRemoteAddress().c_str());
return false;
}
}
self->ProcessIncomingData();
});
});

return true;
}
Expand Down
11 changes: 4 additions & 7 deletions src/game/Server/WorldSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
#include "Common.h"
#include "AuthCrypt.h"
#include "Auth/BigNumber.h"
#include "Network/Socket.hpp"
#include "Network/AsyncSocket.hpp"

#include <chrono>
#include <functional>
Expand Down Expand Up @@ -74,7 +74,7 @@ class WorldSession;
*
*/

class WorldSocket : public MaNGOS::Socket
class WorldSocket : public MaNGOS::AsyncSocket<WorldSocket>
{
private:
#if defined( __GNUC__ )
Expand All @@ -99,9 +99,6 @@ class WorldSocket : public MaNGOS::Socket
/// Keep track of over-speed pings ,to prevent ping flood.
uint32 m_overSpeedPings;

ClientPktHeader m_existingHeader;
bool m_useExistingHeader;

/// Class used for managing encryption of the headers
AuthCrypt m_crypt;

Expand Down Expand Up @@ -129,14 +126,14 @@ class WorldSocket : public MaNGOS::Socket
bool m_loggingPackets;

public:
WorldSocket(boost::asio::io_service& service, std::function<void (Socket*)> closeHandler);
WorldSocket(boost::asio::io_service& service);

// send a packet \o/
void SendPacket(const WorldPacket& pct, bool immediate = false);

void FinalizeSession() { m_session = nullptr; }

virtual bool Open() override;
bool OnOpen() override;

/// Return the session key
BigNumber& GetSessionKey() { return m_s; }
Expand Down
Loading

0 comments on commit cd6d946

Please sign in to comment.