Skip to content

Commit

Permalink
Initial rework of sentinels client.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Aug 16, 2024
1 parent c87f30b commit 37594c0
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 77 deletions.
41 changes: 41 additions & 0 deletions .github/workflows/test-sentinel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
name: Test Sentinel

on: [push, pull_request]

permissions:
contents: read

env:
CONSOLE_OUTPUT: XTerm

jobs:
test:
name: ${{matrix.ruby}} on ${{matrix.os}}
runs-on: ${{matrix.os}}-latest
continue-on-error: ${{matrix.experimental}}

strategy:
matrix:
os:
- ubuntu

ruby:
- "3.1"
- "3.2"
- "3.3"

experimental: [false]

steps:
- uses: actions/checkout@v4

- name: Install Docker Compose
run: |
sudo apt-get update
sudo apt-get install -y docker-compose
- name: Run tests
timeout-minutes: 10
env:
RUBY_VERSION: ${{matrix.ruby}}
run: docker-compose -f sentinel/docker-compose.yaml up tests
2 changes: 1 addition & 1 deletion lib/async/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

require_relative 'redis/version'
require_relative 'redis/client'
require_relative 'redis/sentinels'
require_relative 'redis/sentinel_client'
118 changes: 61 additions & 57 deletions lib/async/redis/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,65 @@ module Redis
class Client
include ::Protocol::Redis::Methods

module Methods
def subscribe(*channels)
context = Context::Subscribe.new(@pool, channels)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

def transaction(&block)
context = Context::Transaction.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

alias multi transaction

def pipeline(&block)
context = Context::Pipeline.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

# Deprecated.
alias nested pipeline

def call(*arguments)
@pool.acquire do |connection|
connection.write_request(arguments)

connection.flush

return connection.read_response
end
end

def close
@pool.close
end
end

include Methods

