基于golang的简单分布式延时队列服务的实现
作者:侯盛鑫 发布时间:2024-05-08 10:44:03
一、引言
背景
我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈。但有时也会遇到非实时的任务,比如确定的时间点发布重要公告。或者需要在用户做了一件事情的X分钟/Y小时后,EG:
“PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励”
对其特定动作,比如通知、发券等等。一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延时队列服务。
名词解释
topic_list队列:每一个来的延时请求都应该又一个延时主题参考kafka,在逻辑上划分出一个队列出来每个业务分开处理;
topic_info队列:每一个队列topic都存在一个新的队列里,每次扫描topic信息检测新的topic建立与销毁管理服务协程数量;
offset:当前消费的进度;
new_offset:新消费的进度,预备更迭offset;
topic_offset_lock:分布式锁。
二、设计目标
功能清单
1、延时信息添加接口基于http调用
2、拥有存储队列特性,可保存近3天内的队列消费数据
3、提供消费功能
4、延时通知
性能指标
预计接口的调用量:单秒单类任务数3500,多秒单类任务数1300
压测结果:
简单压测
wrk写入qps:259.3s 写入9000条记录 单线程 无并发
触发性能/准确率:单秒1000,在测试机无延长。单秒3000时,偶尔出现1-2秒延迟。受内存和cpu影响。
三、系统设计
交互流程
时序图
本设计基于http接口调用,当向topic存在的队列中添加消息的时候,消息会被添加到相应topic队列的末尾储存,当添加到不存在的相应topic队列时,首先建立新topic队列,当定时器触发的时候或者分布式锁,抢到锁的实例先获得相应队列的offset,设置新offset,就可以释放锁了让给其他实例争抢,弹出队列头一定数量元素,然后拿到offset段的实例去存储中拿详细信息,在协程中处理,主要协程等待下次触发。然后添加协程去监控触发。
模块划分
1、队列存储模块
1·delay下的delay.base模块,主要负责接收写请求,将队列信息写入存储,不负责backend逻辑,调用存储模块
2、backend模块。delay下的delay.backend模块,负责时间触发扫描对应的topic队列,调用存储模块,主要负责访问读取存储模块,调用callback模块
1·扫描topic添加groutine
2·扫描topic_list消费信息
3·扫描topic_list如果一定时间没有消费到则关闭groutine
3、callback模块,主要负责发送已经到时间的数据,向相应服务通知
3、存储模块
1·分布式锁模块,系统多机部署,保证每次消费的唯一性,对每次topic消费的offset段进行上锁offset到new_offset段单机独享
2·topic管理列表,管理topic数量控制协程数
3·topic_list,消息队列
4·topic_info,消息实体,可能需要回调中会携带一些信息统一处理
4、唯一号生成模块。
五、缓存设计
目前使用全缓存模式
key设计:
topic管理list key: XX:DELAY_TOPIC_LIST type:list
topic_list key: XX:DELAY_SIMPLE_TOPIC_TASK-%s(根据topic分key) type:zset
topic_info key: XX:DELAY_REALL_TOPIC_TASK-%s(根据topic分key) type:hash
topic_offset key: XX:DELAY_TOPIC_OFFSET-%s(根据topic分key) type:string
topic_lock key: xx:DELAY_TOPIC_RELOAD_LOCK-%s(根据topic分key) type:string
六、接口设计
delay.task.addv1 (延时队列添加v1)
请求示例
curl -d
'{
"topic": "xxx", // 业务topic
"timing_moment": , // 单位秒,要定时时刻
"content": "{}"// 消息体,json串
}'
'http://127.0.0.1:xxxx/delay/task/add'
返回示例
{
"dm_error": 0,
"error_msg": "操作成功",
"task_id":112345465765
}
pull回调方式返回(v2不再支持)
请求示例
curl -d
'{
"topic": "xxxx", // 业务topic
"task_id":1324568798765// taskid,选填,有则返回特定消息
}'
'http://127.0.0.1:xxxx/delay/task/pull'
返回示例
{
"dm_error": 0,
"error_msg": "操作成功"
"content":"{"\xxx"\}"
}
delay.task.addv2 (延时队列添加v2)
请求示例
curl -d
'{
"topic": "xxx", // 业务topic
"timing_moment": ,// 单位秒,要定时时刻
"content": "{ // 消息内容(json string)
"sn":"message.call", // 服务发现名字(或为配置服务名)
"url":"/ev/tp/xxxx", // 回调url
"xxx":"xxx" // 其他字段
}"
}'
'http://127.0.0.1:xxxx/delay/task/add'
示例
curl -d '{
"topic":"xxxx_push",
"content":"{
"uid":"111111",
"sn":"other.server",
"url":"/xxxx/callback",
"msg_type":"gift",
}",
"timing_moment":1565700615
}'
http://127.0.0.1:xxxx/delay/task/add
返回示例
{
"dm_error": 0,
"error_msg": "操作成功",
"task_id":112345465765
}
七、MQ设计(v2不再支持)
关于kafka消费方式返回:
topic: delay_base_push
固定返回格式
{
"topic": "xxxx",// 业务topic
"content": "{}"// 单条生产消息content
}
八、其他设计
唯一号设计
调用存储模块,利用redis的自增结合逻辑生成唯一号具体逻辑如下:
func (c *CacheManager) OperGenTaskid() (uint64, error) {
now := time.Now().Unix()
key := c.getDelayTaskIdKey()
reply, err := c.DelayRds.Do("INCR", key)
if err != nil {
log.Errorf("genTaskid INCR key:%s, error:%s", key, err)
return 0, err
}
version := reply.(int64)
if version == 1 {
//默认认为1秒能创建100个任务
c.DelayRds.Expire(key, time.Duration(100)*time.Second)
}
incrNum := version % 10000
taskId := (uint64(now)*10000 + uint64(incrNum))
log.Debugf("genTaskid INCR key:%s, taskId:%d", key, taskId)
return taskId, nil
}
分布式锁设计
func (c *CacheManager) SetDelayTopicLock(ctx context.Context, topic string) (bool, error) {
key := c.getDelayTopicReloadLockKey(topic)
reply, err := c.DelayRds.Do("SET", key, "lock", "NX", "EX", 2)
if err != nil {
log.Errorf("SetDelayTopicLock SETNX key:%s, cal:%v, error:%s", key, "lock", err)
return false, err
}
if reply == nil {
return false, nil
}
log.Debugf("SetDelayTopicLock SETNXEX topic:%s lock:%d", topic, false)
return true, nil
}
九、设计考虑
健壮性
熔断策略:
这版设计中有很多不足之处,当redis不可访问时,请求将大量积压给机器或者实例带来压力,导致其他服务不可用,所以采取降级策略(降级策略也有不足);在请求redis时加入重试,当重试次数多于报警次数,会记录一个原子操作atomic.StoreInt32(&stopFlag,1),其中stopFlag为一个全局的变量,在atomic.LoadInt32(&stopFlag)后,stopFlag的值为1则暂时不请求redis,同时记录当前时间,加入定时器,熔断器分为三个级别,开,关,半开,当定时器结束后stopFlag=2第二个定时将为半开状态计时,有概率访问redis,当成功次数到达阈值stopFlag=0,否则stopFlag=1继续计时
不足
1、调用time定时
通常golang 写循环执行的定时任务大概用三种实现方式:
1、time.Sleep方法:
for {
time.Sleep(time.Second)
fmt.Println("test")
}
2、time.Tick函数:
t1:=time.Tick(3*time.Second)
for {
select {
case <-t1:
fmt.Println("test")
}
}
3、其中Tick定时任务,也可以先使用time.Ticker函数获取Ticker结构体,然后进行阻塞监听信息,这种方式可以手动选择停止定时任务,在停止任务时,减少对内存的浪费。
t:=time.NewTicker(time.Second)
for {
select {
case <-t.C:
fmt.Println("test")
t.Stop()
}
}
在最开始以为sleep是单独处理直接停掉了这个协程,所以第一版用的也是sleep,但是在收集资料后发现这几种方式都创建了timer,并加入了定时任务处理协程。实际上这两个函数产生的timer都放入了同一个timer堆(golang时间轮),都在定时任务处理协程中等待被处理。Tick,Sleep,time.After函数都使用的timer结构体,都会被放在同一个协程中统一处理,这样看起来使用Tick,Sleep并没有什么区别。实际上是有区别的,本文不是讨论golang定时执行任务time.sleep和time.tick的优劣,以后会在后续文章进行探讨。使用channel阻塞协程完成定时任务比较灵活,可以结合select设置超时时间以及默认执行方法,而且可以设置timer的主动关闭,所以,建议使用time.Tick完成定时任务。
2、存储模块问题
目前是全缓存,没有DB参与,首先redis(codis)的高可用是个问题,在熔断之后采取“不作为”的判断也是有问题的,所以对未来展望,首先是:
1·单机的数据结构使用多时间轮。为了减少数据的路程,将load数据的过程异步加载到机器,减少网络io所造成的时间损耗。同时也是减少对redis的依赖
2·引入ZooKeeper或者添加集群备份,leader。保证集群中至少有两台机器load一个topic的数据,leader可以协调消费保证高可用
来源:https://juejin.im/post/5e5792bff265da570b3f2f07


