Go并发编程sync.Cond的具体使用
作者:麦超 发布时间:2024-05-13 10:41:00
简介
Go
标准库提供 Cond
原语的目的是,为等待 / 通知场景下的并发问题提供支持。Cond
通常应用于等待某个条件的一组 goroutine
,等条件变为 true
的时候,其中一个 goroutine
或者所有的 goroutine
都会被唤醒执行。
Cond
是和某个条件相关,这个条件需要一组 goroutine
协作共同完成,在条件还没有满足的时候,所有等待这个条件的 goroutine
都会被阻塞住,只有这一组 goroutine
通过协作达到了这个条件,等待的 goroutine 才可能继续进行下去。
这个条件可以是我们自定义的 true/false
逻辑表达式。
但是 Cond
使用的比较少,因为在大部分场景下是可以被 Channel
和 WaitGroup
来替换的。
详细介绍
下面就是 Cond
的数据结构和对外提供的方法,Cond
内部维护了一个等待队列和锁实例。
type Cond struct {
noCopy noCopy
// 锁
L Locker
// 等待队列
notify notifyList
checker copyChecker
}
func NeWCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()
NeWCond:
NeWCond
方法需要调用者传入一个Locker
接口,这个接口就Lock/UnLock
方法,所以我们可以传入一个sync.Metex
对象Signal:允许调用者唤醒一个等待当前
Cond
的goroutine
。如果Cond
等待队列中有一个或者多个等待的goroutine
,则从等待队列中移除第一个goroutine
并把它唤醒Broadcast:允许调用者唤醒所有等待当前
Cond
的goroutine
。如果 Cond 等待队列中有一个或者多个等待的goroutine
,则清空所有等待的goroutine
,并全部唤醒Wait:会把调用者放入
Cond
的等待队列中并阻塞,直到被Signal
或者Broadcast
的方法从等待队列中移除并唤醒
案例:Redis连接池
可以看一下下面的代码,使用了 Cond
实现一个 Redis
的连接池,最关键的代码就是在链表为空的时候需要调用 Cond
的 Wait
方法,将 gorutine
进行阻塞。然后 goruntine
在使用完连接后,将连接返回池子后,需要通知其他阻塞的 goruntine
来获取连接。
package main
import (
"container/list"
"fmt"
"math/rand"
"sync"
"time"
)
// 连接池
type Pool struct {
lock sync.Mutex // 锁
clients list.List // 连接
cond *sync.Cond // cond实例
close bool // 是否关闭
}
// Redis Client
type Client struct {
id int32
}
// 创建Redis Client
func NewClient() *Client {
return &Client{
id: rand.Int31n(100000),
}
}
// 关闭Redis Client
func (this *Client) Close() {
fmt.Printf("Client:%d 正在关闭", this.id)
}
// 创建连接池
func NewPool(maxConnNum int) *Pool {
pool := new(Pool)
pool.cond = sync.NewCond(&pool.lock)
// 创建连接
for i := 0; i < maxConnNum; i++ {
client := NewClient()
pool.clients.PushBack(client)
}
return pool
}
// 从池子中获取连接
func (this *Pool) Pull() *Client {
this.lock.Lock()
defer this.lock.Unlock()
// 已关闭
if this.close {
fmt.Println("Pool is closed")
return nil
}
// 如果连接池没有连接 需要阻塞
for this.clients.Len() <= 0 {
this.cond.Wait()
}
// 从链表中取出头节点,删除并返回
ele := this.clients.Remove(this.clients.Front())
return ele.(*Client)
}
// 将连接放回池子
func (this *Pool) Push(client *Client) {
this.lock.Lock()
defer this.lock.Unlock()
if this.close {
fmt.Println("Pool is closed")
return
}
// 向链表尾部插入一个连接
this.clients.PushBack(client)
// 唤醒一个正在等待的goruntine
this.cond.Signal()
}
// 关闭池子
func (this *Pool) Close() {
this.lock.Lock()
defer this.lock.Unlock()
// 关闭连接
for e := this.clients.Front(); e != nil; e = e.Next() {
client := e.Value.(*Client)
client.Close()
}
// 重置数据
this.close = true
this.clients.Init()
}
func main() {
var wg sync.WaitGroup
pool := NewPool(3)
for i := 1; i <= 10; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
// 获取一个连接
client := pool.Pull()
fmt.Printf("Time:%s | 【goruntine#%d】获取到client[%d]\n", time.Now().Format("15:04:05"), index, client.id)
time.Sleep(time.Second * 5)
fmt.Printf("Time:%s | 【goruntine#%d】使用完毕,将client[%d]放回池子\n", time.Now().Format("15:04:05"), index, client.id)
// 将连接放回池子
pool.Push(client)
}(i)
}
wg.Wait()
}
运行结果:
Time:15:10:25 | 【goruntine#7】获取到client[31847]
Time:15:10:25 | 【goruntine#5】获取到client[27887]
Time:15:10:25 | 【goruntine#10】获取到client[98081]
Time:15:10:30 | 【goruntine#5】使用完毕,将client[27887]放回池子
Time:15:10:30 | 【goruntine#6】获取到client[27887]
Time:15:10:30 | 【goruntine#10】使用完毕,将client[98081]放回池子
Time:15:10:30 | 【goruntine#7】使用完毕,将client[31847]放回池子
Time:15:10:30 | 【goruntine#1】获取到client[31847]
Time:15:10:30 | 【goruntine#9】获取到client[98081]
Time:15:10:35 | 【goruntine#6】使用完毕,将client[27887]放回池子
Time:15:10:35 | 【goruntine#3】获取到client[27887]
Time:15:10:35 | 【goruntine#1】使用完毕,将client[31847]放回池子
Time:15:10:35 | 【goruntine#4】获取到client[31847]
Time:15:10:35 | 【goruntine#9】使用完毕,将client[98081]放回池子
Time:15:10:35 | 【goruntine#2】获取到client[98081]
Time:15:10:40 | 【goruntine#3】使用完毕,将client[27887]放回池子
Time:15:10:40 | 【goruntine#8】获取到client[27887]
Time:15:10:40 | 【goruntine#2】使用完毕,将client[98081]放回池子
Time:15:10:40 | 【goruntine#4】使用完毕,将client[31847]放回池子
Time:15:10:45 | 【goruntine#8】使用完毕,将client[27887]放回池子
注意点
在调用
Wait
方法前,需要先加锁,就像我上面例子中Pull
方法也是先加锁
看一下源码就知道了,因为 Wait
方法的执行逻辑是先将 goruntine
添加到等待队列中,然后释放锁,然后阻塞,等唤醒后,会继续加锁。如果在调用 Wait
前不加锁,但是里面会解锁,执行的时候就会报错。
//
// c.L.Lock()
// for !condition() {
// c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
c.checker.check()
// 添加到等待队列
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
// 阻塞
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
还是
Wait
方法,在唤醒后需要继续检查Cond
条件
就拿上面的 redis
连接案例来进行说明吧,我这里是使用了 for
循环来进行检测。如果将 for
循环改成使用 if
,也就是只判断一次,会有什么问题?可以停下来先想想
上面说了调用者也可以使用 Broadcast
方法来唤醒 goruntine
,如果使用的是 Broadcast
方法,所有的 goruntine
都会被唤醒,然后大家都去链表中去获取 redis
连接了,就会出现部分 goruntine
拿不到连接,实际上没有那么多连接可以获取,因为每次只会放回一个连接到池子中。
// 如果连接池没有连接 需要阻塞
for this.clients.Len() <= 0 {
this.cond.Wait()
}
// 获取连接
ele := this.clients.Remove(this.clients.Front())
return ele.(*Client)
来源:https://juejin.cn/post/7093041338836320292


