Skip to content

Commit

Permalink
redis cluster client
Browse files Browse the repository at this point in the history
  • Loading branch information
sundream committed Jun 3, 2017
1 parent 94374cf commit 2798c77
Show file tree
Hide file tree
Showing 3 changed files with 559 additions and 0 deletions.
383 changes: 383 additions & 0 deletions lualib/skynet/db/redis/cluster.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,383 @@
-- a simple redis-cluster client
-- rewrite from https://github.com/antirez/redis-rb-cluster

local skynet = require "skynet"
local redis = require "skynet.db.redis"
local crc16 = require "skynet.db.redis.crc16"

local RedisClusterHashSlots = 16384
local RedisClusterRequestTTL = 16


local _M = {}

local rediscluster = {}
rediscluster.__index = rediscluster
_M.rediscluster = rediscluster

function _M.new(startup_nodes,opt)
if #startup_nodes == 0 then
startup_nodes = {startup_nodes,}
end
opt = opt or {}
local self = {
startup_nodes = startup_nodes,
max_connections = opt.max_connections or 16,
connections = setmetatable({},{__mode = "kv"}),
opt = opt,
refresh_table_asap = false,
}
setmetatable(self,rediscluster)
self:initialize_slots_cache()
return self
end

local function nodename(node)
return string.format("%s:%s",node.host,node.port)
end

function rediscluster:get_redis_link(node)
local conf = {
host = node.host,
port = node.port,
auth = self.opt.auth,
db = self.opt.db or 0,
}
return redis.connect(conf)
end

-- Given a node (that is just a Ruby hash) give it a name just
-- concatenating the host and port. We use the node name as a key
-- to cache connections to that node.
function rediscluster:set_node_name(node)
if not node.name then
node.name = nodename(node)
end
if not node.slaves then
local oldnode = self.name_node[node.name]
if oldnode then
node.slaves = oldnode.slaves
end
end
self.name_node[node.name] = node
end

-- Contact the startup nodes and try to fetch the hash slots -> instances
-- map in order to initialize the @slots hash.
function rediscluster:initialize_slots_cache()
self.slots = {}
self.nodes = {}
self.name_node = {}
for _,startup_node in ipairs(self.startup_nodes) do
local ok = pcall(function ()
local name = nodename(startup_node)
local conn = self.connections[name] or self:get_redis_link(startup_node)
local list = conn:cluster("slots")
for _,result in ipairs(list) do
local ip,port,runid = table.unpack(result[3])
local master_node = {
host = ip,
port = port,
runid = runid,
slaves = {},
}
self:set_node_name(master_node)
for i=4,#result do
local ip,port,runid = table.unpack(result[i])
local slave_node = {
host = ip,
port = port,
runid = runid,
}
self:set_node_name(slave_node)
table.insert(master_node.slaves,slave_node)
end
for slot=tonumber(result[1]),tonumber(result[2]) do
table.insert(self.nodes,master_node)
self.slots[slot] = master_node
end
end
self.refresh_table_asap = false
if not self.connections[name] then
self.connections[name] = conn
end
end)
-- Exit the loop as long as the first node replies
if ok then
break
end
end
end

-- Flush the cache, mostly useful for debugging when we want to force
-- redirection.
function rediscluster:flush_slots_cache()
self.slots = {}
end

