源码剖析:如何写一个 redis driver 库(驱动)

2019-10-27 12:46:16 +08:00
 ChristopherWu

from : https://mp.weixin.qq.com/s/D_xlof0mNs4UZi973vaOXw 有些图裂了,看原文比较方便~

前言

最近跟同事请教了一下 redis 相关的事情,就找来了一下 redis 的驱动,看看这些库是怎么做 redis clusterpipeline 以及 transaction的,以下就把相关流程的代码剖析一下,还是有一些有意思的点的。

因为 C 语言比较底层,其他语言感觉描述性都差了一点,我找的是 elixir 的库来看的,质量很高。

事后才发现原来这个 elixir 的 redis 库的作者是 elixir 这门语言的核心开发者; P

正文开始。

首先呢,Elixir 的这个库不支持 redis 集群,后来有人基于它扩展成支持简单的集群,所以先讲普通的怎么做,再扩展。

架构

这个库是单进程异步,当你发命令过来时,此库处理完后会马上发给 Redis 服务器,然后就可以接收新的命令,当 Redis Server 答复时,会返回此Reply给你。

一般连接池有通用的库,所以交给调用方来做,库只处理每个连接的请求。

RESP (REdis Serialization Protocol)

ps,上面这个标题就是来自 redis 官网的,明显 RE是 typo。

Redis 用的协议RESP是自己定的文本协议,客户端与服务端直接通过 TCP 连接通讯。

这个文本协议,其实就是对数据的序列化,以下就是规则:

对于客户端而言,发过去给服务器的命令其实数据结构就是数组,所以只需要*数组长度\r\n$数组[0]里命令的长度\r\n 数组[0]里命令

说起来有点抽象,看看实际例子:

  @doc ~S"""
  Packs a list of Elixir terms to a Redis (RESP) array.

  This function returns an iodata (instead of a binary) because the packed
  result is usually sent to Redis through `:gen_tcp.send/2` or similar. It can
  be converted to a binary with `IO.iodata_to_binary/1`.

  All elements of `elems` are converted to strings with `to_string/1`, hence
  this function supports encoding everything that implements `String.Chars`.

  ## Examples

      iex> iodata = Redix.Protocol.pack(["SET", "mykey", 1])
      iex> IO.iodata_to_binary(iodata)
      "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$1\r\n1\r\n"

  """
  @crlf_iodata [?\r, ?\n]
  @spec pack([binary]) :: iodata
  def pack(items) when is_list(items) do
    pack(items, [], 0)
  end

  defp pack([item | rest], acc, count) do
    item = to_string(item)
    new_acc = [acc, [?$, Integer.to_string(byte_size(item)), @crlf_iodata, item, @crlf_iodata]]
    pack(rest, new_acc, count + 1)
  end

  defp pack([], acc, count) do
    [?*, Integer.to_string(count), @crlf_iodata, acc]
  end

维护长连接

作为 client 的库,维护长连接,避免频繁创建连接,这个是常规操作。

而有趣的是,作者使用了 erlang OTP自带的状态机框架 gen_statem 来维持 TCP 长连接,这个功能是OTP 19也就是 16 年才推出的,在不知道此作者是 elixir 语言的贡献者前,我还小小的膜拜了一下。

状态机如下图,初始状态不是同步连接,就是 connecting 状态;同步的话,成功就是处于 connected 状态。

状态的动作依靠 TCP 的事件消息来驱动,状态转移自己控制。

举例子:

  def disconnected({:timeout, :reconnect}, _timer_info, %__MODULE__{} = data) do
    {:ok, socket_owner} = SocketOwner.start_link(self(), data.opts, data.table)
    new_data = %{data | socket_owner: socket_owner}
    {:next_state, :connecting, new_data}
  end

以上代码就是在 discconected状态收到 TCP{:timeout, :reconnect}消息,创建一个新的TCP socket进程,将状态转移到:connecting