猜你喜欢
- 当来自应用程序的第一个连接控制锁而第二个连接需要相冲突的锁类型时,将发生阻塞。其结果是强制第二个连接等待,而在第一个连接上阻塞。不管是来自同
- 运行代码框<html><META HTTP-EQUIV="Content-Type" content=
- 当使用for语句循环(迭代)pandas.DataFrame时,简单的使用for语句便可以取得返回列名,因此使用重复使用for方法,便可以获
- 表示文字链接最清楚的方式是“蓝色文字+下划线”,这是在浏览器发展过程中形成的。这个问题大家都说过很多次了,我也曾经说过。然而,这样的规范却总
- 本文实例讲述了php 多继承的几种常见实现方法。分享给大家供大家参考,具体如下:class Parent1 { function
- 前言最近微信小游戏跳一跳大热,自己也是中毒颇久,无奈手残最高分只拿到200分。无意间看到教你用Python来玩微信跳一跳一文,在电脑上利用a
- 网上看到一些例子,对于一个简单的3 级联动,都加上什么Struts, Hibernate诸如此类的框架。这个Ajax联动殊不知和这些框架有什
- 前言基于 Internet的各种服务系统应运而生,建立商业站点或者功能比较完善的个人站点,常常需要记录访问者的一些信息;论坛作为 Inter
- SQL触发器实例1 定义: 何为触发器?在SQL Server里面也就是对某一个表的一定的操作,触发某种条件,从而执行的一段程序。触发器是一
- 下面这个截图,就是使用 schedule 定时执行 Notebook 的例子import scheduleimport timeimport
- 长话短说,今天介绍实现此功能的一个方法,需要了解的朋友可以参考下:一、JS 重载页面,本地刷新,返回上一页 代码如下:<a href=
- 创建RandomWalk类为模拟随机漫步,我们将创建一个RandomWalk类,随机选择前进方向,这个类有三个属性,一个存储随机漫步的次数,
- 前言今天我看到线性规划模型开头的介绍,特别不错,因此,我把它记录下来了,分享给大家在工程技术、经济管理、科学研究、军事作战训练及日常生活等众
- 一、this指向构造函数实例化对象在上篇文章中,我们提到了使用new和不使用new调用构造函数的区别,如下例:function Benjam
- 什么是.netMicrosoft® .NET 是 Microsoft XML Web services 平台。XML Web
- 概述基于Swoole的websocket服务,再之前的消息系统系列的第4篇,实现了更加复杂的业务场景,是对消息推送的完善和优化,代码本身就是
- 在DOS界面运行python的py文件我用的Notepad++编写代码,编写完后需要在DOS界面运行打开DOS界面按键盘上的WIN+R,输入
- 这篇文章主要介绍了python next()和iter()函数原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学
- 本文实例讲述了Python进程间通信Queue消息队列用法。分享给大家供大家参考,具体如下:进程间通信-QueueProcess之间有时需要
- 一、基本概念Reactive X中有几个核心的概念,先来简单介绍一下。1.1、Observable和Observer(可观察对象和观察者)首