用 Golang 实现基于 Redis 的安全高效 RPC 通信

2020-03-17 09:28:11 +08:00
 tikazyq

前言

RPC ( Remote Procedure Call ),翻译过来为“远程过程调用”,是一种分布式系统中服务或节点之间的有效通信机制。通过 RPC,某个节点(或客户端)可以很轻松的调用远端(或服务端)的方法或服务,就像在本地调用一样简单。现有的很多 RPC 框架都要求暴露服务端地址,也就是需要知道服务器的 IP 和 RPC 端口。而本篇文章将介绍一种不需要暴露 IP 地址和端口的 RPC 通信方式。这种方式是基于 Redis BRPOP/BLPOP 操作实现的延迟队列,以及 Golang 中的 goroutine 协程异步机制,整个框架非常简单和易于理解,同时也很高效、稳定和安全。这种方式已经应用到了 Crawlab 中的节点通信当中,成为了各节点即时传输信息的主要方式。下面我们将从 Crawlab 早期节点通信方案 PubSub 开始,介绍当时遇到的问题和解决方案,然后如何过渡到现在的 RPC 解决方案,以及它是如何在 Crawlab 中发挥作用的。

基于 PubSub 的方案

PubSub

早期的 Crawlab 是基于 Redis 的 PubSub,也就是发布订阅模式。这是 Redis 中主要用于一对多的单向通信的方案。其用法非常简单:

  1. 订阅者利用SUBSCRIBE channel1 channel2 ...来订阅一个或多个频道;
  2. 发布者利用PUBLISH channelx message来发布消息给该频道的订阅者。

Redis 的PubSub可以用作广播模式,即一个发布者对应多个订阅者。而在 Crawlab 中,我们只有一个订阅者对应一个发布者的情况(主节点->工作节点:nodes:<node_id>)或一个订阅者对应多个发布者的情况(工作节点->主节点:nodes:master>)。这是为了方便双向通信。

以下为节点通信原理示意图。

各个节点会通过 Redis 的PubSub功能来做相互通信。

所谓PubSub,简单来说是一个发布订阅模式。订阅者( Subscriber )会在 Redis 上订阅( Subscribe )一个通道,其他任何一个节点都可以作为发布者( Publisher )在该通道上发布( Publish )消息。

通信架构

在 Crawlab 中,主节点会订阅 nodes:master 通道,其他节点如果需要向主节点发送消息,只需要向 nodes:master 发布消息就可以了。同理,各工作节点会各自订阅一个属于自己的通道 nodes:<node_id>node_id 是 MongoDB 里的节点 ID,是 MongoDB ObjectId ),如果需要给工作节点发送消息,只需要发布消息到该通道就可以了。

一个网络请求的简单过程如下:

  1. 客户端(前端应用)发送请求给主节点( API );
  2. 主节点通过 Redis PubSub<nodes:<node_id> 通道发布消息给相应的工作节点;
  3. 工作节点收到消息之后,执行一些操作,并将相应的消息通过 <nodes:master> 通道发布给主节点;
  4. 主节点收到消息之后,将消息返回给客户端。

不是所有节点通信都是双向的,也就是说,主节点只会单方面对工作节点通信,工作节点并不会返回响应给主节点,所谓的单向通信。以下是 Crawlab 的通信类型。

changoroutine

如果您在阅读 Crawlab 源码,会发现节点通信中有大量的 chan 语法,这是 Golang 的一个并发特性。

chan 表示为一个通道,在 Golang 中分为无缓冲和有缓冲的通道,我们用了无缓冲通道来阻塞协程,只有当 chan 接收到信号(chan <- "some signal"),该阻塞才会释放,协程进行下一步操作)。在请求响应模式中,如果为双向通信,主节点收到请求后会起生成一个无缓冲通道来阻塞该请求,当收到来自工作节点的消息后,向该无缓冲通道赋值,阻塞释放,返回响应给客户端。

go 命令会起一个 goroutine(协程)来完成并发,配合 chan,该协程可以利用无缓冲通道挂起,等待信号执行接下来的操作。

基于延迟队列的方案

PubSub 方案的问题

PubSub 这种消息订阅-发布设计模式是一种有效的实现节点通信的方式,但是它有两个问题:

  1. PubSub 的数据是即时的,会随着 Redis 宕机而丢失;
  2. 写基于 PubSub 的通信服务会要求用到 goroutinechannel,这加大了开发难度,降低了可维护性。

其中,第二个问题是比较棘手的。如果我们希望加入更多的功能,需要写大量的异步代码,这会加大系统模块间的耦合度,造成扩展性很差,而且代码阅读起来很痛苦。

