Go1.18新特性使用Generics泛型进行流式处理
作者:摇摆的小虎牙 发布时间:2024-02-17 07:12:42
前言
Stream 是一个基于 Go 1.18+ 泛型的流式处理库, 它支持并行处理流中的数据. 并行流会将元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.
GitHub - xyctruth/stream: A Stream library based on Go 1.18+ Generics (Support Parallel Stream)
安装
需要安装 Go 1.18+ 版本
$ go get github.com/xyctruth/stream
在代码中导入它
import "github.com/xyctruth/stream"
基础
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
Filter(func(s string) bool { return s != "b" }).
Map(func(s string) string { return "class_" + s }).
Sort().
Distinct().
ToSlice()
// 需要转换切片元素的类型
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
Filter(func(v int) bool { return v >3 }).
Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
Reduce(func(r string, v string) string { return r + v })
类型约束
any
接受任何类型的元素, 所以不能使用 ==
!=
>
<
比较元素, 导致你不能使用 Sort(), Find()...等函数 ,但是你可以使用 SortFunc(fn), FindFunc(fn)... 代替
type SliceStream[E any] struct {
slice []E
}
stream.NewSlice([]int{1, 2, 3, 7, 1})
comparable
接收的类型可以使用 ==
!=
比较元素, 但仍然不能使用 >
<
比较元素, 因此你不能使用 Sort(), Min()...等函数 ,但是你可以使用 SortFunc(fn), MinFunc()... 代替
type SliceComparableStream[E comparable] struct {
SliceStream[E]
}
stream.NewSliceByComparable([]int{1, 2, 3, 7, 1})
constraints.Ordered
接收的类型可以使用 ==
!=
>
<
, 所以可以使用所有的函数
type SliceOrderedStream[E constraints.Ordered] struct {
SliceComparableStream[E]
}
stream.NewSliceByOrdered([]int{1, 2, 3, 7, 1})
类型转换
有些时候我们需要使用 Map
,Reduce
转换切片元素的类型,但是很遗憾目前 Golang 并不支持结构体的方法有额外的类型参数,所有类型参数必须在结构体中声明。在 Golang 支持之前我们暂时使用临时方案解决这个问题。
// SliceMappingStream Need to convert the type of slice elements.
// - E elements type
// - MapE map elements type
// - ReduceE reduce elements type
type SliceMappingStream[E any, MapE any, ReduceE any] struct {
SliceStream[E]
}
s := stream.NewSliceByMapping[int, string, string]([]int{1, 2, 3, 4, 5}).
Filter(func(v int) bool { return v >3 }).
Map(func(v int) string { return "mapping_" + strconv.Itoa(v) }).
Reduce(func(r string, v string) string { return r + v })
并行
Parallel
函数接收一个 goroutines int
参数. 如果 goroutines>1 则开启并行, 否则关闭并行, 默认流是关闭并行的。
并行会将流中的元素平均划分多个的分区, 并创建相同数量的 goroutine 执行, 并且会保证处理完成后流中元素保持原始顺序.
s := stream.NewSliceByOrdered([]string{"d", "a", "b", "c", "a"}).
Parallel(10).
Filter(func(s string) bool {
// 一些耗时操作
return s != "b"
}).
Map(func(s string) string {
// 一些耗时操作
return "class_" + s
}).
ForEach(
func(index int, s string) {
// 一些耗时操作
},
).ToSlice()
并行类型
First
: 一旦获得第一个返回值,并行处理就结束. For: AllMatch, AnyMatch, FindFuncALL
: 所有元素都需要并行处理,得到所有返回值,然后并行结束. For: Map, FilterAction
: 所有元素需要并行处理,不需要返回值. For: ForEach, Action
并行 goroutines
开启并行 goroutine 数量在面对 CPU 操作与 IO 操作有着不同的选择。
一般面对 CPU 操作时 goroutine 数量不需要设置大于 CPU 核心数,而 IO 操作时 goroutine 数量可以设置远远大于 CPU 核心数.
CPU 操作
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
sort.Ints(newArray(1000)) // 模拟 CPU 耗时操作
})
使用6个cpu核心进行基准测试
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByCPU
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByCPU/no_parallel(0)-6 717 9183119 ns/op
BenchmarkParallelByCPU/goroutines(2)-6 1396 4303113 ns/op
BenchmarkParallelByCPU/goroutines(4)-6 2539 2388197 ns/op
BenchmarkParallelByCPU/goroutines(6)-6 2932 2159407 ns/op
BenchmarkParallelByCPU/goroutines(8)-6 2334 2577405 ns/op
BenchmarkParallelByCPU/goroutines(10)-6 2649 2352926 ns/op
IO 操作
NewSlice(s).Parallel(goroutines).ForEach(func(i int, v int) {
time.Sleep(time.Millisecond) // 模拟 IO 耗时操作
})
使用6个cpu核心进行基准测试
go test -run=^$ -benchtime=5s -cpu=6 -bench=^BenchmarkParallelByIO
goos: darwin
goarch: amd64
pkg: github.com/xyctruth/stream
cpu: Intel(R) Core(TM) i7-8750H CPU @ 2.20GHz
BenchmarkParallelByIO/no_parallel(0)-6 52 102023558 ns/op
BenchmarkParallelByIO/goroutines(2)-6 100 55807303 ns/op
BenchmarkParallelByIO/goroutines(4)-6 214 27868725 ns/op
BenchmarkParallelByIO/goroutines(6)-6 315 18925789 ns/op
BenchmarkParallelByIO/goroutines(8)-6 411 14439700 ns/op
BenchmarkParallelByIO/goroutines(10)-6 537 11164758 ns/op
BenchmarkParallelByIO/goroutines(50)-6 2629 2310602 ns/op
BenchmarkParallelByIO/goroutines(100)-6 5094 1221887 ns/op
项目地址 https://github.com/xyctruth/stream
来源:https://juejin.cn/post/7075227613299474446
猜你喜欢
- 元素是文档结构的基础,在CSS中,每个元素生成了一个包含了元素内容的框(box,也译为“盒子”)。但是不同的元素显示的方式会有所不同,例如&
- 用tensorflow构建简单的线性回归模型是tensorflow的一个基础样例,但是原有的样例存在一些问题,我在实际调试的过程中做了一点自
- flagflag 是Go 标准库提供的解析命令行参数的包。使用方式:flag.Type(name, defValue, usage)其中Ty
- 一.定义表变量DECLARE @T1 table(UserID int , UserName nvarchar(50),CityName n
- ip正则式为:r'(([12][0-9][0-9]|[1-9][0-9]|[1-9])\.){3,3}([12][0-9][0-9]
- 数组的组合主要有:1.水平组合:np.hstack(arr1,arr2) 或 concatenate(arr1,arr2,axis=1)2.
- 利用python进行求解,求解的要求是不能使用python内部封装好的函数例如:maxway1:def findmax(data,n): i
- 昨天有人在群里问图1的边框效果是否能实现。 边框效果图有人给出答案,需要嵌套一个元素实现。我当时粗粗写了个测试页面,但是时间太晚了,也没有细
- 你知道SQL Server这么庞大的企业级数据库服务器产品是如何build出来的吗?这有些相关的数据:每个build 的大小在300GB左右
- 1. 引言Python是一种强大的编程语言,有很多内置的功能来处理文本。然而,有时候,我们需要处理的文本非常复杂,而Python内置的功能可
- 本文实例为大家分享了pytorch实现手写数字图片识别的具体代码,供大家参考,具体内容如下数据集:MNIST数据集,代码中会自动下载,不用自
- 平面设计 常用尺寸 三折页广告 标准尺寸: (A4)210mm x 285mm普通宣传册 标准尺寸: (A4)210mm x 285mm文件
- 1.Http连接基础Http协议承载了互联网上的主要流量,然而说到传输,还要回归到最基本的网络分层模型TCP/IP。TCP/IP是全球计算机
- 解读数据库的嵌套查询的性能explain 是非常重要的性能查询的工具!!!1、嵌套查询首先大家都知道我们一般不提倡嵌套查询或是join查询原
- 蜗牛很慢。蜗牛快递会怎样?答案是:当然也会很慢。但是蜗牛尽了他的全力,为了它的兔子朋友,以生命在奔跑。每天都是24个小时,快的只是速度,却不
- 今天碰到一个很有意思的问题,需要将普通的 Unicode字符串转换为 Unicode编码的字符串,如下:将 \\u9500\\u552e 转
- 这篇文章主要介绍了通过python检测字符串的字母,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可
- location是javascript里边管理地址栏的内置对象,比如location.href就管理页面的url,用location.hre
- 众所周知,Python使用pip方法安装第三方包时,需要从https://pypi.org/资源库中下载,但是会面临下载速度慢,甚至无法下载
- hao123的成功引领了一批的网址站,然而辉煌却是很难复制的,复制了模式却复制不了成功,市场一旦被垄断就很难再超越。网址站的成功也在一定程度