猜你喜欢
- 身为一名小小的程序猿,在日常开发中不可以避免的要和where in和like打交道,在大多数情况下我们传的参数不多简单做下单引号、敏感字符转
- 经常使用到有关数据库的操作。包括连接代码、SQL命令等等,又不曾刻意去记忆它们(我本人是不愿意去记这东东),所以常常在用到的时候又去查书本,
- gchart是基于google图表API的jquery组件。使用gchart可以方便地生成强大的各种图表和报表。基于google图表接口的g
- 一 按时间创建文件源码# 截图方式二# coding=utf-8import osimport time# 当前年月日时分秒时间 2020-
- 前言一个函数就可以接收另一个函数作为参数,简言之,函数的参数能够接收别的函数,这种函数就称之为高阶函数JavaScript 的高阶函数跟 S
- 本文实例讲述了Python3实现的Mysql数据库操作封装类。分享给大家供大家参考,具体如下:#encoding:utf-8#name:mo
- 方式1:静态获取,通过直接解析checkpoint文件获取变量名及变量值通过reader = tf.train.NewCheckpointR
- 本文介绍了Python格式化输出%s和%d的实例案例。分享给大家供大家参考,具体如下:python print格式化输出1. 打印字符串pr
- 车牌识别在高速公路中有着广泛的应用,比如我们常见的电子收费(ETC)系统和交通违章车辆的检测,除此之外像小区或地下车库门禁也会用到,基本上凡
- 就是在mysql命令行登录的时候加上: --pager=more 参数可以使用linux下的more来分页,很好用
- 一、环境Ubuntu 16.04tensorflow 1.4.0keras 2.1.3二、训练数据时报错:ValueError: Error
- 下面是ThoughtBot 的Git使用规范流程。我从中学到了很多,推荐你也这样使用Git。第一步:新建分支首先,每次开发新功能,都应该新建
- MySQL服务器有几个影响其操作的参数(变量)。如果缺省的参数值不合适,可以将其修改为对服务器运行环境更合适的值。例如,如果您有大量的内存,
- Python:2.7 IDE:Pycharm5.0.3 今天遇到一个问题,就是在使用json.load()时,中文字符被转化为Unicode
- 本文实例讲述了Python socket套接字实现C/S模式远程命令执行功能。分享给大家供大家参考,具体如下:一. 前言要求: 使用pyth
- 如下所示:<!doctype html><html lang="en"> <head>
- 本文实例为大家分享了python实现图书管理系统的具体代码,供大家参考,具体内容如下添加新书查询借阅二次添加新书(读取已有的.xls并修改)
- 一、现状Python 有诸多优秀的 Web 开发框架供我们使用,比如Django、Flask、Sanic。正常的情况下,我们基于这些 Web
- 本文实例讲述了Django实现图片文字同时提交的方法。分享给大家供大家参考。具体分析如下:jQuery为我们网站开发解决了很多问题,使我们的
- 在 HTML 中使用JavaScriptJavaScript能以两种方式嵌入HTML:作为语句和函数使用时,用 SCRIPT 标记作为事件处