因此,为了解决这个问题,我们采用了基于 Redis 延迟消息队列的 RPC 服务。

延迟队列架构

下图是基于延迟队列架构的 RPC 实现示意图。

每一个节点都有一个客户端( Client )和服务端( Server )。客户端用于发送消息到目标节点( Target Node )并接收其返回的消息,服务端用于接收、处理源节点( Source Node )的消息并返回消息给源节点的客户端。

整个 RPC 通信的流程如下:

  1. 源节点的客户端通过 LPUSH 将消息推送到 Redis 的 nodes:<node_id> 中,并执行 BRPOP nodes:<node_id>:<msg_id> 阻塞并监听这个消息队列;
  2. 目标节点的服务端通过 BRPOP 一直在监听 nodes:<node_id>,收到消息后,通过消息中的 Method 字段执行对应的程序;
  3. 目标节点执行完毕后,服务端通过 LPUSH 将消息推送到 Redis 的 nodes:<node_id>:<msg_id> 中;
  4. 由于源节点客户端一直在监听 nodes:<node_id>:<msg_id> 这个消息队列,当目标节点服务端推送消息到这个队列后,源节点客户端将立即收到返回的消息,再做后续处理。

这样,整个节点的通信流程就通过 Redis 完成了。这样做的好处在于不用暴露 HTTP 的 IP 地址和端口,只需要知道节点 ID 即可完成 RPC 通信。

这样设计后的 RPC 代码比较容易理解和维护。每次需要扩展新的通信类别时,只需要继承 rpc.Service 类,实现 ClientHandle(客户端处理方法)和 ServerHandle(服务端处理方法)方法就可以了。

这里多说一下 BRPOP。它将移出并获取消息队列的最后一个元素, 如果消息队列没有元素会阻塞队列直到等待超时或发现可弹出元素为止。因此,使用 BRPOP 命令相对于轮训或其他方式,可以避免不间断的请求 Redis,避免浪费网络和计算资源。

如果对 Redis 的操作命令不熟悉的,可以参考一下掘金小册《 Redis 深度历险:核心原理与应用实践》,这本小册深入介绍了 Redis 的原理以及工程实践,对于应用 Redis 到实际开发中非常实用。

代码实践

讲了这么多理论知识,我们还是需要看看代码的。老师常教育我们:“Talk is cheap. Show me the code.”

由于 Crawlab 后端是 Golang 开发的,要理解以下代码需要一些 Golang 的基础知识。

消息数据结构

首先我们需要定一个传输消息的数据结构。代码如下。

package entity

type RpcMessage struct {
	Id      string            `json:"id"`      // 消息 ID
	Method  string            `json:"method"`  // 消息方法
	NodeId  string            `json:"node_id"` // 节点 ID
	Params  map[string]string `json:"params"`  // 参数
	Timeout int               `json:"timeout"` // 超时
	Result  string            `json:"result"`  // 结果
	Error   string            `json:"error"`   // 错误
}

这里,我们定义了消息 ID、方法、节点 ID、参数等字段。消息 ID 是 UUID,保证了消息 ID 的唯一性。

基础接口

首先,我们定义一个抽象基础接口,方便让实际业务逻辑模块继承。服务端的处理逻辑在 ServerHandle 中,返回 entity 里的 RpcMessage,而客户端的逻辑在 ClientHandle 中。

// RPC 服务基础类
type Service interface {
	ServerHandle() (entity.RpcMessage, error)
	ClientHandle() (interface{}, error)
}

客户端通用方法

当我们调用客户端的通用方法的时候,需要实现两个逻辑:

  1. 发送消息:生成消息 ID,将消息序列化为 JSON,LPUSH 推入 Redis 消息队列;
  2. 通过 BRPOP 延迟获取返回的消息,返回给调用方。

以下是实现的代码。

// 客户端处理消息函数
func ClientFunc(msg entity.RpcMessage) func() (entity.RpcMessage, error) {
	return func() (replyMsg entity.RpcMessage, err error) {
		// 请求 ID
		msg.Id = uuid.NewV4().String()

		// 发送 RPC 消息
		msgStr := utils.ObjectToString(msg)
		if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s", msg.NodeId), msgStr); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 获取 RPC 回复消息
		dataStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s:%s", msg.NodeId, msg.Id), msg.Timeout)
		if err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 反序列化消息
		if err := json.Unmarshal([]byte(dataStr), &replyMsg); err != nil {
			log.Errorf("RpcClientFunc error: " + err.Error())
			debug.PrintStack()
			return replyMsg, err
		}

		// 如果返回消息有错误,返回错误
		if replyMsg.Error != "" {
			return replyMsg, errors.New(replyMsg.Error)
		}

		return
	}
}

