from : https://mp.weixin.qq.com/s/D_xlof0mNs4UZi973vaOXw 有些图裂了,看原文比较方便~
最近跟同事请教了一下 redis 相关的事情,就找来了一下 redis 的驱动,看看这些库是怎么做 redis cluster
的 pipeline
以及 transaction
的,以下就把相关流程的代码剖析一下,还是有一些有意思的点的。
因为 C 语言比较底层,其他语言感觉描述性都差了一点,我找的是 elixir 的库来看的,质量很高。
事后才发现原来这个 elixir 的 redis 库的作者是 elixir 这门语言的核心开发者; P
正文开始。
首先呢,Elixir 的这个库不支持 redis 集群,后来有人基于它扩展成支持简单的集群,所以先讲普通的怎么做,再扩展。
这个库是单进程异步,当你发命令过来时,此库处理完后会马上发给 Redis 服务器,然后就可以接收新的命令,当 Redis Server 答复时,会返回此Reply
给你。
一般连接池有通用的库,所以交给调用方来做,库只处理每个连接的请求。
ps,上面这个标题就是来自 redis 官网的,明显 RE
是 typo。
Redis 用的协议RESP
是自己定的文本协议,客户端与服务端直接通过 TCP 连接通讯。
这个文本协议,其实就是对数据的序列化,以下就是规则:
*
"对于客户端而言,发过去给服务器的命令其实数据结构就是数组,所以只需要*数组长度\r\n$数组[0]里命令的长度\r\n 数组[0]里命令
。
说起来有点抽象,看看实际例子:
LLEN mylist
按照协议 encode 就变成 *2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
的文本,
LLEN
以及 6 个字符的mylist
SET mykey 1
按协议 encode 就变成*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"
SET
以及 5 个字符的mykey
,还有 1 个字符的1
可以看看这个库是怎么做的,就是递归拼接,记录数组的长度,最后在最开头拼上*数组长度
。
@doc ~S"""
Packs a list of Elixir terms to a Redis (RESP) array.
This function returns an iodata (instead of a binary) because the packed
result is usually sent to Redis through `:gen_tcp.send/2` or similar. It can
be converted to a binary with `IO.iodata_to_binary/1`.
All elements of `elems` are converted to strings with `to_string/1`, hence
this function supports encoding everything that implements `String.Chars`.
## Examples
iex> iodata = Redix.Protocol.pack(["SET", "mykey", 1])
iex> IO.iodata_to_binary(iodata)
"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"
"""
@crlf_iodata [?\r, ?\n]
@spec pack([binary]) :: iodata
def pack(items) when is_list(items) do
pack(items, [], 0)
end
defp pack([item | rest], acc, count) do
item = to_string(item)
new_acc = [acc, [?$, Integer.to_string(byte_size(item)), @crlf_iodata, item, @crlf_iodata]]
pack(rest, new_acc, count + 1)
end
defp pack([], acc, count) do
[?*, Integer.to_string(count), @crlf_iodata, acc]
end
作为 client 的库,维护长连接,避免频繁创建连接,这个是常规操作。
而有趣的是,作者使用了 erlang OTP
自带的状态机框架 gen_statem
来维持 TCP 长连接,这个功能是OTP 19
也就是 16 年才推出的,在不知道此作者是 elixir 语言的贡献者前,我还小小的膜拜了一下。
状态机如下图,初始状态不是同步连接,就是 connecting 状态;同步的话,成功就是处于 connected 状态。
状态的动作依靠 TCP
的事件消息来驱动,状态转移自己控制。
举例子:
def disconnected({:timeout, :reconnect}, _timer_info, %__MODULE__{} = data) do
{:ok, socket_owner} = SocketOwner.start_link(self(), data.opts, data.table)
new_data = %{data | socket_owner: socket_owner}
{:next_state, :connecting, new_data}
end
以上代码就是在 discconected
状态收到 TCP
的{:timeout, :reconnect}
消息,创建一个新的TCP socket
进程,将状态转移到:connecting
。
而 socket
进程在初始化时,会发送connect
消息给自己:
def handle_info(:connect, state) do
with {:ok, socket, address} <- Connector.connect(state.opts),
:ok <- setopts(state, socket, active: :once) do
send(state.conn, {:connected, self(), socket, address})
{:noreply, %{state | socket: socket}}
else
{:error, reason} -> stop(reason, state)
{:stop, reason} -> stop(reason, state)
end
end
成功了,就发送connected
消息给原来的状态机进程(也就是 connection 进程)
,connection
进程处于connecting
状态时,接受此消息,更新 socket 信息,状态转移到 connected
。
def connecting(
:info,
{:connected, owner, socket, address},
%__MODULE__{socket_owner: owner} = data
) do
if data.backoff_current do
:telemetry.execute([:redix, :reconnection], %{}, %{
connection: data.opts[:name] || self(),
address: address
})
end
data = %{data | socket: socket, backoff_current: nil, connected_address: address}
{:next_state, :connected, %{data | socket: socket}}
end
Redis 执行命令主要有 Comand
、Pipeline
以及Trasaction
三种概念:
command
:一问一答式的,客户端等待 server 返回消息;Pipeline
:发送一连串命令,这些命令发往 server,不用一问一答,收到命令马上返回。sever 以队列执行,执行完后全部结果返回回来;Trasaction
:依靠MULTI
/EXEC
命令,MULTI
命令开始Trasaction
,此后发送的命令都存到 server 的队列里,EXEC
命令发送后马上这队列里所有命令;期间不会有其他命令影响这些命令的执行。库里把 Command
命令用 Pipeline
来做,其实本质是一样的。
以下的pipeline
就是负责用户调用的函数,:gen_statem.cast
就是把消息数据传给状态机,接着就是起了一个进程来监控这个连接,挂了就退出;同时阻塞等待状态机完成处理获得数据后发消息过来。
def pipeline(conn, commands, timeout) do
conn = GenServer.whereis(conn)
request_id = Process.monitor(conn)
# We cast to the connection process knowing that it will reply at some point,
# either after roughly timeout or when a response is ready.
cast = {:pipeline, commands, _from = {self(), request_id}, timeout}
:ok = :gen_statem.cast(conn, cast)
receive do
{^request_id, resp} ->
_ = Process.demonitor(request_id, [:flush])
resp
{:DOWN, ^request_id, _, _, reason} ->
exit(reason)
end
end
状态机这块的代码就是:
def connected(:cast, {:pipeline, commands, from, timeout}, data) do
{ncommands, data} = get_client_reply(data, commands)
if ncommands > 0 do
{counter, data} = get_and_update_in(data.counter, &{&1, &1 + 1})
row = {counter, from, ncommands, _timed_out? = false}
:ets.insert(data.table, row)
case data.transport.send(data.socket, Enum.map(commands, &Protocol.pack/1)) do
:ok ->
actions =
case timeout do
:infinity -> []
_other -> [{{:timeout, {:client_timed_out, counter}}, timeout, from}]
end
{:keep_state, data, actions}
{:error, _reason} ->
# The socket owner will get a closed message at some point, so we just move to the
# disconnected state.
:ok = data.transport.close(data.socket)
{:next_state, :disconnected, data}
end
else
reply(from, {:ok, []})
{:keep_state, data}
end
end
没什么特别的,get_client_reply
就是处理客户端是否想得到服务器回复的命令的 CLIENT REPLY
的各种指令,
defp get_client_reply([command | rest], ncommands, client_reply) do
case parse_client_reply(command) do
:off -> get_client_reply(rest, ncommands, :off)
:skip when client_reply == :off -> get_client_reply(rest, ncommands, :off)
:skip -> get_client_reply(rest, ncommands, :skip)
:on -> get_client_reply(rest, ncommands + 1, :on)
nil when client_reply == :on -> get_client_reply(rest, ncommands + 1, client_reply)
nil when client_reply == :off -> get_client_reply(rest, ncommands, client_reply)
nil when client_reply == :skip -> get_client_reply(rest, ncommands, :on)
end
end
接着就是把命令序列号成 RESP,使用data.transport.send
发送给服务器,其实 Redis 除了 TCP 外还可以使用 SSL/TLS 协议,所以就有了这一层抽象。
如果是 TCP,那么 socket 服务就会在 redis 服务器返回消息后,此函数接收自动处理:
def handle_info({transport, socket, data}, %__MODULE__{socket: socket} = state)
when transport in [:tcp, :ssl] do
:ok = setopts(state, socket, active: :once)
state = new_data(state, data)
{:noreply, state}
end
官网写的很好了,我简单说一下好了。
Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call an hash slot.
Redis Cluster
没有用一致性哈希算法,而是用了hash slot
(哈希桶)
There are 16384 hash slots in Redis Cluster, and to compute what is the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.
redis 会固定分配 16384 个 slots 到不同的节点,用的算法就是对 key 做 CRC16 然后对 16384 取模: HASH_SLOT = CRC16(key) mod 16384
例子如下:
Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:
- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.
This allows to add and remove nodes in the cluster easily. For example if I want to add a new node D, I need to move some hash slot from nodes A, B, C to D. Similarly if I want to remove node A from the cluster I can just move the hash slots served by A to B and C. When the node A will be empty I can remove it from the cluster completely.
用这样的算法,比一致性哈希方便,更有操作性:
Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same hash slot.
Because moving hash slots from a node to another does not require to stop operations, adding and removing nodes, or changing the percentage of hash slots hold by nodes, does not require any downtime.
对于 redis 或者对用户来说,可以轻松地分配移动 slots ;
而一致性哈希就只能自己算虚拟节点,并且『祈求』之后请求量多了最终达到想要的平衡了。
#####redix-cluster
原版没有支持集群,zhongwencool/redix-cluster 写了一个简单的包装版本。
只需要看这段,就很清楚为了集群做了些啥:
@spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
def pipeline(pipeline, opts) do
case RedixCluster.Monitor.get_slot_cache do
{:cluster, slots_maps, slots, version} ->
pipeline
|> parse_keys_from_pipeline
|> keys_to_slot_hashs
|> is_same_slot_hashs
|> get_pool_by_slot(slots_maps, slots, version)
|> query_redis_pool(pipeline, :pipeline, opts)
{:not_cluster, version, pool_name} ->
query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
end
end
|>
就是类似 unix 的 管道 |
,把函数返回值当做下个函数的第一个参数传给他。
get_slot_cache
就是获取 redis 的cluster slots
这个记录,并且缓存起来。
CLUSTER SLOTS returns details about which cluster slots map to which Redis instances.
parse_keys_from_pipeline
将全部 keys 从Pineline
命令里提取出来keys_to_slot_hashs
找出 各个 key 在哪个 hash slotis_same_slot_hashs
判断所有 key 是不是在同一个 hash slot,是的,这个还不支持跨 slot,我在准备帮他写一个get_pool_by_slot
项目用了连接池来管理,所以要根据名字找对应的连接query_redis_pool
就是调用 原来的 Redix 做处理了简单来说,这个库就是残废的,哈哈哈。。。
不支持分布不同 slot,就是玩具。
总的来说就是这样子,还算是有挺多有趣的地方的。
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.