Golang实现带优先级的select
作者:jxwu 发布时间:2024-04-26 17:36:41
背景
在 Golang 里面,我们经常使用 channel 进行协程之间的通信。这里有一个经典的场景,也就是生产者消费者模式,生产者协程不断地往 Channel 里面塞元素,而消费者协程不断地消费这些元素。
写成代码就是如下:
package main
import (
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan int, 10)
var wg sync.WaitGroup
wg.Add(2)
go producer(ch, &wg)
go consumer(ch, &wg)
wg.Wait()
}
// 生产者
func producer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
i := 0
for {
select {
case ch <- i:
default:
// 丢弃
log.Println("discard")
}
i++
time.Sleep(time.Second)
}
}
// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
consume := func(i int) {
fmt.Println(i)
time.Sleep(time.Millisecond * 700)
}
for {
i := <-ch
consume(i) // 消费元素
}
}
生产者不断产生元素,消费者消费元素。生产者不会等待消费者消费完毕(不然可能影响其他任务),如果 channel 已经满了,也就是说明消费者消费不过来,生产者就会丢弃这个任务。
生产者平均一秒生成1个,消费者0.7秒消费一个。正常情况下消费者是消费得过来的,然而很多时候消费者协程还需要做一些定时任务,比如一些定时清理工作。假如这个清理工作每2秒触发一次,清理时间一般需要1.5秒,也就是如果每次都做每一秒有0.75秒会被清理工作占有了,但是它不是一定要非常及时的,可以等空闲时再进行。 如下代码:
// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(time.Second * 2)
consume := func(i int) {
fmt.Println(i)
time.Sleep(time.Millisecond * 700)
}
clear := func() {
fmt.Println("clear")
time.Sleep(time.Millisecond * 1500)
}
for {
select {
case i := <-ch:
consume(i) // 消费元素:
case <-t.C:
clear() // 清理
}
}
}
运行程序到第15秒的时候,生产者发现 channel满了,于是开始丢包:
0
1
clear
2
3
4
5
6
clear
7
clear
8
clear
9
clear
clear
10
clear
11
12
13
14
clear
15
clear
clear
discard
16
clear
discard
discard
解决方案
既然清理任务的优先级并不高,那么它就不应该阻塞消费元素流程,而是应该在空闲时才去执行。由于 Golang 里面,如果 select 两个 case 都同时满足,会随机选一个执行,因此第一想到的可能会使用如下代码实现优先级case:
// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(time.Second * 2)
consume := func(i int) {
fmt.Println(i)
time.Sleep(time.Millisecond * 700)
}
clear := func() {
fmt.Println("clear")
time.Sleep(time.Millisecond * 1500)
}
for {
select {
case i := <-ch:
consume(i) // 消费元素
continue // 可能还有元素,不走清理逻辑
default:
}
// 没有元素才走清理逻辑
select {
case <-t.C:
clear() // 清理
default:
}
}
}
如果运行这个程序,可以发现它能够满足优先级的需求,先消费元素,空闲时再执行清理任务。
然而,在没有元素可以消费,也没有清理任务可以执行的时候,这里的for将会不断地循环,浪费CPU资源。
其实,可以使用下面的方法实现优先级case,它能够在没有元素就绪的时候阻塞在 select,而不是不断循环:
// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(time.Second * 2)
consume := func(i int) {
fmt.Println(i)
time.Sleep(time.Millisecond * 700)
}
clear := func() {
fmt.Println("clear")
time.Sleep(time.Millisecond * 1500)
}
for {
select {
case i := <-ch:
consume(i) // 消费元素
case <-t.C:
priority:
for { // 清理前先把元素消费完
select {
case i := <-ch:
consume(i) // 消费元素
default:
break priority // 注:这里会跳过这个循环,而不是再次执行
}
}
clear() // 清理
}
}
}
这里的关键是在触发清理case的时候,先去把channel里面的元素消费完,再进行清理,从而保证能够留下足够的channel缓冲区给生产者放置生产的元素。
一个封装
上面那段优先级case代码其实挺常用的,但是几乎都是模板代码,特别是需要在两个地方写consume(i)
,因此我们可以封装一下这段代码,方便使用,减少出错:
// 优先级select ch1 的任务先执行完毕后才会执行 ch2 里面的任务
func PrioritySelect[T1, T2 any](ch1 <-chan T1, f1 func(T1), ch2 <-chan T2, f2 func(T2)) {
for {
select {
case a := <-ch1:
f1(a)
case b := <-ch2:
priority:
for {
select {
case a := <-ch1:
f1(a)
default:
break priority
}
}
f2(b)
}
}
}
这样,我们的消费者代码就可以简化为:
// 消费者
func consumer(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(time.Second * 2)
consume := func(i int) {
fmt.Println(i)
time.Sleep(time.Millisecond * 700)
}
clear := func(time.Time) {
fmt.Println("clear")
time.Sleep(time.Millisecond * 1500)
}
PrioritySelect(ch, consume, t.C, clear)
}
来源:https://juejin.cn/post/7217082232938053693