-- Return the hash slot from the key.
function rediscluster:keyslot(key)
-- Only hash what is inside {...} if there is such a pattern in the key.
-- Note that the specification requires the content that is between
-- the first { and the first } after the first {. If we found {} without
-- nothing in the middle, the whole key is hashed as usually.
local startpos = string.find(key,"{",1,true)
if startpos then
local endpos = string.find(key,"}",startpos+1,true)
if endpos and endpos ~= startpos + 1 then
key = string.sub(key,startpos+1,endpos-1)
end
end
return crc16(key) % RedisClusterHashSlots
end

-- Return the first key in the command arguments.
--
-- Currently we just return argv[1], that is, the first argument
-- after the command name.
--
-- This is indeed the key for most commands, and when it is not true
-- the cluster redirection will point us to the right node anyway.
--
-- For commands we want to explicitly bad as they don't make sense
-- in the context of cluster, nil is returned.
function rediscluster:get_key_from_command(argv)
local cmd,key = table.unpack(argv)
cmd = string.lower(cmd)
if cmd == "info" or
cmd == "multi" or
cmd == "exec" or
cmd == "slaveof" or
cmd == "config" or
cmd == "shutdown" then
return nil
end
-- Unknown commands, and all the commands having the key
-- as first argument are handled here:
-- set, get, ...
return key
end

-- If the current number of connections is already the maximum number
-- allowed, close a random connection. This should be called every time
-- we cache a new connection in the @connections hash.
function rediscluster:close_existing_connection()
local length = 0
for name,conn in pairs(self.connections) do
length = length + 1
end
if length >= self.max_connections then
pcall(function ()
local name,conn = next(self.connections)
self.connections[name] = nil
conn:disconnect()
end)
end
end

function rediscluster:close_all_connection()
local connections = self.connections
self.connections = setmetatable({},{__mode = "kv"})
for name,conn in pairs(connections) do
pcall(conn.disconnect,conn)
end
end

function rediscluster:get_connection(node)
if type(node) == "string" then
local ip,port = string.match(node,"^([^:]+):([^:]+)$")
node = {host=ip,port=port}
end
local name = node.name or nodename(node)
local conn = self.connections[name]
if not conn then
conn = self:get_redis_link(node)
self.connections[name] = conn
end
return self.connections[name]
end

-- Return a link to a random node, or raise an error if no node can be
-- contacted. This function is only called when we can't reach the node
-- associated with a given hash slot, or when we don't know the right
-- mapping.
-- The function will try to get a successful reply to the PING command,
-- otherwise the next node is tried.
function rediscluster:get_random_connection()
-- shuffle
local shuffle_idx = {}
local startpos = 1
local endpos = #self.nodes
for i=startpos,endpos do
shuffle_idx[i] = i
end
for i=startpos,endpos do
local idx = math.random(i,endpos)
local tmp = shuffle_idx[i]
shuffle_idx[i] = shuffle_idx[idx]
shuffle_idx[idx] = tmp
end
for i,idx in ipairs(shuffle_idx) do
local ok,conn = pcall(function ()
local node = self.nodes[idx]
local conn = self.connections[node.name]
if not conn then
-- Connect the node if it is not connected
conn = self:get_redis_link(node)
if conn:ping() == "PONG" then
self:close_existing_connection()
self.connections[node.name] = conn
return conn
else
-- If the connection is not good close it ASAP in order
-- to avoid waiting for the GC finalizer. File
-- descriptors are a rare resource.
conn:disconnect()
end
else
-- The node was already connected, test the connection.
if conn:ping() == "PONG" then
return conn
end
end
end)
if ok and conn then
return conn
end
end
error("Can't reach a single startup node.")
end

-- Given a slot return the link (Redis instance) to the mapped node.
-- Make sure to create a connection with the node if we don't have
-- one.
function rediscluster:get_connection_by_slot(slot)
local node = self.slots[slot]
-- If we don't know what the mapping is, return a random node.
if not node then
return self:get_random_connection()
end
if not self.connections[node.name] then
local ok = pcall(function ()
self:close_existing_connection()
self.connections[node.name] = self:get_redis_link(node)
end)
if not ok then
if self.opt.read_slave and node.slaves and #node.slaves > 0 then
local slave_node = node.slaves[math.random(1,#node.slaves)]
local ok2,conn = pcall(self.get_connection,self,slave_node)
if ok2 then
conn:readonly() -- allow this connection read-slave
return conn
end
end
-- This will probably never happen with recent redis-rb
-- versions because the connection is enstablished in a lazy
-- way only when a command is called. However it is wise to
-- handle an instance creation error of some kind.
return self:get_random_connection()
end
end
return self.connections[node.name]
end

-- Dispatch commands.
function rediscluster:call(...)
local argv = table.pack(...)
if self.refresh_table_asap then
self:initialize_slots_cache()
end
local ttl = RedisClusterRequestTTL -- Max number of redirections
local err
local asking = false
local try_random_node = false
while ttl > 0 do
ttl = ttl - 1
local key = self:get_key_from_command(argv)
if not key then
error("No way to dispatch this command to Redis Cluster: " .. tostring(argv[1]))
end
local conn
local slot = self:keyslot(key)
if asking then
conn = self:get_connection(asking)
elseif try_random_node then
conn = self:get_random_connection()
try_random_node = false
else
conn = self:get_connection_by_slot(slot)
end
local result = {pcall(function ()
-- TODO: use pipelining to send asking and save a rtt.
if asking then
conn:asking()
end
asking = false
local cmd = argv[1]
local func = conn[cmd]
return func(conn,table.unpack(argv,2))
end)}
local ok = result[1]
if not ok then
err = table.unpack(result,2)
err = tostring(err)
if err == "[Error: socket]" then
-- may be nerver come here?
try_random_node = true
if ttl < RedisClusterRequestTTL/2 then
skynet.sleep(10)
end
else
-- err: ./lualib/skynet/socketchannel.lua:371: xxx
err = string.match(err,".+:%d+:%s(.*)$") or err
local errlist = {}
for e in string.gmatch(err,"([^%s]+)%s?") do
table.insert(errlist,e)
end
if (errlist[1] ~= "MOVED" and errlist[1] ~= "ASK") then
error(err)
else
if errlist[1] == "ASK" then
asking = true
else
-- Serve replied with MOVED. It's better for us to
-- ask for CLUSTER SLOTS the next time.
self.refresh_table_asap = true
end
local newslot = tonumber(errlist[2])
local node_ip,node_port = string.match(errlist[3],"^([^:]+):([^:]+)$")
local node = {
host = node_ip,
port = tonumber(node_port),
}
if not asking then
self:set_node_name(node)
self.slots[newslot] = node
else
asking = node
end
end
end
else
return table.unpack(result,2)
end
end
error(string.format("Too many Cluster redirections?,maybe node is disconnected (last error: %q)",err))
end

-- Currently we handle all the commands using method_missing for
-- simplicity. For a Cluster client actually it will be better to have
-- every single command as a method with the right arity and possibly
-- additional checks (example: RPOPLPUSH with same src/dst key, SORT
-- without GET or BY, and so forth).
setmetatable(rediscluster,{
__index = function (t,cmd)
t[cmd] = function (self,...)
return self:call(cmd,...)
end
return t[cmd]
end,
})


return _M
Loading

0 comments on commit 2798c77

Please sign in to comment.