服务端处理

服务端处理的逻辑如下,大致的逻辑是:

  1. 在一个循环中,通过 BRPOP 获取该节点对应的消息;
  2. 当获取到消息时,生成一个 goroutine 异步处理该消息;
  3. 继续等待。

您可以在 InitRpcService 这个方法中看到上述逻辑。私有方法 handleMsg 实现了序列化、调用服务端 RPC 服务方法、发送返回消息的逻辑。如果需要拓展 RPC 方法类型,在工厂类方法 GetService 里添加就可以了。

// 获取 RPC 服务
func GetService(msg entity.RpcMessage) Service {
	switch msg.Method {
	case constants.RpcInstallLang:
		return &InstallLangService{msg: msg}
	case constants.RpcInstallDep:
		return &InstallDepService{msg: msg}
	case constants.RpcUninstallDep:
		return &UninstallDepService{msg: msg}
	case constants.RpcGetLang:
		return &GetLangService{msg: msg}
	case constants.RpcGetInstalledDepList:
		return &GetInstalledDepsService{msg: msg}
	}
	return nil
}

// 处理 RPC 消息
func handleMsg(msgStr string, node model.Node) {
	// 反序列化消息
	var msg entity.RpcMessage
	if err := json.Unmarshal([]byte(msgStr), &msg); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// 获取 service
	service := GetService(msg)

	// 根据 Method 调用本地方法
	replyMsg, err := service.ServerHandle()
	if err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}

	// 发送返回消息
	if err := database.RedisClient.LPush(fmt.Sprintf("rpc:%s:%s", node.Id.Hex(), replyMsg.Id), utils.ObjectToString(replyMsg)); err != nil {
		log.Errorf(err.Error())
		debug.PrintStack()
	}
}

// 初始化服务端 RPC 服务
func InitRpcService() error {
	go func() {
		for {
			// 获取当前节点
			node, err := model.GetCurrentNode()
			if err != nil {
				log.Errorf(err.Error())
				debug.PrintStack()
				continue
			}

			// 获取获取消息队列信息
			msgStr, err := database.RedisClient.BRPop(fmt.Sprintf("rpc:%s", node.Id.Hex()), 0)
			if err != nil {
				if err != redis.ErrNil {
					log.Errorf(err.Error())
					debug.PrintStack()
				}
				continue
			}

			// 处理消息
			go handleMsg(msgStr, node)
		}
	}()
	return nil
}

调用方法例子

Crawlab 的节点上经常需要为爬虫安装一些第三方依赖,例如 pymongo、requests 等。而其中,我们也需要直到某个节点上是否已经安装了某个依赖,这需要跨服务器通信,也就是需要在分布式网络中进行双向通信。而这个逻辑是通过 RPC 实现的。主节点向目标节点发起 RPC 调用,目标节点运行被调用方法,将运行结果也就是安装的依赖列表返回给客户端,客户端再返回给调用者。

下面的代码实现了获取目标节点上已安装的依赖的 RPC 方法。

// 获取已安装依赖服务
// 继承 Service 基础类
type GetInstalledDepsService struct {
	msg entity.RpcMessage
}

// 服务端处理方法
// 重载 ServerHandle
func (s *GetInstalledDepsService) ServerHandle() (entity.RpcMessage, error) {
	lang := utils.GetRpcParam("lang", s.msg.Params)
	deps, err := GetInstalledDepsLocal(lang)
	if err != nil {
		s.msg.Error = err.Error()
		return s.msg, err
	}
	resultStr, _ := json.Marshal(deps)
	s.msg.Result = string(resultStr)
	return s.msg, nil
}

// 客户端处理方法
// 重载 ClientHandle
func (s *GetInstalledDepsService) ClientHandle() (o interface{}, err error) {
	// 发起 RPC 请求,获取服务端数据
	s.msg, err = ClientFunc(s.msg)()
	if err != nil {
		return o, err
	}

	// 反序列化
	var output []entity.Dependency
	if err := json.Unmarshal([]byte(s.msg.Result), &output); err != nil {
		return o, err
	}
	o = output

	return
}

发起调用

写好了 RPC 服务端和客户端处理方法,就可以轻松编写调用逻辑了。以下是调用获取远端已安装依赖列表的方法。首先由 GetService 工厂类获取之前定义好的 GetInstalledDepsService,再调用其客户端处理方法 ClientHandle,然后返回结果。这就像在本地调用方法一样。是不是很简单?

