Golang工作池的使用实例讲解
作者:寻找09之夏 发布时间:2024-05-08 10:53:14
一、概念
我们可以将工作池理解为线程池。线程池的创建和销毁非常消耗资源,所以专门写一个pool,每次用过的线程池再放回pool中而不是销毁。不过在Go语言中不会使用系统的线程,而是使用goroutine。gorotine的创建和销毁比系统线程的消耗要小的多,而且goroutine没有标号。所以goroutine的pool就不再时线程池,而是work pool(工作池)。
虽然goroutine的系统消耗较小,但也不能随意在编码时使用go func(),如果程序频繁启动goroutine,会造成极其不可控性能问题。对于可以提前预知的大量异步处理的任务就要考虑使用工作池。
工作池的作用控制goroutine的规模,或者说是goroutine的数量。在Go语言中,控制goroutine的数量最好方式就是使用缓存通道。
二、实例
1.简单示例
下面是Go语言解决工作池的经典用法。
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("worker(%d) start to do job(%d)\n", id, job)
time.Sleep(time.Second)
fmt.Printf("worker(%d) finished job(%d)\n", id, job)
results <- job
}
}
func main() {
// 为了使用我们的工作池,我们需要发送工作和接受工作的结果,
// 这里我们定义两个通道,一个jobs,一个results
jobs := make(chan int, 100)
results := make(chan int, 100)
// 开启3个goroutine
for id := 1; id <= 3; id++ {
go worker(id, jobs, results)
}
// 创建5个任务
for job := 1; job <= 5; job++ {
jobs <- job
}
close(jobs)
// 输出结果
for i := 1; i <= 5; i++ {
<-results
}
}
上述代码工作池思想主要体现在jobs的通道上,因为定义了一个缓存长度为100的通道,所以在通道到100以后,新任务就会阻塞,只有等worker从通道取走一个工作以后才能继续分配新工作。
本案例较为简单,如果worker的数量较大,业务执行时间较长的话,我们需要在程序设计上将jobs和worker的模式进行优化,每个worker处理一项工作,工作池可以自定义最大数量的worker;这样可以保证goroutine的最大数量,可程序更加可控,避免代码消耗压垮系统。
2.读入数据
下面时改良之后代码
1package main
import (
"fmt"
"reflect"
"time"
)
// Job 任务内容
type Job struct {
ID int
Name string
}
// Worker 工作
type Worker struct {
id int // id
WorkerPool chan chan Job // 工作者池(通道的通道),每个元素都是一个job通道, 公共的job
JobChannel chan Job // 工作通道,每个元素是一个job,worker私有的job
exit chan bool // 结束信号
}
var (
MaxWorker = 5 // 最大worker数量
JobQueue = make(chan Job, 5) // 工作通道,模拟需处理的工作
)
// Scheduler 排程中心
type Scheduler struct {
WorkerPool chan chan Job // 工作池
WorkerMaxNum int // 最大工作者数
Workers []*Worker // worker队列
}
// NewScheduler 创建排程中心
func NewScheduler(workerMaxNum int) *Scheduler {
workerPool := make(chan chan Job, workerMaxNum) // 工作池
return &Scheduler{WorkerPool: workerPool, WorkerMaxNum: workerMaxNum}
}
// Start 工作池开始
func (s *Scheduler) Start() {
Workers := make([]*Worker, s.WorkerMaxNum)
for i := 0; i < s.WorkerMaxNum; i++ {
worker := NewWorker(s.WorkerPool, i)
worker.Start()
Workers[i] = &worker
}
s.Workers = Workers
go s.schedule()
}
// Stop 工作池的关闭
func (s *Scheduler) Stop() {
Workers := s.Workers
for _, w := range Workers {
w.Stop()
}
time.Sleep(time.Second)
close(s.WorkerPool)
}
func NewWorker(WorkerPool chan chan Job, id int) Worker {
fmt.Printf("new a worker(%d)\n", id)
return Worker{
id: id,
WorkerPool: WorkerPool,
JobChannel: make(chan Job),
exit: make(chan bool),
}
}
// Start 监听任务和结束信号
func (w Worker) Start() {
go func() {
for {
select {
case job := <-w.JobChannel: // 收到任务
fmt.Println("get a job from private w.JobChannel")
fmt.Println(job)
case <-w.exit: // 收到结束信号
fmt.Println("worker exit", w)
return
}
}
}()
}
func (w Worker) Stop() {
go func() {
w.exit <- true
}()
}
// 排程
func (s *Scheduler) schedule() {
for {
select {
case job := <-JobQueue:
fmt.Println("get a job from JobQueue")
go func(job Job) {
//从WorkerPool获取jobChannel,忙时阻塞
jobChannel := <-s.WorkerPool
fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel))
jobChannel <- job
fmt.Println("worker's private jobChannel add one job")
}(job)
}
}
}
func main() {
scheduler := NewScheduler(MaxWorker)
scheduler.Start()
jobQueue()
scheduler.Stop()
}
// 模拟Job任务
func jobQueue() {
for i := 1; i <= 30; i++ {
JobQueue <- Job{ID: i, Name: fmt.Sprintf("Job【%d】", i)}
fmt.Printf("jobQueue add %d job\n", i)
}
}
定义了两个结构体:Task任务和Job工作,Task并没有实质性的内容,这里仅仅定义了一个整型变量;
定义两个全局变量:MaxWorker是最大的worker数量;JobQueue是Job的通道。这两个变量都用于后面的模拟,在真实场景中可以不设置这两个变量。
定义了一个Worker结构体,与上一个简单工作池的示例不同,本例的Worker不再是简单的一个goroutine,而是一个结构体。结构体内定义了如下四个变量。▪id:worker编号。▪exit:这是一个bool类型的通道,当有数据写入时worker结束运行。▪JobChannel:Job类型的通道,该通道是专属于当前worker的私有工作队列。▪WorkerPool:注意看,定义的时候使用了两个Channel,每一个元素是一个Job通道,其实每一个元素是一个JobChannel。
NewWorker方法用于创建一个新的worker,要注意该方法的参数workerPool用于创建worker时传入,这就说明每个worker与其他worker的WorkerPool是共享的,或者说多个worker使用一个WorkerPool。这一点很重要,这是本示例代码在上一个简单示例代码基础上的优化。而JobChannel和exit变量则是随着Worker的新建而新建的。
Worker的Start方法,该方法用于监听任务或者结束信号。Start方法一开始就用goroutine运行一个匿名函数,而函数内部是一个无限循环。在循环内部,首先是把当前的JobChannel注册到WorkerPool里,一旦注册进去也就说明该worker可以接收任务了。然后通过select判断JobChannel是否可以读取,也就是其中是否有Job,或者exit通道是否可以读取。如果JobChannel可读取,证明有Job,后续开始处理Job;而如果exit可读,则结束当前的无限循环。所以,后面的代码中要特别注意对WorkerPool的操作,Worker是从WorkerPool领取工作的。Worker的Stop方法,用于为exit通道写入数据,在Start方法内Worker会读取到写入的数据,进而结束无限循环。
NewScheduler函数用于创建一个Scheduler,可以看到函数内部的WorkerPool是通过make函数新建的,NewWorker函数一样靠参数传入。注意WorkerPool是有缓存通道的,缓存长度是MaxWorkers。
Scheduler的Create方法,该方法根据MaxWorkers最大数创建Worker,并且把引用存入Workers切片。创建好Worker后,马上调用Worker的Start方法,最后通过goroutine运行Schedule方法。Scheduler的Shutdown方法,用于关闭工作池,调用所有worker的Stop方法并且关闭WorkerPool工作池。
Scheduler的Schedule方法,该方法内也是一个无限循环,循环内部就是不停地读取JobQueue,然后运行一个goroutine。在新运行的goroutine内从s.WorkerPool读取一个JobChannel,注意,Worker注册到WorkerPool以后此处才可以读取到,如果WorkerPool的缓存通道内没有JobChannel,则会阻塞,直到读取到JobChannel,才把Job写入。
备注:此文内容来自《Go微服务实战》
来源:https://blog.csdn.net/qq_34272964/article/details/127034590
猜你喜欢
- 我就废话不多说了,直接 上代码吧!import kafka.api.PartitionOffsetRequestInfo;import ka
- 一、图图:数据(张量Tenrsor)+ 操作(节点Operation) (静态)图可以用:1、默认图;2、自定义图。1、默认图查看默认图的方
- 今天因为程序里面要把写入数据库的html源代码以html源编码的形式显示在页面里面,而不要被浏览器解释成。。找了N久都不知道怎么做后来看了一
- Goland 项目创建goland2020.3 及以上 IDE,默认创建的 go 项目 就是使用 gomod 管理!goland2020.3
- 1.安装Oracle Client连接到Oracle的前提是在SQL Server服务器上安装Oracle Client。Oracle Cl
- 简介在很多实际的项目开发中,我们需要实现很多实时功能;而在这篇文章中,我们就利用django channels简单地实现了点对点聊天和消息推
- 什么是Mosaic数据增强方法Yolov4的mosaic数据增强参考了CutMix数据增强方式,理论上具有一定的相似性!CutMix数据增强
- 多路搜索树完全二叉树高度:O(log2N),其中2为对数完全M路搜索树的高度:O(logmN),其中M为对数,树每层的节点数M路搜索树主要用
- 错误:ImportError: libcublas.so.9.0: cannot open shared object file: No s
- 计模式的目的是让代码易维护、易扩展,不能为了模式而模式,因此一个简单的工具脚本是不需要用到任何模式的。简单工厂模式又叫静态工厂方法模式,工厂
- 本文实例为大家分享了微信小程序自定义支持图片的弹窗,供大家参考,具体内容如下为index.wxml添加如下图代码: (微信小程序 - can
- 我们做登录的时候经常会使用到,验证手机号是否正确、向手机发送验证码倒计时60s的问题,我们改如何解决呢?让我们一起来探讨一下吧。如下图:首先
- 在Python中有时会碰到需要一个一维列向量(n*1)与另一个一维列向量(n*1)的转置(1*n)相乘,得到一个n*n的矩阵的情况。但是在p
- 今天继续给大家介绍MySQL相关知识,本文主要内容是MySQL外键约束详解。一、MySQL外键约束作用外键约束(Foreign Key)即数
- 在Python个人博客程序开发实例框架设计中,我们已经完成了 数据库设计、数据准备、模板架构、表单设计、视图函数设计、电子邮件支持 等总体设
- 本文实例讲述了python中栈的原理及实现方法。分享给大家供大家参考,具体如下:栈(stack),有些地方称为堆栈,是一种容器,可存入数据元
- 与上篇实践教程一样,在这篇文章中,我将继续从一种常见的功能——表格入手,展示Vue.js中的一些优雅特性。同时也将对filter功能与com
- 需求:1.用户输入密码正确登录2.用户输入密码错误退出并调用函数继续输入3.用户输入密码符合原先给定的一个值时,允许用户重置密码,并且可以用
- 本文实例讲述了Python多继承原理与用法。分享给大家供大家参考,具体如下:python中使用多继承,会涉及到查找顺序(MRO)、重复调用(
- 在WEB开发中.我们可能都习惯使用下面的代码来获取客户端的IP地址: C#代码 //优先取得 * string IP = Request