Go并发调用的超时处理的方法
作者:cookedsteak 发布时间:2024-04-28 09:15:40
之前有聊过 golang 的协程,我发觉似乎还很理论,特别是在并发安全上,所以特结合网上的一些例子,来试验下go routine中 的 channel, select, context 的妙用。
场景-微服务调用
我们用 gin(一个web框架) 作为处理请求的工具,需求是这样的:
一个请求 X 会去并行调用 A, B, C 三个方法,并把三个方法返回的结果加起来作为 X 请求的 Response。
但是我们这个 Response 是有时间要求的(不能超过3秒的响应时间),可能 A, B, C 中任意一个或两个,处理逻辑十分复杂,或者数据量超大,导致处理时间超出预期,那么我们就马上切断,并返回已经拿到的任意个返回结果之和。
我们先来定义主函数:
func main() {
r := gin.New()
r.GET("/calculate", calHandler)
http.ListenAndServe(":8008", r)
}
非常简单,普通的请求接受和 handler 定义。其中 calHandler 是我们用来处理请求的函数。
分别定义三个假的微服务,其中第三个将会是我们超时的哪位~
func microService1() int {
time.Sleep(1*time.Second)
return 1
}
func microService2() int {
time.Sleep(2*time.Second)
return 2
}
func microService3() int {
time.Sleep(10*time.Second)
return 3
}
接下来,我们看看 calHandler 里到底是什么
func calHandler(c *gin.Context) {
...
}
要点1--并发调用
直接用 go 就好了嘛~
所以一开始我们可能就这么写:
go microService1()
go microService2()
go microService3()
很简单有没有,但是等等,说好的返回值我怎么接呢?
为了能够并行地接受处理结果,我们很容易想到用 channel 去接。
所以我们把调用服务改成这样:
var resChan = make(chan int, 3) // 因为有3个结果,所以我们创建一个可以容纳3个值的 int channel。
go func() {
resChan <- microService1()
}()
go func() {
resChan <- microService2()
}()
go func() {
resChan <- microService3()
}()
有东西接,那也要有方法去算,所以我们加一个一直循环拿 resChan 中结果并计算的方法:
var resContainer, sum int
for {
resContainer = <-resChan
sum += resContainer
}
这样一来我们就有一个 sum 来计算每次从 resChan 中拿出的结果了。
要点2--超时信号
还没结束,说好的超时处理呢?
为了实现超时处理,我们需要引入一个东西,就是 context,什么是 context ?
我们这里只使用 context 的一个特性,超时通知(其实这个特性完全可以用 channel 来替代)。
可以看在定义 calHandler 的时候我们已经将 c *gin.Context 作为参数传了进来,那我们就不用自己在声明了。
gin.Context 简单理解为贯穿整个 gin 声明周期的上下文容器,有点像是分身,亦或是量子纠缠的感觉。
有了这个 gin.Context, 我们就能在一个地方对 context 做出操作,而其他正在使用 context 的函数或方法,也会感受到 context 做出的变化。
ctx, _ := context.WithTimeout(c, 3*time.Second) //定义一个超时的 context
只要时间到了,我们就能用 ctx.Done() 获取到一个超时的 channel(通知),然后其他用到这个 ctx 的地方也会停掉,并释放 ctx。
一般来说,ctx.Done() 是结合 select 使用的。
所以我们又需要一个循环来监听 ctx.Done()
for {
select {
case <- ctx.Done():
// 返回结果
}
现在我们有两个 for 了,是不是能够合并下?
for {
select {
case resContainer = <-resChan:
sum += resContainer
fmt.Println("add", resContainer)
case <- ctx.Done():
fmt.Println("result:", sum)
return
}
}
诶嘿,看上去不错。
不过我们怎么在正常完成微服务调用的时候输出结果呢?
看来我们还需要一个 flag
var count int
for {
select {
case resContainer = <-resChan:
sum += resContainer
count ++
fmt.Println("add", resContainer)
if count > 2 {
fmt.Println("result:", sum)
return
}
case <- ctx.Done():
fmt.Println("timeout result:", sum)
return
}
}
我们加入一个计数器,因为我们只是调用3次微服务,所以当 count 大于2的时候,我们就应该结束并输出结果了。
要点3--并发中的等待
上面的计时器是一种偷懒的方法,因为我们知道了调用微服务的次数,如果我们并不知道,或者之后还要添加呢?
手动每次改 count 的判断阈值会不会太沙雕了?这时候我们就要加入 sync 包了。
我们将会使用的 sync 的一个特性是 WaitGroup。它的作用是等待一组协程运行完毕后,执行接下去的步骤。
我们来改下之前微服务调用的代码块:
var success = make(chan int, 1) // 成功的通道标识
wg := sync.WaitGroup{} // 创建一个 waitGroup 组
wg.Add(3) // 我们往组里加3个标识,因为我们要运行3个任务
go func() {
resChan <- microService1()
wg.Done() // 完成一个,Done()一个
}()
go func() {
resChan <- microService2()
wg.Done()
}()
go func() {
resChan <- microService3()
wg.Done()
}()
wg.Wait() // 直到我们前面三个标识都被 Done 了,否则程序一直会阻塞在这里
success <- 1 // 我们发送一个成功信号到通道中
既然我们有了 success 这个信号,那么再把它加入到监控 for 循环中,并做些修改,删除原来 count 判断的部分。
go func() {
for {
select {
case resContainer = <-resChan:
sum += resContainer
fmt.Println("add", resContainer)
case <- success:
fmt.Println("result:", sum)
return
case <- ctx.Done():
fmt.Println("result:", sum)
return
}
}
}()
三个 case,分工明确,一个用来拿服务输出的结果并计算,一个用来做最终的完成输出,一个是超时输出。
同时我们将这个循环监听,也作为协程运行。
至此,所有的主要代码都完成了。下面是完全版
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
)
// 一个请求会触发调用三个服务,每个服务输出一个 int,
// 请求要求结果为三个服务输出 int 之和
// 请求返回时间不超过3秒,大于3秒只输出已经获得的 int 之和
func calHandler(c *gin.Context) {
var resContainer, sum int
var success, resChan = make(chan int), make(chan int, 3)
ctx, _ := context.WithTimeout(c, 3*time.Second)
go func() {
for {
select {
case resContainer = <-resChan:
sum += resContainer
fmt.Println("add", resContainer)
case <- success:
fmt.Println("result:", sum)
return
case <- ctx.Done():
fmt.Println("result:", sum)
return
}
}
}()
wg := sync.WaitGroup{}
wg.Add(3)
go func() {
resChan <- microService1()
wg.Done()
}()
go func() {
resChan <- microService2()
wg.Done()
}()
go func() {
resChan <- microService3()
wg.Done()
}()
wg.Wait()
success <- 1
return
}
func main() {
r := gin.New()
r.GET("/calculate", calHandler)
http.ListenAndServe(":8008", r)
}
func microService1() int {
time.Sleep(1*time.Second)
return 1
}
func microService2() int {
time.Sleep(2*time.Second)
return 2
}
func microService3() int {
time.Sleep(10*time.Second)
return 3
}
上面的程序只是简单描述了一个调用其他微服务超时的处理场景。
实际过程中还需要加很多很多调料,才能保证接口的对外完整性。
来源:https://segmentfault.com/a/1190000017872359