socket 进程在初始化时,会发送connect消息给自己:

  def handle_info(:connect, state) do
    with {:ok, socket, address} <- Connector.connect(state.opts),
         :ok <- setopts(state, socket, active: :once) do
      send(state.conn, {:connected, self(), socket, address})
      {:noreply, %{state | socket: socket}}
    else
      {:error, reason} -> stop(reason, state)
      {:stop, reason} -> stop(reason, state)
    end
  end

成功了,就发送connected消息给原来的状态机进程(也就是 connection 进程)connection进程处于connecting状态时,接受此消息,更新 socket 信息,状态转移到 connected

  def connecting(
        :info,
        {:connected, owner, socket, address},
        %__MODULE__{socket_owner: owner} = data
      ) do
    if data.backoff_current do
      :telemetry.execute([:redix, :reconnection], %{}, %{
        connection: data.opts[:name] || self(),
        address: address
      })
    end

    data = %{data | socket: socket, backoff_current: nil, connected_address: address}
    {:next_state, :connected, %{data | socket: socket}}
  end

执行命令

Redis 执行命令主要有 ComandPipeline以及Trasaction三种概念:

库里把 Command 命令用 Pipeline来做,其实本质是一样的。

Pipeline

以下的pipeline就是负责用户调用的函数,:gen_statem.cast就是把消息数据传给状态机,接着就是起了一个进程来监控这个连接,挂了就退出;同时阻塞等待状态机完成处理获得数据后发消息过来。

  def pipeline(conn, commands, timeout) do
    conn = GenServer.whereis(conn)

    request_id = Process.monitor(conn)

    # We cast to the connection process knowing that it will reply at some point,
    # either after roughly timeout or when a response is ready.
    cast = {:pipeline, commands, _from = {self(), request_id}, timeout}
    :ok = :gen_statem.cast(conn, cast)

    receive do
      {^request_id, resp} ->
        _ = Process.demonitor(request_id, [:flush])
        resp

      {:DOWN, ^request_id, _, _, reason} ->
        exit(reason)
    end
  end

状态机这块的代码就是:

  def connected(:cast, {:pipeline, commands, from, timeout}, data) do
    {ncommands, data} = get_client_reply(data, commands)

    if ncommands > 0 do
      {counter, data} = get_and_update_in(data.counter, &{&1, &1 + 1})

      row = {counter, from, ncommands, _timed_out? = false}
      :ets.insert(data.table, row)

      case data.transport.send(data.socket, Enum.map(commands, &Protocol.pack/1)) do
        :ok ->
          actions =
            case timeout do
              :infinity -> []
              _other -> [{{:timeout, {:client_timed_out, counter}}, timeout, from}]
            end

          {:keep_state, data, actions}

        {:error, _reason} ->
          # The socket owner will get a closed message at some point, so we just move to the
          # disconnected state.
          :ok = data.transport.close(data.socket)
          {:next_state, :disconnected, data}
      end
    else
      reply(from, {:ok, []})
      {:keep_state, data}
    end
  end

没什么特别的,get_client_reply就是处理客户端是否想得到服务器回复的命令的 CLIENT REPLY的各种指令,

  defp get_client_reply([command | rest], ncommands, client_reply) do
    case parse_client_reply(command) do
      :off -> get_client_reply(rest, ncommands, :off)
      :skip when client_reply == :off -> get_client_reply(rest, ncommands, :off)
      :skip -> get_client_reply(rest, ncommands, :skip)
      :on -> get_client_reply(rest, ncommands + 1, :on)
      nil when client_reply == :on -> get_client_reply(rest, ncommands + 1, client_reply)
      nil when client_reply == :off -> get_client_reply(rest, ncommands, client_reply)
      nil when client_reply == :skip -> get_client_reply(rest, ncommands, :on)
    end
  end

接着就是把命令序列号成 RESP,使用data.transport.send发送给服务器,其实 Redis 除了 TCP 外还可以使用 SSL/TLS 协议,所以就有了这一层抽象。

