grpc的stream流模式
grpc的stream流模式stream流模式简介proto文件的定义客户端流模式服务端流模式双向流模式stream流模式简介流模式就是与简单模式对应的,支持个服务之间并发的进行数据交互与收发。他一共有三种基本形式——客户端流模式、服务端流模式、双向流模式。客户端流模式:就是客户端并发的向服务端发送数据,而服务端不停的接收数据而不可以并发向客户端发送数据。服务端流模式:就和客户端流模式相反。双向流
·
grpc的stream流模式
stream流模式简介
- 流模式就是与简单模式对应的,支持个服务之间并发的进行数据交互与收发。他一共有三种基本形式——客户端流模式、服务端流模式、双向流模式。
- 客户端流模式:就是客户端并发的向服务端发送数据,而服务端不停的接收数据而不可以并发向客户端发送数据。
- 服务端流模式:就和客户端流模式相反。
- 双向流模式:就是客户端和服务端都在并发的向另一端发送和接收数据。
proto文件的定义
- 如果服务需要支持流模式,那么在定义proto文件时就需要使用stream关键定义,protoc才会帮我们生成对应的流模式使用的方法。
syntax = "proto3";
option go_package = ".;proto";
/*
grpc的流模式一共有三种,其实就是把客户端还有服务端的消息,变成流的形式传输
*/
service Greeter {
rpc GetStream(StreamReqData) returns (stream StreamResData); //服务端流模式
rpc PutStream(stream StreamReqData) returns (StreamResData); //客户端流模式
rpc AllStream(stream StreamReqData) returns (stream StreamResData); //双向流模式
}
message StreamReqData {
string data = 1;
}
message StreamResData {
string data = 1;
}
客户端流模式
- 我们查看protoc帮我们生成的文件中定义好的PutStream函数来获取流模式传输方式,
//这个就是我们客户端需要使用的传输数据的方法,这里会返回一个流模式用的结构Greeter_PutStreamClient,之后就是使用他的Send()函数来并发的传输流数据
func (c *greeterClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_PutStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Greeter_serviceDesc.Streams[1], "/Greeter/PutStream", opts...)
if err != nil {
return nil, err
}
x := &greeterPutStreamClient{stream}
return x, nil
}
type Greeter_PutStreamClient interface {
Send(*StreamReqData) error
CloseAndRecv() (*StreamResData, error)
grpc.ClientStream
}
type greeterPutStreamClient struct {
grpc.ClientStream
}
func (x *greeterPutStreamClient) Send(m *StreamReqData) error {//里面有一个协程来并发的发送数据
return x.ClientStream.SendMsg(m)
}
- 客户端:通过GreeterClient连接去调用PutStream函数。
//onlyClientStream 客户端的流模式
func onlyClientStream(cli proto.GreeterClient) {
putS, err := cli.PutStream(context.Background()) //拿到流模式生成的对象,然后不停的向服务端发送数据
if err != nil {
panic(err)
}
i := 0
for {
i++
putS.Send(&proto.StreamReqData{Data: fmt.Sprintf("LLL%v", i)})//同过循环来不停的发送数据
if i > 10 {
break
}
time.Sleep(time.Second)
}
}
func main() {
conn, err := grpc.Dial("127.0.0.1:50052", grpc.WithTransportCredentials(insecure.NewCredentials())) //生成连接
if err != nil {
panic("failed to connect server:" + err.Error())
}
defer conn.Close()
cli := proto.NewGreeterClient(conn) //组装成一个客户端连接
//服务端的流模式(上面的代码是通用的)
//onlyServerStream(cli)
//客户端的流模式
//onlyClientStream(cli)
//双向流模式
allServerClientStream(cli)
}
- 服务端:通过我们自己定义的接口(实现了protoc生成的文件中的GreeterServer接口)连接去调用PutStream函数。
//PutStream 客户端的流模式
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
for {
if rec, err := cliStr.Recv(); err != nil {//不停的接收数据
fmt.Println(err)
break
} else {
fmt.Println(rec.Data)
}
}
return nil
}
func main() {
gs := grpc.NewServer() //创建一个服务(返回的就是一个地址)
proto.RegisterGreeterServer(gs, &server{}) //把服务注册上去
lis, err := net.Listen("tcp", PORT)
if err != nil {
panic(err)
}
//把服务和监听绑定在一起
err = gs.Serve(lis)
if err != nil {
panic(err)
}
}
/*
服务端使用的时protoc生成的服务端的PutStream方法,如下
*/
// GreeterServer is the server API for Greeter service.
// All implementations must embed UnimplementedGreeterServer
// for forward compatibility
type GreeterServer interface {
GetStream(*StreamReqData, Greeter_GetStreamServer) error
PutStream(Greeter_PutStreamServer) error
AllStream(Greeter_AllStreamServer) error
MustEmbedUnimplementedGreeterServer()
}
// UnimplementedGreeterServer must be embedded to have forward compatible implementations.
type UnimplementedGreeterServer struct {
}
//这些就是服务端使用方法
func (*UnimplementedGreeterServer) GetStream(*StreamReqData, Greeter_GetStreamServer) error {//使用流式数据会有一个*StreamReqData参数数据
return status.Errorf(codes.Unimplemented, "method GetStream not implemented")
}
func (*UnimplementedGreeterServer) PutStream(Greeter_PutStreamServer) error {
return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
}
func (*UnimplementedGreeterServer) AllStream(Greeter_AllStreamServer) error {
return status.Errorf(codes.Unimplemented, "method AllStream not implemented")
}
服务端流模式
- 其实就是和客户端相反的用法,服务端使用的发送数据的方法GetStream()中拥有流数据参数*StreamReqData,而客户端则没有。客户端只能接收数据
/*
客户端
*/
//onlyServerStream 服务端的流模式
func onlyServerStream(cli proto.GreeterClient) {
res, err := cli.GetStream(context.Background(), &proto.StreamReqData{Data: "LLL"}) //客户端没有使用流模式,传入的参数就和一元模式一致(返回值是流模式自己定义的类型)
if err != nil {
panic(err)
}
//这里我们通过一个循环来不停的接收来自服务端的结果
for {
rec, err := res.Recv() //通过流模式的类型来不停的接收数据
if err != nil {
fmt.Errorf("failed of res: %v", err)
break
}
fmt.Println(rec)
}
}
/*
客户端
*/
//GetStream 服务端流模式,所需要的参数和返回值与一元模式不一样了(StreamReqData流模式数据类型)
func (s *server) GetStream(rep *proto.StreamReqData, res proto.Greeter_GetStreamServer) error {
i := 0
for { //循环向客户端发送数据,次数达到10就停止发送
i++
err := res.Send(&proto.StreamResData{
Data: fmt.Sprintf("%v", time.Now().Unix()),//输出当前时间
})
if err != nil {
panic("service send res failed :" + err.Error())
}
time.Sleep(time.Second) //每发送一次数据就等待一秒钟
if i > 10 {
break
}
}
return nil
}
双向流模式
- 双向流就是两端都就可以并行的发送数据和接收数据。protoc为我们生成的双向流方法。
/*
这是客户端使用的流数据方法
*/
func (c *greeterClient) AllStream(ctx context.Context, opts ...grpc.CallOption) (Greeter_AllStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &_Greeter_serviceDesc.Streams[2], "/Greeter/AllStream", opts...)
if err != nil {
return nil, err
}
x := &greeterAllStreamClient{stream}
return x, nil
}
type Greeter_AllStreamClient interface {
Send(*StreamReqData) error
Recv() (*StreamResData, error)
grpc.ClientStream
}
type greeterAllStreamClient struct {
grpc.ClientStream
}
func (x *greeterAllStreamClient) Send(m *StreamReqData) error {
return x.ClientStream.SendMsg(m)
}
func (x *greeterAllStreamClient) Recv() (*StreamResData, error) {
m := new(StreamResData)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
/*
这是服务端使用的流数据方法
*/
type Greeter_AllStreamServer interface {
Send(*StreamResData) error
Recv() (*StreamReqData, error)
grpc.ServerStream
}
type greeterAllStreamServer struct {
grpc.ServerStream
}
func (x *greeterAllStreamServer) Send(m *StreamResData) error {
return x.ServerStream.SendMsg(m)
}
func (x *greeterAllStreamServer) Recv() (*StreamReqData, error) {
m := new(StreamReqData)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
- 客户端:我们通过设置两个协程来并发的发送和接收数据,如果不使用协程的话,那么就会变顺序执行了。
func allServerClientStream(cli proto.GreeterClient) {
allStr, err := cli.AllStream(context.Background())
if err != nil {
fmt.Errorf("failed to get allClient:%v", err)
}
//先开两个协程
wg := sync.WaitGroup{}
wg.Add(2)
go func() { //这个协程不停接收来自服务端的数据
defer wg.Done()
for {
rec, err := allStr.Recv()
if err != nil {
fmt.Errorf("failed to get server data:%v", err)
}
fmt.Println("Data from:", rec.Data)
}
}()
go func() { //这个协程不停的向服务端发送数据
defer wg.Done()
for {
allStr.Send(&proto.StreamReqData{
Data: "I am Client",
})
time.Sleep(time.Second)
}
}()
wg.Wait()
}
- 服务端:其实在前面两个类型做完之后(客户端流和服务端流)我们就可以猜到双向流是如何实现的。同样是两个协程。
//AllStream 双向流模式(支持客户端和服务器并行收发数据)
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
//这里我们先起两个协程
wg := sync.WaitGroup{}
wg.Add(2)
go func() { //这个协程不停的接收来自客户端的数据
defer wg.Done() //在WaitGroup中减掉该协程
for {
rec, err := allStr.Recv()
if err != nil {
fmt.Errorf("failed to req recv:%v", err)
}
fmt.Println("Data from:" + rec.Data)
}
}()
go func() { //这个协程不停的向服务端发送数据
defer wg.Done()
for {
allStr.Send(&proto.StreamResData{ //发送我们定义好类型的数据
Data: "I am Server",
})
time.Sleep(time.Second) //间隔一秒发送一次
}
}()
wg.Wait() //等待WaitGroup中的协程数为0
return nil
}

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)