def initialize(endpoint = Endpoint.local, protocol: endpoint.protocol, **options)
@endpoint = endpoint
@protocol = protocol
Expand All @@ -38,8 +97,8 @@ def initialize(endpoint = Endpoint.local, protocol: endpoint.protocol, **options

# @return [client] if no block provided.
# @yield [client, task] yield the client in an async task.
def self.open(*arguments, &block)
client = self.new(*arguments)
def self.open(*arguments, **options, &block)
client = self.new(*arguments, **options)

return client unless block_given?

Expand All @@ -52,61 +111,6 @@ def self.open(*arguments, &block)
end.wait
end

def close
@pool.close
end

def subscribe(*channels)
context = Context::Subscribe.new(@pool, channels)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

def transaction(&block)
context = Context::Transaction.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

alias multi transaction

def pipeline(&block)
context = Context::Pipeline.new(@pool)

return context unless block_given?

begin
yield context
ensure
context.close
end
end

# Deprecated.
alias nested pipeline

def call(*arguments)
@pool.acquire do |connection|
connection.write_request(arguments)

connection.flush

return connection.read_response
end
end

protected

def connect(**options)
Expand Down
6 changes: 5 additions & 1 deletion lib/async/redis/endpoint.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ class Endpoint < ::IO::Endpoint::Generic

def self.local(**options)
self.new(LOCALHOST, **options)
end
end

def self.remote(host, port = 6379, **options)
self.new(URI.parse("redis://#{host}:#{port}"), **options)
end

SCHEMES = {
'redis' => URI::Generic,
Expand Down
40 changes: 22 additions & 18 deletions lib/async/redis/sentinels.rb → lib/async/redis/sentinel_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,29 @@
# Copyright, 2020, by David Ortiz.
# Copyright, 2023-2024, by Samuel Williams.

require_relative 'client'
require 'io/stream'

module Async
module Redis
class SentinelsClient < Client
def initialize(master_name, sentinels, role = :master, protocol = Protocol::RESP2, **options)
class SentinelClient
DEFAULT_MASTER_NAME = 'mymaster'

include ::Protocol::Redis::Methods
include Client::Methods

def initialize(endpoints, master_name: DEFAULT_MASTER_NAME, role: :master, protocol: Protocol::RESP2, **options)
@endpoints = endpoints
@master_name = master_name

@sentinel_endpoints = sentinels.map do |sentinel|
::IO::Endpoint.tcp(sentinel[:host], sentinel[:port])
end

@role = role
@protocol = protocol

@pool = connect(**options)
end

private
protected

# Override the parent method. The only difference is that this one needs
# to resolve the master/slave address.
# Override the parent method. The only difference is that this one needs to resolve the master/slave address.
def connect(**options)
Async::Pool::Controller.wrap(**options) do
endpoint = resolve_address
Expand All @@ -45,28 +47,30 @@ def resolve_address
raise ArgumentError, "Unknown instance role #{@role}"
end => address

Console.info(self, "Resolved #{@role} address: #{address}")

address or raise RuntimeError, "Unable to fetch #{@role} via Sentinel."
end

def resolve_master
@sentinel_endpoints.each do |sentinel_endpoint|
client = Client.new(sentinel_endpoint, protocol: @protocol)
@endpoints.each do |endpoint|
client = Client.new(endpoint)

begin
address = client.call('sentinel', 'get-master-addr-by-name', @master_name)
rescue Errno::ECONNREFUSED
next
end

return ::IO::Endpoint.tcp(address[0], address[1]) if address
return Endpoint.remote(address[0], address[1]) if address
end

nil
return nil
end

def resolve_slave
@sentinel_endpoints.each do |sentinel_endpoint|
client = Client.new(sentinel_endpoint, protocol: @protocol)
@endpoints.each do |endpoint|
client = Client.new(endpoint)

begin
reply = client.call('sentinel', 'slaves', @master_name)
Expand All @@ -78,10 +82,10 @@ def resolve_slave
next if slaves.empty?

slave = select_slave(slaves)
return ::IO::Endpoint.tcp(slave['ip'], slave['port'])
return Endpoint.remote(slave['ip'], slave['port'])
end

nil
return nil
end

def available_slaves(reply)
Expand Down
25 changes: 25 additions & 0 deletions sentinel/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
services:
redis-master:
image: redis
redis-slave:
image: redis
command: redis-server --slaveof redis-master 6379
depends_on:
- redis-master
redis-sentinel:
image: redis
command: redis-sentinel /etc/redis/sentinel.conf
volumes:
- ./sentinel.conf:/etc/redis/sentinel.conf
depends_on:
- redis-master
- redis-slave
tests:
image: ruby:${RUBY_VERSION:-latest}
volumes:
- ../:/code
command: bash -c "cd /code && bundle install && bundle exec sus sentinel/test"
depends_on:
- redis-master
- redis-slave
- redis-sentinel
19 changes: 19 additions & 0 deletions sentinel/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Sentinel Testing

To test sentinels, you need to set up master, slave and sentinel instances.

## Setup

``` bash
$ docker-compose -f config/sentinel/docker-compose.yaml up -d
[+] Running 3/3
✔ Container sentinel-redis-master-1 Running 0.0s
✔ Container sentinel-redis-slave-1 Running 0.0s
✔ Container sentinel-redis-sentinel-1 Started 0.2s
```

## Test

``` bash
$ ASYNC_REDIS_MASTER=redis://redis-master:6379 ASYNC_REDIS_SLAVE=redis://redis-slave:6379 ASYNC_REDIS_SENTINEL=redis://redis-sentinel:26379 bundle exec sus
```
6 changes: 6 additions & 0 deletions sentinel/sentinel.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
port 26379
sentinel resolve-hostnames yes
sentinel monitor mymaster redis-master 6379 1
sentinel down-after-milliseconds mymaster 1000
sentinel failover-timeout mymaster 1000
sentinel parallel-syncs mymaster 1
35 changes: 35 additions & 0 deletions sentinel/test/async/redis/sentinel_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2018-2024, by Samuel Williams.
# Copyright, 2018, by Huba Nagy.
# Copyright, 2019, by David Ortiz.

require 'async/clock'
require 'async/redis/sentinel_client'
require 'sus/fixtures/async'

describe Async::Redis::SentinelClient do
include Sus::Fixtures::Async::ReactorContext

let(:master_host) {"redis://redis-master:6379"}
let(:slave_host) {"redis://redis-slave:6379"}
let(:sentinel_host) {"redis://redis-sentinel:26379"}

let(:sentinels) {[
Async::Redis::Endpoint.parse(sentinel_host)
]}

let(:client) {subject.new(sentinels)}
let(:slave_client) {subject.new(sentinels, role: :slave)}

it "should resolve master address" do
unless master_host and slave_host and sentinel_host
skip("No sentinel host provided.")
end

client.set("key", "value")

expect(slave_client.get("key")).to be == "value"
end
end

0 comments on commit 37594c0

Please sign in to comment.