猜你喜欢
- MySql 8.0 对应驱动包匹配MySql 数据库更新为8.0及以上后,对应的应用程序数据库链接驱动包也要更新为8.0版本。否则会报驱动异
- 这时候最好的做法就是按需引入,动态引入组件js和样式,文件load完成后调用callback,运行js。代码还是很简便的 1. 判断文件lo
- 朋友的网站要计算机票的折扣价格,并且在最后的折扣价格上应对个位进行四舍五入,同时在ASP和Javasc
- #小策略,策略逻辑是在金叉时候买进,死叉时候卖出,所谓金叉死叉是两条均线的交叉,当短期均线上穿长期均线为金叉,反之为死叉#下面是策略代码及结
- 抽象工厂模式Abstract Factory Pattern是什么抽象工厂模式是一种创建型模式,它提供了一种创建一系列相关或相互依赖对象的最
- 1.循环删除 #这个是我选中其中的一个分支进行右键清空操作时进行的处理for i in range(self.tree.currentIte
- 下表列出了所有Python语言支持的算术运算符。假设变量a持有10和变量b持有20,则: 例子:试试下面的例子就明白了所有的Pyt
- 一、opencv是什么?OpenCV是一个用于图像处理、分析、机器视觉方面的开源函数库.二、使用步骤1.引入库代码如下:import cv2
- 前言:问题分析:在进行数据库查询的时候,我们都知道索引可以加快数据查询的效率。但是在实际的业务场景下,经常会遇到即使在表中增加了索引,但是同
- 一、Pyecharts简介和安装1、简介Echarts 是一个由百度开源的数据可视化,凭借着良好的交互性,精巧的图表设计,得到了众多开发者的
- 实例如下所示:# -*- coding: utf-8 -*-import xlrddef open_excel(file = 'fi
- 列表推导与生成器表达式当我们创建了一个列表的时候,就创建了一个可以迭代的对象:>>> squares=[n*n for n
- 1. 需求分析我们要把我们的表单组件分成两个部分,一个是item部分,一个是整体的 form 部分,form部分由item和button提交
- ★二维数组的使用方式:先声明或者定义,再赋值1)语法:var 数组名[大小][大小]类型2)比如:var arr[2][3]int[][]两
- 今天,本文向大家推荐20佳国外的脚本下载网站。1- Hot Scripts2- Code Canyon3- User Scripts4- S
- 一、实现代码1.sql-- phpMyAdmin SQL Dump-- version 4.5.1-- http://www.phpmyad
- python菜鸟,每天都要进步一点点。二维元组的例子:A = ((1, 1, 1), (1, 1, 1),(1, 1, 1),(0, 0,
- 如下所示:import numpy as npfrom torchvision.transforms import Compose, ToT
- 看到Python中有个函数名比较奇特,__init__我知道加下划线的函数会自动运行,但是不知道它存在的具体意义..Python中所有的类成
- f-string 格式化f-string 格式化 就是在字符串模板前面加上f,然后占位符使用{} ,里面直接放入对应的数据对象。如下所示f&