grpc 开发进阶 - 使用拦截器 interceptor
现在网上大部分都是 grpc 相关的介绍,真正涉及到 grpc 的配置使用的文章还是比较少的所以本系列着重介绍 grpc 开发时可以能会用到的一些配置拦截器在作用于每一个 RPC 调用,通常用来做日志,认证,metric 等等interfactor 分为两种unary interceptor 拦截 unary(一元) RPC 调用stream interceptor 处理 stre...
现在网上大部分都是 grpc 相关的介绍,真正涉及到 grpc 的配置使用的文章还是比较少的
所以本系列着重介绍 grpc 开发时可以能会用到的一些配置
拦截器在作用于每一个 RPC 调用,通常用来做日志,认证,metric 等等
interfactor 分为两种
- unary interceptor 拦截 unary(一元) RPC 调用
- stream interceptor 处理 stream RPC
client 和 server 端需要单独设置他们的 interceptor
客户端
UnaryClientInterceptor
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
可以通过参数获取 context
, method 名称
,发送的请求
, CallOption
用户可以根据这些信息,来改变调用或者做一些其他处理
interceptor 实现可以分为三步,预处理
,调用(invoker)RPC 方法
,调用后处理
预处理完成后,通过 invoker 来调用 RPC 方法
err := invoker(ctx, method, req, reply, cc, opts...)
调用RPC后,用户同样也可以处理返回的响应和错误
StreamClientInterceptor
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
stream interceptor 是拦截用户在 stream 上的操作,返回给用户自定义的 ClientStream
通过调用streamer
可以获得 ClientStream
, 包装ClientStream
并重载他的 RecvMsg
和 ·SendMsg
方法,即可做一些拦截处理了
最后将包装好的 ClientStream
返回给客户
type wrappedStream struct{
grpc.ClientStream
}
func (w *wrappedStream) RecvMsg(m interface{})error{
log.Printf("Receive a message (Type: %T)", m)
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedStream)SendMsg(m interface{})error{
log.Printf("Send a message (Type: %T)", m)
return w.ClientStream.SendMsg(m)
}
func streamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption)(grpc.ClientStream, error){
// do soming
// return ClientStream
s , err := streamer(ctx, desc, cc, method, opts...)
if err != nil{
return nil, err
}
return &wrappedStream{s}, nil
}
配置拦截器
创建 ClientConn
时,使用 WithUnaryInterceptor 和 WithStreamInterceptor 来设置 interceptor
conn, err := grpc.Dial(
grpc.WithUnaryInterceptor(unaryInterepotr),
grpc.WithStreamInterceptor(streamInterceptor)
)
链式拦截器
创建 ClientConn
时,使用WithChainUnaryInterceptor 和 WithChainStreamInterceptor 可以设置链式的 interceptor
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
第一个 interceptor
是最外层的,最后一个为最内层
使用WithUnaryInterceptor
, WithChainStreamInterceptor·
添加的interceptor,总是最先执行
服务端
UnaryServerInterceptor
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
处理和作用于 client 类似
func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler)(interface{}, error){
// do somthing
m, err := handler(ctx, req)
if err != nil{
log.Printf("RPC failed: %v", err)
}
return m, err
StreamServerInteceptor
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
与客户端处理类似, 封装 ServerStream
,并覆盖 RecvMsg
,SendMsg
方法
调用 handler 需要传递自定义的 ServerStream
type StreamHandler func(srv interface{}, stream ServerStream) error
配置拦截器
在调用 grpc.NewServer
时通过 UnaryInterceptor, StreamInterceptor 来配置 interceptor
s := grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor)
)
链式拦截器
调用 grpc.NewServer
时,使用ChainUnaryInterceptor 和 ChainStreamInterceptor 可以设置链式的 interceptor
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
第一个 interceptor
是最外层的,最后一个为最内层
使用UnaryInterceptor
, StreamInterceptor·
添加的interceptor,总是最先执行
example: Interceptor

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)