go实现grpc四种数据流模式
作者:Jeff的技术栈 发布时间:2024-02-01 15:51:56
标签:go,grpc,流模式,数据流
1. 什么是数据流
grpc中的stream,srteam顾名思义就是一种流,可以源源不断的推送数据,很适合传输一些大数据,或者服务端和客户端长时间数据交互,比如客户端可以向服务端订阅一个数据,服务端就可以利用stream,源源不断地推送数据。
底层还原成socket编程
2. grpc的四种数据流
1.简单模式
2.服务端数据流模式(Server-side streaming RPC)
3.客户端数据流模式(Client-side streaming RPC)
4.双向数据流模式(Bidirectional streaming RPC)
2.1 简单模式
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,上两篇中介绍此模式。
2.2 服务端数据流模式
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端
2.3 客户端数据流模式
与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。
2.4 双向数据流
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。
3. 上代码
3.1 代码目录
3.2 编写stream.proto文件
stream是常量,写在哪一边,哪一边就是数据流
syntax = "proto3";
option go_package = "./;proto";
service Greeter {
// 定义方法,stream是常量,流模式
rpc ServerStream (StreamRequestData) returns (stream StreamResponseData); //服务端流模式,拉消息
rpc ClientStream (stream StreamRequestData) returns (StreamResponseData); //客户端流模式,推消息
rpc AllStream (stream StreamRequestData) returns (stream StreamResponseData); //双向流模式,能推能拉
}
message StreamRequestData {
string data = 1; //编号
}
message StreamResponseData {
string data = 1; //编号
}
生成go的protobuf文件命令:
cd到proto目录下
命令:protoc -I . hello.proto --go_out=plugins=grpc:.
3.3 编写server文件
package main
import (
"file_test/grpc_go_stream/proto"
"fmt"
"net"
"sync"
"time"
"google.golang.org/grpc"
)
const port = 8082
type server struct{}
func (s *server) ServerStream(req *proto.StreamRequestData, res proto.Greeter_ServerStreamServer) error {
i := 0
for {
i++
//业务代码
_ = res.Send(&proto.StreamResponseData{
Data: fmt.Sprintf("这是发给%s的数据流", req.Data),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
return nil
}
func (s *server) ClientStream(cliStr proto.Greeter_ClientStreamServer) error {
for {
//业务代码
res, err := cliStr.Recv()
if err != nil {
fmt.Println("本次客户端流数据发送完了:",err)
break
}
fmt.Println("客户端发来消息:",res.Data)
}
return nil
}
func (s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
wg:=sync.WaitGroup{}
wg.Add(2)
//接受客户端消息的协程
go func() {
defer wg.Done()
for {
//业务代码
res, err := allStr.Recv()
if err != nil {
fmt.Println("本次客户端流数据发送完了:",err)
break
}
fmt.Println("收到客户端发来消息:",res.Data)
}
}()
//发送消息给客户端的协程
go func() {
defer wg.Done()
i := 0
for {
i++
//业务代码
_ = allStr.Send(&proto.StreamResponseData{
Data: fmt.Sprintf("这是发给客户端的数据流"),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}()
wg.Wait()
return nil
}
// 启动
func start() {
// 1.实例化server
g := grpc.NewServer()
// 2.注册逻辑到server中
proto.RegisterGreeterServer(g, &server{})
// 3.启动server
lis, err := net.Listen("tcp", "127.0.0.1:8082")
if err != nil {
panic("监听错误:" + err.Error())
}
err = g.Serve(lis)
if err != nil {
panic("启动错误:" + err.Error())
}
}
func main() {
start()
}
3.4 编写client文件
package main
import (
"context"
"file_test/grpc_go_stream/proto"
"fmt"
"sync"
"time"
"google.golang.org/grpc"
)
var rpc proto.GreeterClient
func serverStreamDemo() {
//服务端流模式
res,err:=rpc.ServerStream(context.Background(),&proto.StreamRequestData{Data: "jeff"})
if err != nil {
panic("rpc请求错误:"+err.Error())
}
for {
data,err:=res.Recv() //
if err != nil {
fmt.Println("客户端发送完了:",err)
return
}
fmt.Println("客户端返回数据流值:",data.Data)
}
}
func clientStreamDemo() {
//客户端流模式
cliStr, err := rpc.ClientStream(context.Background())
if err != nil {
panic("rpc请求错误:" + err.Error())
}
i := 0
for {
i++
_ = cliStr.Send(&proto.StreamRequestData{
Data: "jeff",
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}
func clientAndServerStreamDemo() {
//双向流模式
allStr, _ := rpc.AllStream(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
//接受服务端消息的协程
go func() {
defer wg.Done()
for {
//业务代码
res, err := allStr.Recv()
if err != nil {
fmt.Println("本次服务端流数据发送完了:", err)
break
}
fmt.Println("收到服务端发来消息:", res.Data)
}
}()
//发送消息给服务端的协程
go func() {
defer wg.Done()
i := 0
for {
i++
//业务代码
_ = allStr.Send(&proto.StreamRequestData{
Data: fmt.Sprintf("这是发给服务端的数据流"),
})
time.Sleep(time.Second * 1)
if i > 10 {
break
}
}
}()
wg.Wait()
}
// 启动
func start() {
conn, err := grpc.Dial("127.0.0.1:8082", grpc.WithInsecure())
if err != nil {
panic("rpc连接错误:" + err.Error())
}
defer conn.Close()
rpc = proto.NewGreeterClient(conn) //初始化
serverStreamDemo() //服务端流模式
clientStreamDemo() //客户端流模式
clientAndServerStreamDemo() // 双向流模式
}
func main() {
start()
}
来源:https://www.cnblogs.com/guyouyin123/p/16135335.html
0
投稿
猜你喜欢
- Python的3.0版本,常被称为Python 3000,或简称Py3k。相对于Python的早期版本,这是一个较大的升级。为了不带入过多的
- 本文实例讲述了python中查看变量内存地址的方法。分享给大家供大家参考。具体实现方法如下:这里可以使用id>>> pri
- 根据导师作业安排,在学习数字图像处理(刚萨雷斯版)第六章 彩 * 像处理 中的彩色模型后,导师安排了一个比较有趣的作业:融合原理为:1 注意:
- Simple Nested-Loop Join我们来看一下当进行 join 操作时,mysql是如何工作的。常见的 join 方式有哪些?如
- matplotlib是功能十分强大的绘制二维图形的Python模块,它用Python语言实现了MATLAB画图函数的易用性,同时又有非常强大
- 安装selenium打开命令控制符输入:pip install -U selenium火狐浏览器安装firebug:www.firebug.
- 编码规范Python 编码规范重要性的原因用一句话来概括就是:统一的编码规范可以提高开发效率。无论你是 编程者,还是 阅读者,好的规范能让你
- 为什么需要线程锁当我们访问一些特殊的数据时,需要保证该数据的原子性,比如: 文章的阅读量、文章的点赞量等。我们必须要确保这些共享数据必须是原
- 第一部分 关于requests库(1) requests是一个很实用的Python HTTP客户端库,编写爬虫和测试服务器响应数据时经常会用
- 正在看的ORACLE教程是:Oracle不同数据库间对比分析脚本。Oracle数据库开发应用中经常对数据库管理员有这样的需求,对比两个不同实
- 本文实例为大家分享了python图书管理系统的具体代码,供大家参考,具体内容如下实现语言:python图形框架:DTK+2.0数据库框架:S
- 上篇介绍的使用python自带tkinter包,来写带界面的工具。此篇介绍使用pyqt来开发测试工具。tkinter的好处是python官方
- 在Mysql 中删除数据以及数据表非常的容易,但是需要特别小心,因为一旦删除所有数据都会消失。删除数据删除表内数据,使用delete关键字。
- 在MySQL 8.0.16版本中安装可能会出现部分错误提示已经不使用“UTF8B3”而是使用了“UTF8B4”#//////////////
- 最近和Sobin在做一个精品课程的项目,因为用到一个固定的id作为表间关联,所以在前一个表插入数据后要把插入数据生成的自增id传递给下一个表
- #! /usr/bin/env python ##python2.7-批量下载壁纸 ##壁纸来自桌酷网站,所有权归属其网站 ##本代码仅做为
- Python是一种面向对象的解释型计算机程序设计语言。Python是纯粹的自由软件, 源代码和解释器CPython遵循 GPL(GNU Ge
- scrapy是目前python使用的最广泛的爬虫框架架构图如下解释:Scrapy Engine(引擎): 负责Spider、ItemPipe
- 对于时间的选择问题,查到的大部分为两种情况:1.存在readonly属性的2.没有readonly属性的可直接赋值send_keys()测试
- 一、问题这两天在学习使用flask + SQLAlchemy 定制一个web查询页面的demo ,在测试时,发现查询到的结果显示乱码 。这里