From 078e2f09a5a513b01936d7441666230be91b8a39 Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Fri, 16 Nov 2018 17:57:50 +0000 Subject: [PATCH 1/7] Add Audio project --- Wumpus.Net.sln | 15 +++++++++++++++ src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj | 16 ++++++++++++++++ 2 files changed, 31 insertions(+) create mode 100644 src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj diff --git a/Wumpus.Net.sln b/Wumpus.Net.sln index 609cadd..ac37d95 100644 --- a/Wumpus.Net.sln +++ b/Wumpus.Net.sln @@ -32,6 +32,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Wumpus.Net.Tests.Server", " EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Wumpus.Net.Bot", "src\Wumpus.Net.Bot\Wumpus.Net.Bot.csproj", "{ED977313-7BC8-4A5E-8A24-1BF42635D293}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Wumpus.Net.Audio", "src\Wumpus.Net.Audio\Wumpus.Net.Audio.csproj", "{5ABF3E1B-45E9-4CA7-A455-8923C54B480F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -150,6 +152,18 @@ Global {ED977313-7BC8-4A5E-8A24-1BF42635D293}.Release|x64.Build.0 = Release|Any CPU {ED977313-7BC8-4A5E-8A24-1BF42635D293}.Release|x86.ActiveCfg = Release|Any CPU {ED977313-7BC8-4A5E-8A24-1BF42635D293}.Release|x86.Build.0 = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x64.ActiveCfg = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x64.Build.0 = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x86.ActiveCfg = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Debug|x86.Build.0 = Debug|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|Any CPU.Build.0 = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x64.ActiveCfg = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x64.Build.0 = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x86.ActiveCfg = Release|Any CPU + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -164,6 +178,7 @@ Global {0967FF88-E435-4132-9524-B252A3D9BCD6} = {9806905E-1B09-4AC2-BF4F-731C9E473F94} {C6BE9630-1E1F-4DD1-9205-045A50181AA7} = {9806905E-1B09-4AC2-BF4F-731C9E473F94} {ED977313-7BC8-4A5E-8A24-1BF42635D293} = {F7B9BFB1-C836-4432-BE64-719A38E0BBEF} + {5ABF3E1B-45E9-4CA7-A455-8923C54B480F} = {F7B9BFB1-C836-4432-BE64-719A38E0BBEF} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B5B02039-F4C2-44C6-91B8-340E44472AED} diff --git a/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj b/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj new file mode 100644 index 0000000..888ac5f --- /dev/null +++ b/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj @@ -0,0 +1,16 @@ + + + + Wumpus + Wumpus.Net.Audio + discord;discordapp;wumpus;audio;websocket;api;rogueexception + Provides a low-level audio client for the Discord API + netstandard2.0 + + + + + + + + From fdf4283bf38963d19cced47579eac52875638c1c Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Sat, 17 Nov 2018 18:50:37 +0000 Subject: [PATCH 2/7] Primitive implementation --- .../Events/VoiceHelloEvent.cs | 15 + .../Events/VoiceReadyEvent.cs | 20 + .../Events/VoiceSessionDescriptionEvent.cs | 23 + .../Events/VoiceSpeakingEvent.cs | 16 + src/Wumpus.Net.Audio/IPUtilities.cs | 38 ++ .../Requests/VoiceIdentifyParams.cs | 17 + .../Requests/VoiceResumeParams.cs | 15 + .../Requests/VoiceSelectProtocolParams.cs | 26 + src/Wumpus.Net.Audio/SpeakingState.cs | 15 + src/Wumpus.Net.Audio/VoiceGatewayOperation.cs | 41 ++ src/Wumpus.Net.Audio/VoiceGatewayPayload.cs | 33 ++ src/Wumpus.Net.Audio/WumpusAudioDataClient.cs | 212 ++++++++ .../WumpusAudioGatewayClient.cs | 452 ++++++++++++++++++ src/Wumpus.Net.Core/AssemblyInfo.cs | 1 + 14 files changed, 924 insertions(+) create mode 100644 src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs create mode 100644 src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs create mode 100644 src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs create mode 100644 src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs create mode 100644 src/Wumpus.Net.Audio/IPUtilities.cs create mode 100644 src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs create mode 100644 src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs create mode 100644 src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs create mode 100644 src/Wumpus.Net.Audio/SpeakingState.cs create mode 100644 src/Wumpus.Net.Audio/VoiceGatewayOperation.cs create mode 100644 src/Wumpus.Net.Audio/VoiceGatewayPayload.cs create mode 100644 src/Wumpus.Net.Audio/WumpusAudioDataClient.cs create mode 100644 src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs diff --git a/src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs new file mode 100644 index 0000000..e959c48 --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceHelloEvent.cs @@ -0,0 +1,15 @@ +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceHelloEvent + { + // Given as float because discord returns json of the form + // {"op":8,"d":{"v":4,"heartbeat_interval":13750.0}} + [ModelProperty("heartbeat_interval")] + public float HeartbeatInterval { get; set; } + + [ModelProperty("v")] + public int GatewayVersion { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs new file mode 100644 index 0000000..e73170c --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceReadyEvent.cs @@ -0,0 +1,20 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceReadyEvent + { + [ModelProperty("ssrc")] + public uint Ssrc { get; set; } + + [ModelProperty("port")] + public int Port { get; set; } + + [ModelProperty("modes")] + public Utf8String[] EncryptionSchemes { get; set; } + + [ModelProperty("ip")] + public Utf8String IpAddress { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs new file mode 100644 index 0000000..f055b72 --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceSessionDescriptionEvent.cs @@ -0,0 +1,23 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceSessionDescriptionEvent + { + [ModelProperty("video_codec")] + public Utf8String VideoCodec { get; set; } + + [ModelProperty("secret_key")] + public byte[] SecretKey { get; set; } + + [ModelProperty("mode")] + public Utf8String EncryptionScheme { get; set; } + + [ModelProperty("media_session_id")] + public Utf8String VideoSessionId { get; set; } + + [ModelProperty("audio_codec")] + public Utf8String AudioCodec { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs b/src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs new file mode 100644 index 0000000..18d16bb --- /dev/null +++ b/src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs @@ -0,0 +1,16 @@ +using Voltaic.Serialization; + +namespace Wumpus.Events +{ + public class VoiceSpeakingEvent + { + [ModelProperty("delay")] + public int DelayMilliseconds { get; set; } + + [ModelProperty("speaking")] + public SpeakingState Speaking { get; set; } + + [ModelProperty("ssrc")] + public uint Ssrc { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/IPUtilities.cs b/src/Wumpus.Net.Audio/IPUtilities.cs new file mode 100644 index 0000000..c88b0aa --- /dev/null +++ b/src/Wumpus.Net.Audio/IPUtilities.cs @@ -0,0 +1,38 @@ +using System; +using System.Net; +using Voltaic.Serialization.Utf8; + +namespace Wumpus +{ + public static class IPUtilities + { + public static bool TryParseIPv4Address(ReadOnlySpan buffer, out IPAddress address) + => TryParseIPv4Address(ref buffer, out address); + + public static bool TryParseIPv4Address(ref ReadOnlySpan buffer, out IPAddress address) + { + address = default; + ulong value = 0; + + for (int i = 0; i < 4; i++) + { + if (!Utf8Reader.TryReadUInt8(ref buffer, out byte section, 'g')) + return false; + + value |= (ulong)section << (i * 8); + + // last value does not have a dot following it + if (i != 3) + { + if (buffer[0] != '.') + return false; + + buffer = buffer.Slice(1); + } + } + + address = new IPAddress((long)value); + return true; + } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs new file mode 100644 index 0000000..40c9f34 --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs @@ -0,0 +1,17 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceIdentifyParams + { + [ModelProperty("user_id")] + public Snowflake UserId { get; set; } + [ModelProperty("server_id")] + public Snowflake GuildId { get; set; } + [ModelProperty("session_id")] + public Utf8String SessionId { get; set; } + [ModelProperty("token")] + public Utf8String Token { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs new file mode 100644 index 0000000..5c90fc0 --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs @@ -0,0 +1,15 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceResumeParams + { + [ModelProperty("server_id")] + public Snowflake GuildId { get; set; } + [ModelProperty("session_id")] + public Utf8String SessionId { get; set; } + [ModelProperty("token")] + public Utf8String Token { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs new file mode 100644 index 0000000..1ffc047 --- /dev/null +++ b/src/Wumpus.Net.Audio/Requests/VoiceSelectProtocolParams.cs @@ -0,0 +1,26 @@ +using Voltaic; +using Voltaic.Serialization; + +namespace Wumpus.Requests +{ + public class VoiceSelectProtocolParams + { + [ModelProperty("protocol")] + public Utf8String TransportProtocol { get; set; } + + [ModelProperty("data")] + public VoiceSelectProtocolConnectionProperties Properties { get; set; } + } + + public class VoiceSelectProtocolConnectionProperties + { + [ModelProperty("ip")] + public Utf8String Ip { get; set; } + + [ModelProperty("port")] + public int Port { get; set; } + + [ModelProperty("mode")] + public Utf8String EncryptionScheme { get; set; } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/SpeakingState.cs b/src/Wumpus.Net.Audio/SpeakingState.cs new file mode 100644 index 0000000..b2dfdbe --- /dev/null +++ b/src/Wumpus.Net.Audio/SpeakingState.cs @@ -0,0 +1,15 @@ +using System; + +namespace Wumpus +{ + [Flags] + public enum SpeakingState : byte + { + NotSpeaking = 0b0, + Speaking = 0b1, + + Priority = 0b100, + + PrioritySpeaking = Speaking | Priority + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/VoiceGatewayOperation.cs b/src/Wumpus.Net.Audio/VoiceGatewayOperation.cs new file mode 100644 index 0000000..2474171 --- /dev/null +++ b/src/Wumpus.Net.Audio/VoiceGatewayOperation.cs @@ -0,0 +1,41 @@ +namespace Wumpus.Events +{ + /// + /// Voice connections operate in a similar fashion to the Gateway connection. + /// However, they use a different set of payloads and a separate UDP-based connection for voice data transmission. + /// https://discordapp.com/developers/docs/topics/voice-connections#voice + /// + public enum VoiceGatewayOperation : byte + { + /// C→S - Used to begin a voice websocket connection. + Identify = 0, + /// C→S - Used to select the voice protocol. + SelectProtocol = 1, + /// C←S - Used to complete the websocket handshake. + Ready = 2, + /// C→S - Used to keep the websocket connection alive. + Heartbeat = 3, + /// C→S - Used to describe the session. + SessionDescription = 4, + /// C↔S - Used to indicate which users are speaking. + Speaking = 5, + /// C←S - Used to reply to a heartbeat. + HeartbeatAck = 6, + /// C→S - Used to resume a connection. + Resume = 7, + /// C→S - Used to begin the websocket handshake. + Hello = 8, + /// C←S - Used to complete the websocket handshake with an existing session. + Resumed = 9 + + //NOTE: these do not have official names! + //They are documented here for future expansion purposes + + //ssrc update, occurs when a user connects or changes screenshare settings + //SsrcUpdate = 12, + //user disconnected, occurs when a user disconnects + //UserDisconnected = 13, + //change channel, occurs whenever the client gets moved into another channel + //ChangeChannel = 14 + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs new file mode 100644 index 0000000..80e6840 --- /dev/null +++ b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using Voltaic.Serialization; +using Wumpus.Entities; +using Wumpus.Requests; + +namespace Wumpus.Events +{ + public class VoiceGatewayPayload + { + [ModelProperty("op")] + public VoiceGatewayOperation Operation { get; set; } + + [ModelProperty("d"), + ModelTypeSelector(nameof(Operation), nameof(OpCodeTypeSelector))] + public object Data { get; set; } + + private static Dictionary OpCodeTypeSelector => new Dictionary() + { + [VoiceGatewayOperation.Hello] = typeof(VoiceHelloEvent), + [VoiceGatewayOperation.Ready] = typeof(VoiceReadyEvent), + [VoiceGatewayOperation.HeartbeatAck] = typeof(int), + + [VoiceGatewayOperation.Identify] = typeof(VoiceIdentifyParams), + [VoiceGatewayOperation.SelectProtocol] = typeof(VoiceSelectProtocolParams), + [VoiceGatewayOperation.SessionDescription] = typeof(VoiceSessionDescriptionEvent), + [VoiceGatewayOperation.Resume] = typeof(VoiceResumeParams), + [VoiceGatewayOperation.Heartbeat] = typeof(int), + + [VoiceGatewayOperation.Speaking] = typeof(VoiceSpeakingEvent), + }; + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs new file mode 100644 index 0000000..c734f9e --- /dev/null +++ b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs @@ -0,0 +1,212 @@ +using System; +using System.Buffers; +using System.Buffers.Binary; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading; +using System.Threading.Tasks; +using Voltaic; +using Voltaic.Serialization.Utf8; + +namespace Wumpus +{ + public class WumpusAudioDataClient + { + private const int ConnectionTimeoutMillis = 30000; + + public delegate void AudioCallback(ReadOnlySpan payload); + + // Raw events + public event AudioCallback ReceivedPayload; + public event AudioCallback SentPayload; + + // UDP connection events + public event Action ReceivedLocalEndpoint; + + private readonly SemaphoreSlim _stateLock; + + // Instance + private Task _connectionTask; + private CancellationTokenSource _runCts; + private Socket _socket; + + // Run (Start/Stop) + private uint _ssrc; + private IPEndPoint _sendEndpoint; + private IPEndPoint _receiveEndpoint; + + public WumpusAudioDataClient(ArrayPool pool = null) + { + _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); + _stateLock = new SemaphoreSlim(1, 1); + _connectionTask = Task.CompletedTask; + _runCts = new CancellationTokenSource(); + _runCts.Cancel(); // Start canceled + } + + public void Run(uint ssrc, IPEndPoint sendAddress) + => RunAsync(ssrc, sendAddress).GetAwaiter().GetResult(); + public async Task RunAsync(uint ssrc, IPEndPoint sendAddress) + { + Task exceptionSignal; + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await StopAsyncInternal().ConfigureAwait(false); + + _ssrc = ssrc; + _sendEndpoint = sendAddress; + _runCts = new CancellationTokenSource(); + + _connectionTask = RunTaskAsync(_runCts.Token); + exceptionSignal = _connectionTask; + } + finally + { + _stateLock.Release(); + } + await exceptionSignal.ConfigureAwait(false); + } + private async Task RunTaskAsync(CancellationToken runCancelToken) + { + using (var connectionCts = new CancellationTokenSource()) + using (var cancelTokenCts = CancellationTokenSource.CreateLinkedTokenSource(runCancelToken, connectionCts.Token)) + { + var cancelToken = cancelTokenCts.Token; + try + { + cancelToken.ThrowIfCancellationRequested(); + + // Perform IP discovery if we do not have our local address cached already + if (_receiveEndpoint == null) + { + await SendDiscoveryAsync().ConfigureAwait(false); + var receiveTask = ReceiveDiscoveryAsync(); + await WhenAny(new Task[] { receiveTask }, ConnectionTimeoutMillis, + "Timed out waiting for IP discovery").ConfigureAwait(false); + + var endpoint = await receiveTask.ConfigureAwait(false); + if (endpoint == null) + throw new Exception("First receive was not a discovery"); + + _receiveEndpoint = endpoint; + ReceivedLocalEndpoint?.Invoke(endpoint); + } + + await RunReceiveAsync(cancelToken).ConfigureAwait(false); + } + finally + { + connectionCts.Cancel(); + } + } + } + private async Task WhenAny(IEnumerable tasks, int millis, string errorText) + { + var timeoutTask = Task.Delay(millis); + var task = await Task.WhenAny(tasks.Append(timeoutTask)); + if (task == timeoutTask) + throw new TimeoutException(errorText); + await task.ConfigureAwait(false); + } + + public void Stop() + => StopAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + + public async Task StopAsync() + { + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await StopAsyncInternal().ConfigureAwait(false); + } + finally + { + _stateLock.Release(); + } + } + private async Task StopAsyncInternal() + { + _runCts?.Cancel(); + + try { await _connectionTask.ConfigureAwait(false); } catch { } + } + + public void Dispose() + { + Stop(); + _socket?.Dispose(); + } + + private Task RunReceiveAsync(CancellationToken cancelToken) + { + return Task.Run(async () => + { + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + await ReceiveAsync().ConfigureAwait(false); + } + }); + } + private async Task ReceiveAsync() + { + var payload = new ResizableMemory(10 * 1024); + var receiveResult = await _socket.ReceiveFromAsync( + payload.RequestSegment(10 * 1024), SocketFlags.None, _sendEndpoint).ConfigureAwait(false); + payload.Advance(receiveResult.ReceivedBytes); + + ReceivedPayload?.Invoke(payload.AsReadOnlySpan()); + } + + public async Task SendAsync(ArraySegment buffer) + { + await _socket.SendToAsync(buffer, SocketFlags.None, _sendEndpoint).ConfigureAwait(false); + + SentPayload?.Invoke(buffer.AsSpan()); + } + + private async Task SendDiscoveryAsync() + { + var payload = new ResizableMemory(70); + BinaryPrimitives.WriteUInt32BigEndian(payload.RequestSpan(70), _ssrc); + payload.Advance(70); + + await _socket.SendToAsync( + payload.AsSegment(), SocketFlags.None, _sendEndpoint).ConfigureAwait(false); + } + private async Task ReceiveDiscoveryAsync() + { + var payload = new ResizableMemory(70); + var receiveResult = await _socket.ReceiveFromAsync( + payload.RequestSegment(70), SocketFlags.None, _sendEndpoint).ConfigureAwait(false); + payload.Advance(receiveResult.ReceivedBytes); + + return ParseDiscovery(payload.AsReadOnlySpan()); + + IPEndPoint ParseDiscovery(ReadOnlySpan discovery) + { + // discovery is always of the form: + // <4 bytes>, , , + if (discovery.Length != 70) + return null; + + discovery = discovery.Slice(4); // skip ssrc, it's always 0 + + // HACK: IPAddress does not have a TryParse which accepts Span + // so to save on allocations until it does, we use a custom parser + if (!IPUtilities.TryParseIPv4Address(ref discovery, out var address)) + return null; + + discovery = discovery.Slice(discovery.Length - 2); + if (!BinaryPrimitives.TryReadUInt16BigEndian(discovery, out var port)) + return null; + + return new IPEndPoint(address, port); + } + } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs b/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs new file mode 100644 index 0000000..c51606b --- /dev/null +++ b/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs @@ -0,0 +1,452 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO.Compression; +using System.Linq; +using System.Net.WebSockets; +using System.Reflection; +using System.Threading; +using System.Threading.Tasks; +using Voltaic; +using Voltaic.Serialization; +using Wumpus; +using Wumpus.Events; +using Wumpus.Requests; +using Wumpus.Serialization; + +namespace Wumpus +{ + public enum ConnectionState + { + Disconnected, + Connecting, + Connected, + Disconnecting + } + + public class WumpusAudioGatewayClient : IDisposable + { + public const int ApiVersion = 4; + public static string Version { get; } = + typeof(WumpusAudioGatewayClient).GetTypeInfo().Assembly.GetCustomAttribute()?.InformationalVersion ?? + typeof(WumpusAudioGatewayClient).GetTypeInfo().Assembly.GetName().Version.ToString(3) ?? + "Unknown"; + + private const int InitialBackoffMillis = 1000; // 1 second + private const int MaxBackoffMillis = 60000; // 1 min + private const double BackoffMultiplier = 1.75; // 1.75x + private const double BackoffJitter = 0.25; // 1.5 to 2.0x + private const int ConnectionTimeoutMillis = 30000; // 30 sec + private const int IdentifyTimeoutMillis = 60000; // 1 min + // Typical backoff: 1.75s, 3.06s, 5.36s, 9.38s, 16.41s, 28.72s, 50.27s, 60s, 60s... + + + // Status events + public event Action Connected; + public event Action Disconnected; + public event Action DeserializationError; + + // Raw events + public event Action ReceivedPayload; + public event Action SentPayload; + + // Voice gateway events + public event Action VoiceGatewayHello; + public event Action VoiceGatewayReady; + public event Action VoiceGatewayResumed; + public event Action VoiceSessionDescription; + public event Action VoiceSpeaking; + public event Action VoiceGatewayHeartbeatAck; + + private readonly SemaphoreSlim _stateLock; + + // Instance + private Task _connectionTask; + private CancellationTokenSource _runCts; + + // Run (Start/Stop) + private int _lastSeq; + private string _endpoint; + private Utf8String _session; + private Utf8String _token; + + // Connection (For each WebSocket connection) + private BlockingCollection _sendQueue; + private bool _receivedData; + + public ConnectionState State { get; private set; } + public WumpusJsonSerializer JsonSerializer { get; } + + public Snowflake UserId { get; } + public Snowflake GuildId { get; } + + public WumpusAudioGatewayClient(Snowflake userId, Snowflake guildId, WumpusJsonSerializer serializer = null) + { + UserId = userId; + GuildId = guildId; + JsonSerializer = serializer ?? new WumpusJsonSerializer(); + _stateLock = new SemaphoreSlim(1, 1); + _connectionTask = Task.CompletedTask; + _runCts = new CancellationTokenSource(); + _runCts.Cancel(); // Start canceled + } + + // TODO: Utf8String, string or custom type? + public void Run(string endpoint, string session, string token) + => RunAsync(endpoint, session, token).GetAwaiter().GetResult(); + public async Task RunAsync(string endpoint, string session, string token) + { + string SlicePort() + { + if (endpoint.EndsWith(":80")) + return endpoint.Substring(0, endpoint.Length - 3); + return endpoint; + } + + Task exceptionSignal; + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await StopAsyncInternal().ConfigureAwait(false); + + _endpoint = SlicePort(); + _runCts = new CancellationTokenSource(); + _session = new Utf8String(session); + _token = new Utf8String(token); + _lastSeq = 0; + + _connectionTask = RunTaskAsync(_runCts.Token); + exceptionSignal = _connectionTask; + } + finally + { + _stateLock.Release(); + } + await exceptionSignal.ConfigureAwait(false); + } + private async Task RunTaskAsync(CancellationToken runCancelToken) + { + Task[] tasks = null; + bool isRecoverable = true; + int backoffMillis = InitialBackoffMillis; + int connectionAttempts = 0; + var jitter = new Random(); + + while (isRecoverable) + { + using (var connectionCts = new CancellationTokenSource()) + using (var cancelTokenCts = CancellationTokenSource.CreateLinkedTokenSource(runCancelToken, connectionCts.Token)) + using (var client = new ClientWebSocket()) + { + Exception disconnectEx = null; + var cancelToken = cancelTokenCts.Token; + try + { + cancelToken.ThrowIfCancellationRequested(); + var readySignal = new TaskCompletionSource(); + _receivedData = true; + + // Connect + State = ConnectionState.Connecting; + var uri = new Uri("wss://" + _endpoint + $"/?v={ApiVersion}"); + await client.ConnectAsync(uri, cancelToken).ConfigureAwait(false); + + // Receive HELLO (timeout = ConnectionTimeoutMillis) + var receiveTask = ReceiveAsync(client, readySignal, cancelToken); + await WhenAny(new Task[] { receiveTask }, ConnectionTimeoutMillis, + "Timed out waiting for HELLO").ConfigureAwait(false); + + var evnt = await receiveTask.ConfigureAwait(false); + if (!(evnt.Data is VoiceHelloEvent helloEvent)) + throw new Exception("First event was not a HELLO event"); + int heartbeatRate = (int)helloEvent.HeartbeatInterval; + + // Start tasks here since HELLO must be handled before another thread can send/receive + _sendQueue = new BlockingCollection(); + tasks = new[] + { + RunSendAsync(client, cancelToken), + RunHeartbeatAsync(heartbeatRate, cancelToken), + RunReceiveAsync(client, readySignal, cancelToken) + }; + + SendIdentify(connectionAttempts == 0); + + await WhenAny(tasks.Append(readySignal.Task), IdentifyTimeoutMillis, + "Timed out waiting for READY or InvalidSession").ConfigureAwait(false); + if (await readySignal.Task.ConfigureAwait(false) == false) + continue; // Invalid session + + // Success + backoffMillis = InitialBackoffMillis; + State = ConnectionState.Connected; + Connected?.Invoke(); + + // Wait until an exception occurs (due to cancellation or failure) + await WhenAny(tasks).ConfigureAwait(false); + } + catch (Exception ex) + { + disconnectEx = ex; + isRecoverable = IsRecoverable(ex); + if (!isRecoverable) + throw; + } + finally + { + var oldState = State; + State = ConnectionState.Disconnecting; + + connectionCts.Cancel(); + if (tasks != null) + { + try { await Task.WhenAll(tasks).ConfigureAwait(false); } + catch { } // We already captured the root exception + } + + if (client.State == WebSocketState.Open) + { + try { await client.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None).ConfigureAwait(false); } + catch { } + } + + _sendQueue = null; + State = ConnectionState.Disconnected; + if (oldState == ConnectionState.Connected) + Disconnected?.Invoke(disconnectEx); + } + if (isRecoverable) + { + backoffMillis = (int)(backoffMillis * (BackoffMultiplier + (jitter.NextDouble() * BackoffJitter * 2.0 - BackoffJitter))); + if (backoffMillis > MaxBackoffMillis) + backoffMillis = MaxBackoffMillis; + connectionAttempts++; + await Task.Delay(backoffMillis).ConfigureAwait(false); + } + } + } + _runCts.Cancel(); + } + private Task RunReceiveAsync(ClientWebSocket client, TaskCompletionSource readySignal, CancellationToken cancelToken) + { + return Task.Run(async () => + { + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + try + { + await ReceiveAsync(client, readySignal, cancelToken).ConfigureAwait(false); + } + catch (SerializationException ex) + { + DeserializationError?.Invoke(ex); + } + } + }); + } + private Task RunSendAsync(ClientWebSocket client, CancellationToken cancelToken) + { + return Task.Run(async () => + { + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + var payload = _sendQueue.Take(cancelToken); + await SendAsync(client, cancelToken, payload).ConfigureAwait(false); + } + }); + } + private Task RunHeartbeatAsync(int rate, CancellationToken cancelToken) + { + return Task.Run(async () => + { + // extra delay at the beginning because we can only heartbeat after identifying + await Task.Delay(rate, cancelToken).ConfigureAwait(false); + while (true) + { + cancelToken.ThrowIfCancellationRequested(); + if (!_receivedData) + throw new TimeoutException("No data was received since the last heartbeat"); + _receivedData = false; + SendHeartbeat(); + await Task.Delay(rate, cancelToken).ConfigureAwait(false); + } + }); + } + + private async Task WhenAny(IEnumerable tasks) + { + var task = await Task.WhenAny(tasks).ConfigureAwait(false); + await task.ConfigureAwait(false); + } + private async Task WhenAny(IEnumerable tasks, int millis, string errorText) + { + var timeoutTask = Task.Delay(millis); + var task = await Task.WhenAny(tasks.Append(timeoutTask)).ConfigureAwait(false); + if (task == timeoutTask) + throw new TimeoutException(errorText); + await task.ConfigureAwait(false); + } + + private bool IsRecoverable(Exception ex) + { + switch (ex) + { + case WebSocketException wsEx: + if (wsEx.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely) + return true; + break; + case WebSocketClosedException wscEx: + if (wscEx.CloseStatus.HasValue) + { + switch (wscEx.CloseStatus.Value) + { + case WebSocketCloseStatus.Empty: + case WebSocketCloseStatus.NormalClosure: + case WebSocketCloseStatus.InternalServerError: + case WebSocketCloseStatus.ProtocolError: + return true; + } + } + else + { + switch (wscEx.Code) + { + case 4009: + case 4014: + case 4015: + return true; + } + } + break; + case TimeoutException _: + return true; + } + if (ex.InnerException != null) + return IsRecoverable(ex.InnerException); + return false; + } + + public void Stop() + => StopAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + public async Task StopAsync() + { + await _stateLock.WaitAsync().ConfigureAwait(false); + try + { + await StopAsyncInternal().ConfigureAwait(false); + } + finally + { + _stateLock.Release(); + } + } + private async Task StopAsyncInternal() + { + _runCts?.Cancel(); + + try { await _connectionTask.ConfigureAwait(false); } catch { } + _connectionTask = Task.CompletedTask; + + var state = State; + if (state != ConnectionState.Disconnected) + throw new InvalidOperationException($"Client did not successfully disconnect (State = {state})"); + } + + public void Dispose() + { + Stop(); + } + + private async Task ReceiveAsync(ClientWebSocket client, TaskCompletionSource readySignal, CancellationToken cancelToken) + { + ResizableMemory wireData = new ResizableMemory(10 * 1024); + WebSocketReceiveResult result; + do + { + var buffer = wireData.RequestSegment(10 * 1024); + result = await client.ReceiveAsync(buffer, cancelToken).ConfigureAwait(false); + wireData.Advance(result.Count); + _receivedData = true; + + if (result.CloseStatus != null) + throw new WebSocketClosedException(result.CloseStatus.Value, result.CloseStatusDescription); + } + while (!result.EndOfMessage); + + var payload = JsonSerializer.Read(wireData.AsReadOnlySpan()); + + HandleEvent(payload, readySignal); + ReceivedPayload?.Invoke(payload, wireData.Length); + return payload; + } + private void HandleEvent(VoiceGatewayPayload evnt, TaskCompletionSource readySignal) + { + switch (evnt.Operation) + { + case VoiceGatewayOperation.Ready: + var readyEvent = evnt.Data as VoiceReadyEvent; + readySignal.TrySetResult(true); + VoiceGatewayReady?.Invoke(readyEvent); + break; + case VoiceGatewayOperation.Resumed: + readySignal.TrySetResult(true); + VoiceGatewayResumed?.Invoke(); + break; + case VoiceGatewayOperation.SessionDescription: VoiceSessionDescription?.Invoke(evnt.Data as VoiceSessionDescriptionEvent); break; + case VoiceGatewayOperation.Speaking: VoiceSpeaking?.Invoke(evnt.Data as VoiceSpeakingEvent); break; + case VoiceGatewayOperation.HeartbeatAck: VoiceGatewayHeartbeatAck?.Invoke(); break; + case VoiceGatewayOperation.Hello: VoiceGatewayHello?.Invoke(evnt.Data as VoiceHelloEvent); break; + } + } + + public void Send(VoiceGatewayPayload payload) + { + if (!_runCts.IsCancellationRequested) + _sendQueue?.Add(payload); + } + private async Task SendAsync(ClientWebSocket client, CancellationToken cancelToken, VoiceGatewayPayload payload) + { + var writer = JsonSerializer.Write(payload); + await client.SendAsync(writer.AsSegment(), WebSocketMessageType.Text, true, cancelToken); + SentPayload?.Invoke(payload, writer.Length); + } + + private void SendIdentify(bool identify) + { + if (identify) // IDENTIFY + { + Send(new VoiceGatewayPayload + { + Operation = VoiceGatewayOperation.Identify, + Data = new VoiceIdentifyParams + { + UserId = UserId, + GuildId = GuildId, + SessionId = _session, + Token = _token + } + }); + } + else // RESUME + { + Send(new VoiceGatewayPayload + { + Operation = VoiceGatewayOperation.Resume, + Data = new VoiceResumeParams + { + GuildId = GuildId, + SessionId = _session, + Token = _token + } + }); + } + } + private void SendHeartbeat() => Send(new VoiceGatewayPayload + { + Operation = VoiceGatewayOperation.Heartbeat, + Data = _lastSeq + }); + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Core/AssemblyInfo.cs b/src/Wumpus.Net.Core/AssemblyInfo.cs index ecbc283..b29eb30 100644 --- a/src/Wumpus.Net.Core/AssemblyInfo.cs +++ b/src/Wumpus.Net.Core/AssemblyInfo.cs @@ -1,5 +1,6 @@ using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("Wumpus.Net.Gateway")] +[assembly: InternalsVisibleTo("Wumpus.Net.Audio")] [assembly: InternalsVisibleTo("Wumpus.Net.Rest")] [assembly: InternalsVisibleTo("Wumpus.Net.Rpc")] \ No newline at end of file From 3000445b907cba94dfeae3ad425b87d73edee480 Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Wed, 21 Nov 2018 23:27:41 +0000 Subject: [PATCH 3/7] Attempt to implement voice send --- .../VoiceSpeakingParams.cs} | 12 +- .../Sodium/SodiumPrimitives.cs | 62 +++++ src/Wumpus.Net.Audio/VoiceGatewayPayload.cs | 2 +- src/Wumpus.Net.Audio/WumpusAudioDataClient.cs | 214 ++++++------------ .../WumpusAudioGatewayClient.cs | 4 +- 5 files changed, 135 insertions(+), 159 deletions(-) rename src/Wumpus.Net.Audio/{Events/VoiceSpeakingEvent.cs => Requests/VoiceSpeakingParams.cs} (53%) create mode 100644 src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs diff --git a/src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs b/src/Wumpus.Net.Audio/Requests/VoiceSpeakingParams.cs similarity index 53% rename from src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs rename to src/Wumpus.Net.Audio/Requests/VoiceSpeakingParams.cs index 18d16bb..0b6f96f 100644 --- a/src/Wumpus.Net.Audio/Events/VoiceSpeakingEvent.cs +++ b/src/Wumpus.Net.Audio/Requests/VoiceSpeakingParams.cs @@ -1,14 +1,14 @@ using Voltaic.Serialization; -namespace Wumpus.Events +namespace Wumpus.Requests { - public class VoiceSpeakingEvent + public class VoiceSpeakingParams { - [ModelProperty("delay")] - public int DelayMilliseconds { get; set; } - [ModelProperty("speaking")] - public SpeakingState Speaking { get; set; } + public int Speaking { get; set; } + + [ModelProperty("delay")] + public uint DelayMilliseconds { get; set; } [ModelProperty("ssrc")] public uint Ssrc { get; set; } diff --git a/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs new file mode 100644 index 0000000..c3491d9 --- /dev/null +++ b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs @@ -0,0 +1,62 @@ +using System; +using System.Runtime.InteropServices; + +namespace Wumpus +{ + internal class SodiumPrimitives + { + // sodium.dll on windows, libsodium.so on linux + [DllImport("sodium")] + extern static int crypto_secretbox_macbytes(); + [DllImport("sodium")] + extern static int crypto_secretbox_keybytes(); + [DllImport("sodium")] + extern static int crypto_secretbox_noncebytes(); + + [DllImport("sodium")] + extern static unsafe int crypto_secretbox_easy(byte* c, byte* m, int mlen, byte* n, byte* k); + [DllImport("sodium")] + extern static unsafe int crypto_secretbox_open_easy(byte* m, byte* c, int clen, byte* n, byte* k); + + [DllImport("sodium")] + extern static unsafe void randombytes_buf(byte* buf, int size); + + // use the functions to grab the info so it's not hardcoded in the lib + private static readonly int MACBYTES = crypto_secretbox_macbytes(); + private static readonly int KEYBYTES = crypto_secretbox_keybytes(); + private static readonly int NONCEBYTES = crypto_secretbox_noncebytes(); + + public static int NonceSize => NONCEBYTES; + + public static int ComputeMessageLength(int messageLength) + => messageLength + MACBYTES; + + public static unsafe bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan message, ReadOnlySpan secret, ReadOnlySpan nonce) + { + if (ciphertext.Length != message.Length + MACBYTES) + return false; + if (secret.Length != KEYBYTES) + return false; + if (nonce.Length != NONCEBYTES) + return false; + + int status = 0; + + fixed(byte* c = &ciphertext.GetPinnableReference()) + fixed(byte* m = &message.GetPinnableReference()) + fixed(byte* n = &nonce.GetPinnableReference()) + fixed(byte* k = &secret.GetPinnableReference()) + status = crypto_secretbox_easy(c, + m, message.Length, + n, k); + + return status == 0; + } + + public static unsafe void GenerateRandomBytes(Span buffer) + { + fixed(byte* buf = &buffer.GetPinnableReference()) + randombytes_buf(buf, buffer.Length); + } + } +} \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs index 80e6840..8076db9 100644 --- a/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs +++ b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs @@ -27,7 +27,7 @@ public class VoiceGatewayPayload [VoiceGatewayOperation.Resume] = typeof(VoiceResumeParams), [VoiceGatewayOperation.Heartbeat] = typeof(int), - [VoiceGatewayOperation.Speaking] = typeof(VoiceSpeakingEvent), + [VoiceGatewayOperation.Speaking] = typeof(VoiceSpeakingParams), }; } } \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs index c734f9e..9fb98d2 100644 --- a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs +++ b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs @@ -13,200 +13,114 @@ namespace Wumpus { - public class WumpusAudioDataClient + public class WumpusAudioDataClient : IDisposable { - private const int ConnectionTimeoutMillis = 30000; + private readonly IPEndPoint _endpoint; + private readonly ArrayPool _pool; + private readonly Socket _socket; - public delegate void AudioCallback(ReadOnlySpan payload); - - // Raw events - public event AudioCallback ReceivedPayload; - public event AudioCallback SentPayload; - - // UDP connection events - public event Action ReceivedLocalEndpoint; - - private readonly SemaphoreSlim _stateLock; - - // Instance - private Task _connectionTask; - private CancellationTokenSource _runCts; - private Socket _socket; - - // Run (Start/Stop) - private uint _ssrc; - private IPEndPoint _sendEndpoint; - private IPEndPoint _receiveEndpoint; - - public WumpusAudioDataClient(ArrayPool pool = null) + public WumpusAudioDataClient(IPEndPoint endpoint, ArrayPool pool = null) { + _endpoint = endpoint; + _pool = pool; _socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp); - _stateLock = new SemaphoreSlim(1, 1); - _connectionTask = Task.CompletedTask; - _runCts = new CancellationTokenSource(); - _runCts.Cancel(); // Start canceled + + // Bind to any available port + _socket.Bind(new IPEndPoint(IPAddress.Any, 0)); } - public void Run(uint ssrc, IPEndPoint sendAddress) - => RunAsync(ssrc, sendAddress).GetAwaiter().GetResult(); - public async Task RunAsync(uint ssrc, IPEndPoint sendAddress) + public async Task SendAsync(uint ssrc, ushort sequence, uint samplePosition, ArraySegment audio, Memory secret, IPEndPoint endpoint = null) { - Task exceptionSignal; - await _stateLock.WaitAsync().ConfigureAwait(false); - try - { - await StopAsyncInternal().ConfigureAwait(false); + // TODO: this is broken somewhere. I don't know where. - _ssrc = ssrc; - _sendEndpoint = sendAddress; - _runCts = new CancellationTokenSource(); + endpoint = endpoint ?? _endpoint; - _connectionTask = RunTaskAsync(_runCts.Token); - exceptionSignal = _connectionTask; - } - finally - { - _stateLock.Release(); - } - await exceptionSignal.ConfigureAwait(false); - } - private async Task RunTaskAsync(CancellationToken runCancelToken) - { - using (var connectionCts = new CancellationTokenSource()) - using (var cancelTokenCts = CancellationTokenSource.CreateLinkedTokenSource(runCancelToken, connectionCts.Token)) - { - var cancelToken = cancelTokenCts.Token; - try - { - cancelToken.ThrowIfCancellationRequested(); - - // Perform IP discovery if we do not have our local address cached already - if (_receiveEndpoint == null) - { - await SendDiscoveryAsync().ConfigureAwait(false); - var receiveTask = ReceiveDiscoveryAsync(); - await WhenAny(new Task[] { receiveTask }, ConnectionTimeoutMillis, - "Timed out waiting for IP discovery").ConfigureAwait(false); - - var endpoint = await receiveTask.ConfigureAwait(false); - if (endpoint == null) - throw new Exception("First receive was not a discovery"); - - _receiveEndpoint = endpoint; - ReceivedLocalEndpoint?.Invoke(endpoint); - } - - await RunReceiveAsync(cancelToken).ConfigureAwait(false); - } - finally - { - connectionCts.Cancel(); - } - } - } - private async Task WhenAny(IEnumerable tasks, int millis, string errorText) - { - var timeoutTask = Task.Delay(millis); - var task = await Task.WhenAny(tasks.Append(timeoutTask)); - if (task == timeoutTask) - throw new TimeoutException(errorText); - await task.ConfigureAwait(false); - } - - public void Stop() - => StopAsync().ConfigureAwait(false).GetAwaiter().GetResult(); + var memory = new ResizableMemory(10 * 1024, _pool); + WriteHeader(); + Encrypt(audio.AsSpan(), secret.Span); - public async Task StopAsync() - { - await _stateLock.WaitAsync().ConfigureAwait(false); try { - await StopAsyncInternal().ConfigureAwait(false); + await _socket.SendToAsync(memory.AsSegment(), SocketFlags.None, endpoint).ConfigureAwait(false); } finally { - _stateLock.Release(); + memory.Return(); } - } - private async Task StopAsyncInternal() - { - _runCts?.Cancel(); - try { await _connectionTask.ConfigureAwait(false); } catch { } - } + void WriteHeader() + { + memory.Push(0x80); memory.Push(0x78); - public void Dispose() - { - Stop(); - _socket?.Dispose(); - } + var header = memory.RequestSpan(10); + BinaryPrimitives.WriteUInt16BigEndian(header, sequence); // 2 bytes + BinaryPrimitives.WriteUInt32BigEndian(header.Slice(2), samplePosition); // 4 bytes + BinaryPrimitives.WriteUInt32BigEndian(header.Slice(6), ssrc); // 4 bytes + memory.Advance(10); + } - private Task RunReceiveAsync(CancellationToken cancelToken) - { - return Task.Run(async () => + void Encrypt(Span data, Span key) { - while (true) - { - cancelToken.ThrowIfCancellationRequested(); - await ReceiveAsync().ConfigureAwait(false); - } - }); - } - private async Task ReceiveAsync() - { - var payload = new ResizableMemory(10 * 1024); - var receiveResult = await _socket.ReceiveFromAsync( - payload.RequestSegment(10 * 1024), SocketFlags.None, _sendEndpoint).ConfigureAwait(false); - payload.Advance(receiveResult.ReceivedBytes); + var destinationSize = SodiumPrimitives.ComputeMessageLength(data.Length); + var destinationSpan = memory.RequestSpan(destinationSize); - ReceivedPayload?.Invoke(payload.AsReadOnlySpan()); - } + Span nonce = stackalloc byte[SodiumPrimitives.NonceSize]; + //SodiumPrimitives.GenerateRandomBytes(nonce.Slice(0, 4)); - public async Task SendAsync(ArraySegment buffer) - { - await _socket.SendToAsync(buffer, SocketFlags.None, _sendEndpoint).ConfigureAwait(false); + if (SodiumPrimitives.TryEncryptInPlace(destinationSpan, data, key, nonce)) + memory.Advance(destinationSize); - SentPayload?.Invoke(buffer.AsSpan()); + nonce.Slice(0, 4).CopyTo(memory.RequestSpan(4)); + memory.Advance(4); + } } - private async Task SendDiscoveryAsync() + public async Task DiscoverAsync(uint ssrc, IPEndPoint endpoint = null) { - var payload = new ResizableMemory(70); - BinaryPrimitives.WriteUInt32BigEndian(payload.RequestSpan(70), _ssrc); - payload.Advance(70); + endpoint = endpoint ?? _endpoint; - await _socket.SendToAsync( - payload.AsSegment(), SocketFlags.None, _sendEndpoint).ConfigureAwait(false); - } - private async Task ReceiveDiscoveryAsync() - { - var payload = new ResizableMemory(70); - var receiveResult = await _socket.ReceiveFromAsync( - payload.RequestSegment(70), SocketFlags.None, _sendEndpoint).ConfigureAwait(false); - payload.Advance(receiveResult.ReceivedBytes); + var memory = new ResizableMemory(70, _pool); + BinaryPrimitives.WriteUInt32BigEndian(memory.RequestSpan(70), ssrc); + memory.Advance(70); + + try + { + await _socket.SendToAsync(memory.AsSegment(), SocketFlags.None, endpoint).ConfigureAwait(false); - return ParseDiscovery(payload.AsReadOnlySpan()); + var received = await _socket.ReceiveFromAsync(memory.AsSegment(), SocketFlags.None, endpoint).ConfigureAwait(false); + + if (received.ReceivedBytes != 70) + throw new Exception("Discovery response was not 70 bytes"); + + return ParseDiscovery(memory.AsReadOnlySpan()); + } + finally + { + memory.Return(); + } IPEndPoint ParseDiscovery(ReadOnlySpan discovery) { - // discovery is always of the form: - // <4 bytes>, , , if (discovery.Length != 70) return null; discovery = discovery.Slice(4); // skip ssrc, it's always 0 - // HACK: IPAddress does not have a TryParse which accepts Span - // so to save on allocations until it does, we use a custom parser if (!IPUtilities.TryParseIPv4Address(ref discovery, out var address)) return null; + // trim zeros and parse port discovery = discovery.Slice(discovery.Length - 2); - if (!BinaryPrimitives.TryReadUInt16BigEndian(discovery, out var port)) + if (!BinaryPrimitives.TryReadUInt16LittleEndian(discovery, out var port)) return null; return new IPEndPoint(address, port); } } + + public void Dispose() + { + _socket?.Dispose(); + } } } \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs b/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs index c51606b..bfc4189 100644 --- a/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs +++ b/src/Wumpus.Net.Audio/WumpusAudioGatewayClient.cs @@ -55,7 +55,7 @@ public class WumpusAudioGatewayClient : IDisposable public event Action VoiceGatewayReady; public event Action VoiceGatewayResumed; public event Action VoiceSessionDescription; - public event Action VoiceSpeaking; + public event Action VoiceSpeaking; public event Action VoiceGatewayHeartbeatAck; private readonly SemaphoreSlim _stateLock; @@ -395,7 +395,7 @@ private void HandleEvent(VoiceGatewayPayload evnt, TaskCompletionSource re VoiceGatewayResumed?.Invoke(); break; case VoiceGatewayOperation.SessionDescription: VoiceSessionDescription?.Invoke(evnt.Data as VoiceSessionDescriptionEvent); break; - case VoiceGatewayOperation.Speaking: VoiceSpeaking?.Invoke(evnt.Data as VoiceSpeakingEvent); break; + case VoiceGatewayOperation.Speaking: VoiceSpeaking?.Invoke(evnt.Data as VoiceSpeakingParams); break; case VoiceGatewayOperation.HeartbeatAck: VoiceGatewayHeartbeatAck?.Invoke(); break; case VoiceGatewayOperation.Hello: VoiceGatewayHello?.Invoke(evnt.Data as VoiceHelloEvent); break; } From 66ae68d36d3c27081e202ffbb0e862febacbd12e Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Sun, 25 Nov 2018 18:21:22 +0000 Subject: [PATCH 4/7] Small clean-up --- .../Sodium/SodiumPrimitives.cs | 16 ++++--------- src/Wumpus.Net.Audio/WumpusAudioDataClient.cs | 24 ++++++++++--------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs index c3491d9..bcde63e 100644 --- a/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs +++ b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs @@ -31,26 +31,20 @@ internal class SodiumPrimitives public static int ComputeMessageLength(int messageLength) => messageLength + MACBYTES; - public static unsafe bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan message, ReadOnlySpan secret, ReadOnlySpan nonce) + public static unsafe bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan message, ReadOnlySpan nonce, ReadOnlySpan secret) { - if (ciphertext.Length != message.Length + MACBYTES) + if (ciphertext.Length < message.Length + MACBYTES) return false; - if (secret.Length != KEYBYTES) + if (secret.Length < KEYBYTES) return false; - if (nonce.Length != NONCEBYTES) + if (nonce.Length < NONCEBYTES) return false; - int status = 0; - fixed(byte* c = &ciphertext.GetPinnableReference()) fixed(byte* m = &message.GetPinnableReference()) fixed(byte* n = &nonce.GetPinnableReference()) fixed(byte* k = &secret.GetPinnableReference()) - status = crypto_secretbox_easy(c, - m, message.Length, - n, k); - - return status == 0; + return crypto_secretbox_easy(c, m, message.Length, n, k) == 0; } public static unsafe void GenerateRandomBytes(Span buffer) diff --git a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs index 9fb98d2..e830777 100644 --- a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs +++ b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs @@ -31,7 +31,7 @@ public WumpusAudioDataClient(IPEndPoint endpoint, ArrayPool pool = null) public async Task SendAsync(uint ssrc, ushort sequence, uint samplePosition, ArraySegment audio, Memory secret, IPEndPoint endpoint = null) { - // TODO: this is broken somewhere. I don't know where. + // TODO: this only supports xsalsa20_poly1305_lite - should we support more? endpoint = endpoint ?? _endpoint; @@ -50,13 +50,14 @@ public async Task SendAsync(uint ssrc, ushort sequence, uint samplePosition, Arr void WriteHeader() { - memory.Push(0x80); memory.Push(0x78); + var header = memory.RequestSpan(12); - var header = memory.RequestSpan(10); - BinaryPrimitives.WriteUInt16BigEndian(header, sequence); // 2 bytes - BinaryPrimitives.WriteUInt32BigEndian(header.Slice(2), samplePosition); // 4 bytes - BinaryPrimitives.WriteUInt32BigEndian(header.Slice(6), ssrc); // 4 bytes - memory.Advance(10); + header[0] = 0x80; header[1] = 0x78; + BinaryPrimitives.WriteUInt16BigEndian(header.Slice(2), sequence); // 2 bytes + BinaryPrimitives.WriteUInt32BigEndian(header.Slice(4), samplePosition); // 4 bytes + BinaryPrimitives.WriteUInt32BigEndian(header.Slice(8), ssrc); // 4 bytes + + memory.Advance(12); } void Encrypt(Span data, Span key) @@ -65,10 +66,10 @@ void Encrypt(Span data, Span key) var destinationSpan = memory.RequestSpan(destinationSize); Span nonce = stackalloc byte[SodiumPrimitives.NonceSize]; - //SodiumPrimitives.GenerateRandomBytes(nonce.Slice(0, 4)); + SodiumPrimitives.GenerateRandomBytes(nonce.Slice(0, 4)); - if (SodiumPrimitives.TryEncryptInPlace(destinationSpan, data, key, nonce)) - memory.Advance(destinationSize); + SodiumPrimitives.TryEncryptInPlace(destinationSpan, data, nonce, key); + memory.Advance(destinationSize); nonce.Slice(0, 4).CopyTo(memory.RequestSpan(4)); memory.Advance(4); @@ -111,7 +112,8 @@ IPEndPoint ParseDiscovery(ReadOnlySpan discovery) // trim zeros and parse port discovery = discovery.Slice(discovery.Length - 2); - if (!BinaryPrimitives.TryReadUInt16LittleEndian(discovery, out var port)) + // NOTE: not little endian, like the docs say! + if (!BinaryPrimitives.TryReadUInt16BigEndian(discovery, out var port)) return null; return new IPEndPoint(address, port); From 280f2a289a11cc30b57eb3cbdc2315df45f527a3 Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Sat, 19 Jan 2019 14:16:23 +0000 Subject: [PATCH 5/7] Fix formatting errors and remove unsafe code --- .../Requests/VoiceIdentifyParams.cs | 3 +++ .../Requests/VoiceResumeParams.cs | 2 ++ .../Sodium/SodiumPrimitives.cs | 23 +++++++++---------- src/Wumpus.Net.Audio/VoiceGatewayPayload.cs | 4 ++-- 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs index 40c9f34..1d0fbe8 100644 --- a/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs +++ b/src/Wumpus.Net.Audio/Requests/VoiceIdentifyParams.cs @@ -7,10 +7,13 @@ public class VoiceIdentifyParams { [ModelProperty("user_id")] public Snowflake UserId { get; set; } + [ModelProperty("server_id")] public Snowflake GuildId { get; set; } + [ModelProperty("session_id")] public Utf8String SessionId { get; set; } + [ModelProperty("token")] public Utf8String Token { get; set; } } diff --git a/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs b/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs index 5c90fc0..4dc0760 100644 --- a/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs +++ b/src/Wumpus.Net.Audio/Requests/VoiceResumeParams.cs @@ -7,8 +7,10 @@ public class VoiceResumeParams { [ModelProperty("server_id")] public Snowflake GuildId { get; set; } + [ModelProperty("session_id")] public Utf8String SessionId { get; set; } + [ModelProperty("token")] public Utf8String Token { get; set; } } diff --git a/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs index bcde63e..28d4ad9 100644 --- a/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs +++ b/src/Wumpus.Net.Audio/Sodium/SodiumPrimitives.cs @@ -14,12 +14,12 @@ internal class SodiumPrimitives extern static int crypto_secretbox_noncebytes(); [DllImport("sodium")] - extern static unsafe int crypto_secretbox_easy(byte* c, byte* m, int mlen, byte* n, byte* k); + extern static int crypto_secretbox_easy(ref byte c, in byte m, int mlen, in byte n, in byte k); [DllImport("sodium")] - extern static unsafe int crypto_secretbox_open_easy(byte* m, byte* c, int clen, byte* n, byte* k); + extern static int crypto_secretbox_open_easy(ref byte m, in byte c, int clen, in byte n, in byte k); [DllImport("sodium")] - extern static unsafe void randombytes_buf(byte* buf, int size); + extern static void randombytes_buf(ref byte buf, int size); // use the functions to grab the info so it's not hardcoded in the lib private static readonly int MACBYTES = crypto_secretbox_macbytes(); @@ -31,7 +31,7 @@ internal class SodiumPrimitives public static int ComputeMessageLength(int messageLength) => messageLength + MACBYTES; - public static unsafe bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan message, ReadOnlySpan nonce, ReadOnlySpan secret) + public static bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan message, ReadOnlySpan nonce, ReadOnlySpan secret) { if (ciphertext.Length < message.Length + MACBYTES) return false; @@ -40,17 +40,16 @@ public static unsafe bool TryEncryptInPlace(Span ciphertext, ReadOnlySpan< if (nonce.Length < NONCEBYTES) return false; - fixed(byte* c = &ciphertext.GetPinnableReference()) - fixed(byte* m = &message.GetPinnableReference()) - fixed(byte* n = &nonce.GetPinnableReference()) - fixed(byte* k = &secret.GetPinnableReference()) - return crypto_secretbox_easy(c, m, message.Length, n, k) == 0; + return crypto_secretbox_easy( + ref ciphertext.GetPinnableReference(), + message.GetPinnableReference(), message.Length, + nonce.GetPinnableReference(), + secret.GetPinnableReference()) == 0; } - public static unsafe void GenerateRandomBytes(Span buffer) + public static void GenerateRandomBytes(Span buffer) { - fixed(byte* buf = &buffer.GetPinnableReference()) - randombytes_buf(buf, buffer.Length); + randombytes_buf(ref buffer.GetPinnableReference(), buffer.Length); } } } \ No newline at end of file diff --git a/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs index 8076db9..0645042 100644 --- a/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs +++ b/src/Wumpus.Net.Audio/VoiceGatewayPayload.cs @@ -11,8 +11,8 @@ public class VoiceGatewayPayload [ModelProperty("op")] public VoiceGatewayOperation Operation { get; set; } - [ModelProperty("d"), - ModelTypeSelector(nameof(Operation), nameof(OpCodeTypeSelector))] + [ModelProperty("d")] + [ModelTypeSelector(nameof(Operation), nameof(OpCodeTypeSelector))] public object Data { get; set; } private static Dictionary OpCodeTypeSelector => new Dictionary() From 48425416d9c5b1bc0ab7fee1be810463fa2e0a90 Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Sat, 19 Jan 2019 14:17:34 +0000 Subject: [PATCH 6/7] Remove incorrect assumption in WumpusAudioDataClient --- src/Wumpus.Net.Audio/WumpusAudioDataClient.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs index e830777..04d1952 100644 --- a/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs +++ b/src/Wumpus.Net.Audio/WumpusAudioDataClient.cs @@ -112,8 +112,7 @@ IPEndPoint ParseDiscovery(ReadOnlySpan discovery) // trim zeros and parse port discovery = discovery.Slice(discovery.Length - 2); - // NOTE: not little endian, like the docs say! - if (!BinaryPrimitives.TryReadUInt16BigEndian(discovery, out var port)) + if (!BinaryPrimitives.TryReadUInt16LittleEndian(discovery, out var port)) return null; return new IPEndPoint(address, port); From 9bee8ba0e8d7a95779cd947c170c37d62b568ffc Mon Sep 17 00:00:00 2001 From: FiniteReality Date: Thu, 23 May 2019 22:00:10 +0100 Subject: [PATCH 7/7] Use newer Roslyn to fix some compilation issues --- src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj b/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj index 888ac5f..9b132df 100644 --- a/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj +++ b/src/Wumpus.Net.Audio/Wumpus.Net.Audio.csproj @@ -8,6 +8,9 @@ netstandard2.0 + + all +