From a8e41a25cf5841d77a5fd1a1ff1d79f9267df550 Mon Sep 17 00:00:00 2001 From: sanych-sun Date: Tue, 5 Dec 2023 23:05:33 -0800 Subject: [PATCH] Minor changes --- src/RabbitMQ.Next/Sockets/EndpointResolver.cs | 45 ++++++++++++------- src/RabbitMQ.Next/Sockets/SocketWrapper.cs | 33 +++----------- 2 files changed, 36 insertions(+), 42 deletions(-) diff --git a/src/RabbitMQ.Next/Sockets/EndpointResolver.cs b/src/RabbitMQ.Next/Sockets/EndpointResolver.cs index 082eaf8..740d46c 100644 --- a/src/RabbitMQ.Next/Sockets/EndpointResolver.cs +++ b/src/RabbitMQ.Next/Sockets/EndpointResolver.cs @@ -1,12 +1,13 @@ using System; using System.Collections.Generic; +using System.IO; using System.Linq; using System.Net; +using System.Net.Security; using System.Net.Sockets; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Next.Exceptions; -using RabbitMQ.Next.Transport; namespace RabbitMQ.Next.Sockets; @@ -44,32 +45,44 @@ IPAddress FindAddress(IReadOnlyList address, AddressFamily family) var ipV6Address = FindAddress(addresses, AddressFamily.InterNetworkV6); if (ipV6Address != null) { - var socket = await ConnectAsync(ipV6Address, endpoint.Port, cancellation); - if (socket != null) - { - return new SocketWrapper(socket, endpoint); - } + return await ConnectAsync(ipV6Address, endpoint, cancellation); } // 2. Try IP v4 var ipV4Address = FindAddress(addresses, AddressFamily.InterNetwork); if (ipV4Address != null) { - var socket = await ConnectAsync(ipV4Address, endpoint.Port, cancellation); - if (socket != null) - { - return new SocketWrapper(socket, endpoint); - } + return await ConnectAsync(ipV4Address, endpoint, cancellation); } throw new NotSupportedException("Cannot connect to the endpoint: no supported protocols is available"); } - private static async Task ConnectAsync(IPAddress address, int port, CancellationToken cancellation) + private static async Task ConnectAsync(IPAddress address, Endpoint endpoint, CancellationToken cancellation) + { + var ipEndPoint = new IPEndPoint(address, endpoint.Port); + var socket = new Socket(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + await socket.ConnectAsync(ipEndPoint, cancellation); + var stream = ConfigureStream(socket, endpoint); + return new SocketWrapper(socket, stream); + } + + private static Stream ConfigureStream(Socket socket, Endpoint endpoint) { - var endpoint = new IPEndPoint(address, port); - var socket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - await socket.ConnectAsync(endpoint, cancellation); - return socket; + Stream stream = new NetworkStream(socket) + { + ReadTimeout = 60000, + WriteTimeout = 60000, + }; + + if (endpoint.UseSsl) + { + var sslStream = new SslStream(stream, false); + sslStream.AuthenticateAsClient(endpoint.Host); + + stream = sslStream; + } + + return stream; } } \ No newline at end of file diff --git a/src/RabbitMQ.Next/Sockets/SocketWrapper.cs b/src/RabbitMQ.Next/Sockets/SocketWrapper.cs index 460c9cb..860085f 100644 --- a/src/RabbitMQ.Next/Sockets/SocketWrapper.cs +++ b/src/RabbitMQ.Next/Sockets/SocketWrapper.cs @@ -1,5 +1,4 @@ using System.IO; -using System.Net.Security; using System.Net.Sockets; using RabbitMQ.Next.Buffers; @@ -8,29 +7,12 @@ namespace RabbitMQ.Next.Sockets; internal class SocketWrapper : ISocket { private readonly Socket socket; - private readonly Stream readStream; - private readonly Stream writeStream; + private readonly Stream stream; - public SocketWrapper(Socket socket, Endpoint endpoint) + public SocketWrapper(Socket socket, Stream stream) { this.socket = socket; - - Stream stream = new NetworkStream(socket) - { - ReadTimeout = 60000, - WriteTimeout = 60000, - }; - - if (endpoint.UseSsl) - { - var sslStream = new SslStream(stream, false); - sslStream.AuthenticateAsClient(endpoint.Host); - - stream = sslStream; - } - - this.readStream = stream; - this.writeStream = stream; + this.stream = stream; } public void Send(IMemoryAccessor payload) @@ -38,11 +20,11 @@ public void Send(IMemoryAccessor payload) var current = payload; while (current != null) { - current.WriteTo(this.writeStream); + current.WriteTo(this.stream); current = current.Next; } - this.writeStream.Flush(); + this.stream.Flush(); } public int Receive(byte[] buffer, int offset, int minBytes) @@ -50,7 +32,7 @@ public int Receive(byte[] buffer, int offset, int minBytes) var received = 0; while (received < minBytes) { - var readBytes = this.readStream.Read(buffer, offset, buffer.Length - offset); + var readBytes = this.stream.Read(buffer, offset, buffer.Length - offset); if (readBytes == 0 && this.IsConnectionClosedByServer()) { throw new SocketException(); @@ -68,8 +50,7 @@ private bool IsConnectionClosedByServer() public void Dispose() { - this.readStream?.Dispose(); - this.writeStream?.Dispose(); + this.stream.Dispose(); this.socket?.Dispose(); } } \ No newline at end of file