猜你喜欢
- 本文实例总结了python选择排序算法。分享给大家供大家参考。具体如下:代码1:def ssort(V):#V is the list to
- 如何通过PHP实现Des加密算法代码实例注:php7以上不支持了,因为php7去掉了某些函数, 另外变量的{}要改为[]<?phpcl
- 实现代码/// <summary>/// 去除HTML标记/// </summary&
- 初学python,对python的对齐很重视,为了防止出错,使用spyder工具提供的功能下面是方法:1、首先打开Tools菜单栏下的Pre
- 1 , javascript字符集:javascript采用的是Unicode字符集编码。为什么要采用这个编码呢?原因很简单,16位的Uni
- 对于英文文本分句比较简单,只要根据终结符"."划分就好,中文文本分句看似很简单,但是实现时会遇到很多麻烦,尤其是处理社交
- numpy.flip(m, axis=None)Reverse the order of elements in an array alon
- alert table 表名 add column 列名 alter table 表名 drop column 列名 eg: alter t
- 本文实例为大家分享了python编写决策树源代码,供大家参考,具体内容如下因为最近实习的需要,所以用python里的sklearn包重新写了
- 来源 | OpenCV学堂作者 | gloomyfish基本思路选择以前我用过Caffe,用过tensorflow,最近
- 首先说一下 我在form表单里面遇见的坑:1.例如我要给后台传的不是对象,而是一个数组,怎么写验证?2.比如我有四个弹出框,都要做验证,这个
- 一、Pythont如何打开 txt 格式的文件?1.首先我使用pycharm创建一个项目,然后在这个项目里面再创建一个python的包,然后
- 本文实例讲述了Python接收Gmail新邮件并发送到gtalk的方法。分享给大家供大家参考。具体实现方法如下:#!/usr/bin/env
- 1.变量的输入:input函数:input()input("请输入银行卡密码")password = input(&qu
- 在日常应用中,往往根据实际需求录入一些值,而这些值不能直接使用,所以Sql中经常会对字段值进行一些常规的处理。这里搜集了(提取数字、英文、中
- 前言:python虽然是一门'慢语言',但是也有着比较多的性能检测工具来帮助我们优化程序的运行效率。这里总结了五个比较好的p
- 前言相信大家在最近的chatGPT的注册或者使用过程中都遇到了很多很多的报错,接下来的内容是关于chatGPT不管是注册还是使用过程中所有报
- 本文实例讲述了python单向链表的基本实现与使用方法。分享给大家供大家参考,具体如下:# -*- coding:utf-8 -*-#! p
- WSGI协议首先弄清下面几个概念:WSGI:全称是Web Server Gateway Interface,WSGI不是服务器,python
- 分析我们将添加、插入、删除定义为:添加 : 在字符串的后面或者前面添加字符或者字符串插入 : 在字符串之间插入特定字符在Python中,字符