// 获取远端已安装依赖
func GetInstalledDepsRemote(nodeId string, lang string) (deps []entity.Dependency, err error) {
	params := make(map[string]string)
	params["lang"] = lang
	s := GetService(entity.RpcMessage{
		NodeId:  nodeId,
		Method:  constants.RpcGetInstalledDepList,
		Params:  params,
		Timeout: 60,
	})
	o, err := s.ClientHandle()
	if err != nil {
		return
	}
	deps = o.([]entity.Dependency)
	return
}

结语

本篇文章主要介绍了一种基于 Redis 延迟队列的 RPC 通信方式,这种方式不用暴露各个节点或服务的 IP 地址或端口,是一种非常安全的方式。而且,这种方式已经用 Golang 在 Crawlab 中实现了双向通信,特别是 Golang 中的天生支持异步的 goroutine,让这种方式的实现变得简单。实际上,这种方式理论上是非常高效的,能够支持高并发数据传输。

但是,在 Crawlab 的实现中还存在一些隐患,也就是它并没有限制服务端的处理并发数量。因此如果传输消息过多时,服务端资源会被占满,导致处理速度变慢甚至宕机的风险。修复方式是在服务端限制并发数量。另外,限于时间的原因,作者还没有来得及测试这种 RPC 通信方式的实际传输效率,容错机制也没有加入。因此总的来说还有很大的提升和优化空间。

虽然如此,这种方式对于 Crawlab 的低并发远程通信来说是足够的了,在实际使用中也没有出现问题,非常稳定。对于隐秘性有要求、希望不暴露地址信息的开发者,我们也推荐将该种方式在实际应用中尝试。

参考

4993 次点击
所在节点    程序员
34 条回复
tikazyq
2020-03-17 13:22:54 +08:00
@kaneg 确实是的,我们的项目也是因为只能靠 Redis 连着,就索性干脆用 Redis 来做 RPC
tikazyq
2020-03-17 13:25:33 +08:00
@iyaozhen 这种方案性能上肯定是没有常规 RPC 高的,Redis 单点故障可能可以通过集群的方式来解决,json 的方式确实问题很多,但这是最简单的方式了。

总的来说,优化空间很多,这种方式比起传统的 RPC 来说肯定是有局限性的,但在实际特殊的应用场景中却可以发挥作用
xwhxbg
2020-03-17 16:18:23 +08:00
支持 crawlab 先,我在用 node 爬虫,要是能做成支持 docker 爬虫就好了,用户直接丢个镜像过去,也不用装依赖了
tikazyq
2020-03-17 16:36:27 +08:00
@xwhxbg 在路线规划中 ;)
zunceng
2020-03-17 17:52:27 +08:00
myzWILLmake
2020-03-17 18:09:05 +08:00
@tikazyq 是的用的 Redis。队列性能的话只有单机测试 LPOP/LPUSH,基本可以达到每秒 10w 请求,够我们业务模型用了,实际业务压力测试瓶颈不在消息队列上。
tikazyq
2020-03-17 19:05:20 +08:00
@zunceng 太给力了,简直就是这个思路的 rabbitmq 翻版
tikazyq
2020-03-17 19:05:56 +08:00
@myzWILLmake 那说明性能还是不错的,值得深入研究一下
huahuacui
2020-03-18 09:46:21 +08:00
我觉得可以,有些服务可以借鉴一下
piglovesx
2020-03-18 14:53:33 +08:00
学习了
123444a
2020-03-22 01:57:47 +08:00
楼主,我是服了,你不考虑 1%节点挂了的情况么,明明可以直连,却多此一举 Redis,你要是握手阶段就算了,居然一直不让双方直连,打算任务超时设多久?
tikazyq
2020-03-23 09:06:07 +08:00
@123444a 有些时候不希望或者不能暴露地址信息,这种方式就有效了,至于高可用方面,还可以通过集群方式来处理
123444a
2020-03-24 19:29:41 +08:00
@tikazyq 不止高可用问题,还有如何重试,如何调度,要不要幂等,有些 worker 如果阻塞,系统就歇菜了,其实比喻很简单就是我和你通过 email 来交互执行银行转账,缺点就是慢和不好处理异常情况
tikazyq
2020-03-25 14:58:26 +08:00
@123444a 是有各种各样的问题存在,不过我们的项目已经这样用了,目前还没遇到什么问题,https://github.com/crawlab-team/crawlab

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/653446

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX