gRPC-Go 源码阅读 - gRPC 拦截器

2021-05-16 09:21:38 +08:00
 wewin

在 gRPC 中有拦截器,拦截器可以对 RPC 的请求和响应做拦截处理,并且在客户端、服务端都可以进行拦截处理, 使用拦截器可以在 handler 执行前后做一些处理,更灵活的处理 gRPC 流程中的业务逻辑

在 gRPC-Go 的源码的 NewServer 方法中,我们可以看到如下代码


func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
		o.apply(&opts)
	}
	s := &Server{
		lis:      make(map[net.Listener]bool),
		opts:     opts,
		conns:    make(map[transport.ServerTransport]bool),
		services: make(map[string]*serviceInfo),
		quit:     grpcsync.NewEvent(),
		done:     grpcsync.NewEvent(),
		czData:   new(channelzData),
	}
	chainUnaryServerInterceptors(s)
	chainStreamServerInterceptors(s)
	s.cv = sync.NewCond(&s.mu)
	if EnableTracing {
		_, file, line, _ := runtime.Caller(1)
		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
	}

	if s.opts.numServerWorkers > 0 {
		s.initServerWorkers()
	}

	if channelz.IsOn() {
		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
	}
	return s
}

其中 chainUnaryServerInterceptors(s)chainStreamServerInterceptors(s) 就是将用户实现的拦截器注册到服务端拦截器链中,服务端拦截器有 unary interceptor 和 stream interceptor 两种, 在我目前看的 gRPC 版本(1.37.1)中, 服务端两种拦截器都是可以支持多个拦截器的,在更早的 gRPC 版本中服务端、可客户端只支持单个拦截器

gRPC 的拦截器实现特别简单

unary interceptor 拦截器只要实现 UnaryServerInterceptor 方法

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

stream interceptor 拦截器只要实现 StreamServerInterceptor 方法

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

下面我看可以通过一个简单的例子看看如何添加一个拦截器

服务端拦截器

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	interceptor "grpcinterceptor/proto"
	"grpcinterceptor/services"
	"log"
	"net"
)

func main() {
	rpcService := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptorFirst),
		grpc.StreamInterceptor(streamServerInterceptorFirst))

	interceptor.RegisterSayHiServiceServer(rpcService, new(services.SayHiService))

	lis, err := net.Listen("tcp", "localhost:11088")
	if err != nil {
		log.Fatalf("net listen err: %v", err)
	}

	err = rpcService.Serve(lis)
	if err != nil {
		log.Fatalf("serve err: %v", err)
	}
}

// type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
func unaryServerInterceptorFirst(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	fmt.Println("unaryServerInterceptor -- 01 before handler")
	resp, err := handler(ctx, req)

	fmt.Println("unaryServerInterceptor -- 01 after handler")

	return resp, err
}

// type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
func streamServerInterceptorFirst(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	fmt.Println("streamServerInterceptor -- 01 before handler")

	err := handler(srv, ss)

	fmt.Println("streamServerInterceptor -- 01 after handler")

	return err
}

上面我实现了一个 unary interceptor unaryServerInterceptorFirst,和一个 stream interceptor streamServerInterceptorFirst

然后使用 grpc.UnaryInterceptorgrpc.StreamInterceptor 做一次转换后,以参数的形式传入 NewServer 即可

rpcService := grpc.NewServer(grpc.UnaryInterceptor(unaryServerInterceptorFirst), grpc.StreamInterceptor(streamServerInterceptorFirst))

此时使用当客户端发起请求后,可以看到如下输出

go run main.go
unaryServerInterceptor -- 01 before handler
unaryServerInterceptor -- 01 after handler

这里的例子比较简单,在实战中,如果你有用户权限认证这种需求,使用拦截器实现是一个不错的主意

上面例子中我们定义的是单个拦截器, 下面是一个定义多个拦截器的例子

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	interceptor "grpcinterceptor/proto"
	"grpcinterceptor/services"
	"log"
	"net"
)

func main() {
	var serverInterceptors = ServerInterceptors{
		UnaryInterceptors:  []grpc.UnaryServerInterceptor{unaryServerInterceptorFirst, unaryServerInterceptorSecond},
		StreamInterceptors: []grpc.StreamServerInterceptor{streamServerInterceptorFirst},
	}

	rpcService := grpc.NewServer(grpc.ChainUnaryInterceptor(serverInterceptors.UnaryInterceptors...),
		grpc.ChainStreamInterceptor(serverInterceptors.StreamInterceptors...))

	interceptor.RegisterSayHiServiceServer(rpcService, new(services.SayHiService))

	lis, err := net.Listen("tcp", "localhost:11088")
	if err != nil {
		log.Fatalf("net listen err: %v", err)
	}

 	err = rpcService.Serve(lis)
 	if err != nil {
 		log.Fatalf("serve err: %v", err)
	}
}

type ServerInterceptors struct {
	// 服务端 Unary interceptor
	UnaryInterceptors []grpc.UnaryServerInterceptor
	// 服务端 Stream interceptor
	StreamInterceptors []grpc.StreamServerInterceptor
}

// type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
func unaryServerInterceptorFirst(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	fmt.Println("unaryServerInterceptor -- 01 before handler")
	resp, err := handler(ctx, req)

	fmt.Println("unaryServerInterceptor -- 01 after handler")

	return resp, err
}

func unaryServerInterceptorSecond(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	fmt.Println("unaryServerInterceptor -- 02 before handler")

	resp, err := handler(ctx, req)

	fmt.Println("unaryServerInterceptor -- 02 after handler")

	return resp, err
}

// type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
func streamServerInterceptorFirst(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	fmt.Println("streamServerInterceptor -- 01 before handler")

	err := handler(srv, ss)

	fmt.Println("streamServerInterceptor -- 01 after handler")

	return err
}

和单拦截器不同点是在传给 NewServer 之前,调用的是 grpc.ChainUnaryInterceptor 和 grpc.ChainStreamInterceptor 做了一次转化

此时 handler 执行时候,输出如下

 go run main.go
unaryServerInterceptor -- 01 before handler
unaryServerInterceptor -- 02 before handler
unaryServerInterceptor -- 02 after handler
unaryServerInterceptor -- 01 after handler

此时的执行顺序是值得注意的,先执行 before,后执行 after, before 按照拦截器的顺序执行,after 则是逆序执行的。

在我们上面的例子中,定义有 unary 、stream 两种拦截器, 从运行结果可以看到只有 unary interceptor 执行了,stream interceptor 没有执行,我相信大家已经知道原因了,因为我们定义的 handler 只有 unary 的

客户端的拦截器的定义和使用和服务端类似,大家去尝试一下即可;另外 Github 上也有很多开源拦截器,如 做身份认证的拦截器 可以拿来就用,如果你有好的想法,也可以实现拦截器开源后供大家使用。

例子完整代码 https://gitee.com/wewin11235/grpcinterceptor 运行代码:

cd grpcinterceptor
go run server/main.go
go run client/main.go
2501 次点击
所在节点    Go 编程语言
4 条回复
wewin
2021-05-16 09:23:53 +08:00
最近在阅读 gRPC-Go 的项目源码,渣渣读源码还是很费力,V 友们有什么好的阅读 Go 框架源码的建议吗?
GoLand
2021-05-16 12:17:05 +08:00
拦截器这有什么好看的,不就是 handler 外面套一层吗?

不要为了读代码而读代码,要带有目的的去读,保质不保量。我认为值得阅读的就三点:
1. 代码写的很精妙,学习精妙的代码写法。
2. 框架的架构设计,学习框架的整体设计思路和具体实现。
3. 单个功能点实现,比如 gRPC 的序列化实现。
wewin
2021-05-16 20:18:09 +08:00
@GoLand 感谢大佬的建议 🙏
eudore
2021-05-18 08:35:59 +08:00
源码看了越多约强, 阅读代码的规模也会 1k,5k,10k 不断提升。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/777190

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX