Go语言Grpc Stream的实现
作者:范闲 发布时间:2023-08-07 06:19:23
标签:Go,Grpc Stream
Stream Grpc
在我们单次投递的数据量很大的时候,比如传输一个二进制文件的时候,数据包过大,会造成瞬时传输压力。或者接收方接收到数据后,需要对数据做一系列的处理工作,
比如:数据过滤 -> 数据格式转换 -> 数据求和 ,这种场景非常适合使用stream grpc,
Stream Grpc演示
syntax = "proto3";
package book_stream;
option go_package = "/book_stream";
service HelloStreamService {
rpc BookListStream(BookListStreamRequest) returns (stream BookListStreamResponse){};
rpc CreateBookStream(stream CreateBookStreamRequest) returns (CreateBookStreamResponse){}
rpc FindBookByIdStream(stream FindBookByIdStreamRequest) returns (stream FindBookByIdStreamResponse){}
}
message BookListStreamRequest{
}
message BookListStreamResponse{
BookPoint book = 1;
}
message CreateBookStreamRequest{
BookPoint book = 1;
}
message CreateBookStreamResponse{
repeated BookIdPoint idx = 1;
}
message FindBookByIdStreamRequest{
BookIdPoint idx = 1;
}
message FindBookByIdStreamResponse{
BookPoint book = 1;
}
message BookIdPoint{
uint64 idx = 1;
}
message BookPoint{
uint64 idx = 1;
string name = 2;
float price = 3;
string author = 4;
}
运行protoc --go_out=plugins=grpc:. *.proto
生成脚手架文件
BookListStream
服务端流式RPCCreateBookStream
客户端流式RPCFindBookByIdStream
双向流式RPC
注意,这里只是用作方便演示使用,演示方法都不是线程安全的
服务端server
var port = 8888
func main() {
server := grpc.NewServer()
book_stream.RegisterHelloStreamServiceServer(server, new(HelloStreamServiceImpl))
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
if err := server.Serve(lis); err != nil {
panic(err)
}
}
客户端
func main() {
var port = 8888
conn, err := grpc.Dial(fmt.Sprintf(":%d", port), grpc.WithInsecure())
if err != nil {
panic(err)
}
defer conn.Close()
client := book_stream.NewHelloStreamServiceClient(conn)
ctx := context.Background()
if err := createBookStream(ctx, client); err != nil {
panic(err)
}
if err := printBookList(ctx, client); err != nil {
panic(err)
}
if err := getBookListById(ctx, client); err != nil {
panic(err)
}
}
BookListStream
服务器端流式 RPC,显然是单向流,并代指 Server 为 Stream 而 Client 为普通 RPC 请求
简单来讲就是客户端发起一次普通的 RPC 请求,服务端通过流式响应多次发送数据集,客户端 Recv 接收数据集。
server端实现
var bookStore = map[uint64]book_stream.BookPoint{
1: {
Idx: 1,
Author: "程子",
Price: 9.9,
Name: "游戏思维",
},
2: {
Idx: 2,
Author: "丁锐",
Price: 9.9,
Name: "活出必要的锋芒",
},
}
type HelloStreamServiceImpl struct{}
func (HelloStreamServiceImpl) BookListStream(_ *book_stream.BookListStreamRequest, streamServer book_stream.HelloStreamService_BookListStreamServer) error {
for idx, bookPoint := range bookStore {
err := streamServer.Send(&book_stream.BookListStreamResponse{Book: &book_stream.BookPoint{
Idx: idx,
Name: bookPoint.Name,
Price: bookPoint.GetPrice(),
Author: bookPoint.Author,
}})
if err != nil {
return err
}
}
return nil
}
客户端实现
func printBookList(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
req := &book_stream.BookListStreamRequest{}
listStream, err := client.BookListStream(ctx, req)
if err != nil {
return err
}
for true {
resp, err := listStream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
fmt.Printf("%v\n", *resp.Book)
}
return nil
}
CreateBookStream
客户端流式 RPC,单向流,客户端通过流式发起多次 RPC 请求给服务端,服务端发起一次响应给客户端
server端实现
func (HelloStreamServiceImpl) CreateBookStream(server book_stream.HelloStreamService_CreateBookStreamServer) error {
var resList []*book_stream.BookIdPoint
for {
resp, err := server.Recv()
if err == io.EOF {
return server.SendAndClose(&book_stream.CreateBookStreamResponse{Idx: resList})
}
if err != nil {
return err
}
bookStore[resp.Book.Idx] = *resp.Book
resList = append(resList, &book_stream.BookIdPoint{Idx: resp.Book.Idx})
}
}
客户端实现
var newBookStore = map[uint64]book_stream.BookPoint{
3: {
Idx: 3,
Author: "程子1",
Price: 9.9,
Name: "游戏思维1",
},
4: {
Idx: 4,
Author: "丁锐1",
Price: 9.9,
Name: "活出必要的锋芒1",
},
}
func createBookStream(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
stream, err := client.CreateBookStream(ctx)
if err != nil {
return err
}
for _, bookPoint := range newBookStore {
if err := stream.Send(&book_stream.CreateBookStreamRequest{
Book: &bookPoint,
}); err != nil {
return err
}
}
recv, err := stream.CloseAndRecv()
if err != nil {
return err
}
fmt.Println(recv.Idx)
return nil
}
stream.SendAndClose
,它是做什么用的呢?
在这段程序中,我们对每一个 Recv 都进行了处理,当发现 io.EOF
(流关闭) 后,需要将最终的响应结果发送给客户端,同时关闭正在另外一侧等待的 Recv
stream.CloseAndRecv
和 stream.SendAndClose
是配套使用的流方法,
FindBookByIdStream
服务端实现
func (HelloStreamServiceImpl) FindBookByIdStream(streamServer book_stream.HelloStreamService_FindBookByIdStreamServer) error {
for {
resp, err := streamServer.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if book, ok := bookStore[resp.Idx.Idx]; ok {
if err := streamServer.Send(&book_stream.FindBookByIdStreamResponse{Book: &book}); err != nil {
return err
}
}
}
}
客户端实现
func getBookListById(ctx context.Context, client book_stream.HelloStreamServiceClient) error {
stream, err := client.FindBookByIdStream(ctx)
if err != nil {
return err
}
var findList = []uint64{1, 2}
for _, idx := range findList {
err := stream.Send(&book_stream.FindBookByIdStreamRequest{Idx: &book_stream.BookIdPoint{Idx: idx}})
if err != nil {
return err
}
recv, err := stream.Recv()
if err != nil {
return err
}
fmt.Printf("%v\n", recv.Book)
}
if err := stream.CloseSend(); err != nil {
return err
}
return nil
}
来源:https://juejin.cn/post/7110794610653265933
0
投稿
猜你喜欢
- 1。注意用SQL分析器可以看select出来的东西select right(convert(varchar(30),getdate(),12
- 看看下面:function zr4(y)' 准备数据dim z(10)z(1)="ONE&q
- SQL Server 的扩展存储过程,其实就是一个普通的 Windows DLL,只不过按照某种规则实现了某些函数而已。近日在写一个扩展存储
- 使用本文提供的JavaScript脚本,配合Dreamweaver的层和行为的运用,可以在页面中显示可拖动的精美月历。具体制作步骤如下:1、
- jQuery是一个非常优秀的JavaScript 框架,使用简单灵活,同时还有许多成熟的插件可供选择,它可以帮助你在项目中加入一些非常好的效
- PHP xpath() 函数定义和用法xpath()函数运行对 XML 文档的 XPath 查询。如果成功,该函数返回 SimpleXMLE
- 树型结构在我们应用程序中还是很常见的,比如文件目录,BBS,权限设置,部门设置等。这些数据信息都采用层次型结构,而在我们现在的关系型数据库中
- 1. 什么是 CSV 文件CSV(逗号分隔值)文件是使用逗号分隔信息的文本文件。该文件的每一行都是一条数据记录,也就意味着它可以用于以表格的
- 前后端分离前后端分离的好处最大的好处就是前端JS可以做很大部分的数据处理工作,对服务器的压力减小到最小。后台错误不会直接反映到前台,错误接秒
- Oracle中有多种方法可以向数据库或服务器文件系统上载文件,这里主要介绍如下三种:Oracle HTTP Server(OHS)的mod_
- 概述我在教学和实际设计中的一些心得。就不长篇大论了!让大家省些时间,捞干的。尽量通俗易懂。想知道怎样设计标志,就要知道什么是标志?标志:外来
- 使用 IE8 时发现其原生的 JSON 解析器存在 Bug,让我们先用 IE8 打开 DEMO 页面体验下。http://lab.grace
- buffer:下载数据缓冲区,以字节为单位,缺省依赖操作系统 consistent:下载期间所涉及的数据保持read only,缺省为n d
- 使用MySQL,安全问题不能不注意。以下是MySQL提示的23个注意事项:1.如果客户端和服务器端的连接需要跨越并通过不可信任的网络,那么就
- 在这个abc.php文件中写入如下代码。<?php phpinfo(); ?>你将会看到一个网页,网页内容通常,如下图所示:用中
- 1、设置web.config文件。以下为引用的内容:<system.web> ...... <globalization
- Div的浮动+循环(描述的不清楚,请看图)在设计和布局的时候,碰到图片循环问题,碰到间距问题,怎么样让循环的图片每行的起始点跟上边的titl
- 当然有其它工具可以做这件事,但如果客户不允许你在服务器乱装东西时这个脚本就会有用了。 代码如下:DECLARE @tbImportTable
- 直接使用==比较的情况分类说明是否能比较说明基本类型整型( int/uint/int8/uint8/int16/uint16/int32/u
- 阅读系列教程上一篇:FrontPage2002简明教程二:文字与图像的处理通常网页的布局使用到的是FrontPage 2002中的表格和框架