如果是 TCP,那么 socket 服务就会在 redis 服务器返回消息后,此函数接收自动处理:

  def handle_info({transport, socket, data}, %__MODULE__{socket: socket} = state)
      when transport in [:tcp, :ssl] do
    :ok = setopts(state, socket, active: :once)
    state = new_data(state, data)
    {:noreply, state}
  end

支持 Redis Cluster

Redis Cluster 的分布式算法

官网写的很好了,我简单说一下好了。

Redis Cluster does not use consistent hashing, but a different form of sharding where every key is conceptually part of what we call an hash slot.

Redis Cluster没有用一致性哈希算法,而是用了hash slot(哈希桶)

There are 16384 hash slots in Redis Cluster, and to compute what is the hash slot of a given key, we simply take the CRC16 of the key modulo 16384.

redis 会固定分配 16384 个 slots 到不同的节点,用的算法就是对 key 做 CRC16 然后对 16384 取模: HASH_SLOT = CRC16(key) mod 16384

例子如下:

Every node in a Redis Cluster is responsible for a subset of the hash slots, so for example you may have a cluster with 3 nodes, where:

- Node A contains hash slots from 0 to 5500.
- Node B contains hash slots from 5501 to 11000.
- Node C contains hash slots from 11001 to 16383.

This allows to add and remove nodes in the cluster easily. For example if I want to add a new node D, I need to move some hash slot from nodes A, B, C to D. Similarly if I want to remove node A from the cluster I can just move the hash slots served by A to B and C. When the node A will be empty I can remove it from the cluster completely.

用这样的算法,比一致性哈希方便,更有操作性:

Redis Cluster implements a concept called hash tags that can be used in order to force certain keys to be stored in the same hash slot.

Because moving hash slots from a node to another does not require to stop operations, adding and removing nodes, or changing the percentage of hash slots hold by nodes, does not require any downtime.

对于 redis 或者对用户来说,可以轻松地分配移动 slots ;

而一致性哈希就只能自己算虚拟节点,并且『祈求』之后请求量多了最终达到想要的平衡了。

#####redix-cluster

原版没有支持集群,zhongwencool/redix-cluster 写了一个简单的包装版本。

只需要看这段,就很清楚为了集群做了些啥:

  @spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term}
  def pipeline(pipeline, opts) do
    case RedixCluster.Monitor.get_slot_cache do
      {:cluster, slots_maps, slots, version} ->
         pipeline
           |> parse_keys_from_pipeline
           |> keys_to_slot_hashs
           |> is_same_slot_hashs
           |> get_pool_by_slot(slots_maps, slots, version)
           |> query_redis_pool(pipeline, :pipeline, opts)
      {:not_cluster, version, pool_name} ->
         query_redis_pool({version, pool_name}, pipeline, :pipeline, opts)
    end
  end

|> 就是类似 unix 的 管道 |,把函数返回值当做下个函数的第一个参数传给他。

get_slot_cache就是获取 redis 的cluster slots这个记录,并且缓存起来。

CLUSTER SLOTS returns details about which cluster slots map to which Redis instances.

简单来说,这个库就是残废的,哈哈哈。。。

不支持分布不同 slot,就是玩具。

后文

总的来说就是这样子,还算是有挺多有趣的地方的。

2194 次点击
所在节点    程序员
2 条回复
yyfearth
2019-10-27 14:09:07 +08:00
@ChristopherWu “RESP (REdis Serialization Protocol) ”这样写不算是 typo 啦
只是说明一下 RESP 是 “RE”dis “S”erialization “P”rotocol 的缩写
取自 Redis 里面的 RE 加上 Serialization 的 S 和 Protocol 的 P
因为 Redis 里面取了两个字母 所以大写了一下
ChristopherWu
2019-10-27 14:19:44 +08:00
@yyfearth 对的,后来我意识到了,哈哈

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/613375

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX