PHP简易延时队列的实现流程详解
作者:i_zane 发布时间:2023-05-29 23:02:48
标签:PHP,延时,队列
需求说明
当用户申请售后,商家未在n小时内处理,系统自动进行退款。
商家拒绝后,用户可申请客服介入,客服x天内超时未处理,系统自动退款。
用户收到货物,x天自动确认收货
等等需要延时操作的流程……
设计思路
设计一张队列表,记录所有队列的参数,执行状态,重试次数
将创建队列的
id
存于redis
中,使用zset
有序集合。按照时间戳进行排序使用
croontab
定时任务每分钟执行一次
实现
新建队列表
CREATE TABLE `delay_queue` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`params` varchar(512) DEFAULT NULL,
`message` varchar(255) DEFAULT '' COMMENT '执行结果',
`ext_string` varchar(255) DEFAULT '' COMMENT '扩展字符串,可用于快速检索。取消该队列',
`retry_times` int(2) DEFAULT '0' COMMENT '重试次数',
`status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待执行, 10 执行成功, 20 执行失败,30取消执行',
`created` datetime DEFAULT NULL,
`modified` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `ext_idx` (`ext_string`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
部分队列的操作方法,新增队列、取消队列、队列执行成功、队列执行失败、队列重试【重试时间间隔抄的微信支付的异步通知时间】
class DelayQueueService
{
// 重试时间,最大重试次数 15
private static $retryTimes = [
15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60,
3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60,
];
/**
* @description 增加队列至redis
* @param $queueId
* @param int $delay 需要延迟执行的时间。单位秒
* @return void
*/
public function addDelayQueue($queueId, int $delay)
{
$time = time() + $delay;
$redis = RedisService::getInstance();
$redis->zAdd("delay_queue_job", $time, $queueId);
}
// 取消redis 队列
public function cancelDelayQueue($ext)
{
$row = $query->getRow(); // 使用ext_string 快速检索到相应的记录
if ($row) {
$redis = RedisService::getInstance();
$redis->zRem('delay_queue_job', $row->id);
$row->status = DelayQueueTable::STATUS_CANCEL;
$table->save($row);
}
}
/**
* @description 执行成功
* @return void
*/
public static function success($id, $message = null)
{
$table->update([
'status' => DelayQueueTable::STATUS_SUCCESS,
'message' => $message ?? '',
'modified' => date('Y-m-d H:i:s'),
], [
'id' => $id,
]);
}
/**
* @description 执行失败
* @return void
*/
public static function failed($id, $message = null)
{
$table->updateAll([
'status' => DelayQueueTable::STATUS_FAILED,
'message' => $message ?? '',
'modified' => date('Y-m-d H:i:s'),
], [
'id' => $id,
]);
}
/**
* @description 失败队列重试,最大重试15次
* @param $id
* @return void
*/
public static function retry($id)
{
$info = self::getById($id);
if (!$info) {
return;
}
$retryTimes = ++$info['retry_times'];
if ($retryTimes > 15) {
return;
}
$entity = [
'params' => $info['params'],
'ext_string' => $info['ext_string'],
'retry_times' => $retryTimes,
];
$queueId = $table->save($entity);
self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]);
}
}
在命令行进行任务的运行
public function execute(Arguments $args, ConsoleIo $io)
{
$startTimestamp = strtotime("-1 days");
$now = time();
$redis = RedisService::getInstance();
$queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now);
if ($queueIds) {
foreach ($queueIds as $id) {
$info = // 按照队列id 获取相应的信息
if ($info['status'] === DelayQueueTable::STATUS_PADDING) {
$params = unserialize($info['params']); // 创建记录的时候,需要试用serialize 将类名,方法,参数序列化
$class = $params['class'];
$method = $params['method'];
$data = $params['data'];
try {
call_user_func_array([$class, $method], [$data]);
$redis->zRem('delay_queue_job', $id);
$msg = date('Y-m-d H:i:s') . " [info] success: $id";
DelayQueueService::success($id, $msg);
$io->success($msg);
} catch (Exception $e) {
$msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}";
DelayQueueService::failed($id, $msg);
// 自定义异常code,不进行队列重试
if (10000 != $e->getCode()) {
DelayQueueService::retry($id);
}
$io->error($msg);
}
}
}
}
}
最后说点
我这边的系统对实时性要求不高,所以直接使用的是
linux
的crond
服务,每分钟运行一次。如需精确到秒级,可写一个shell
,一分钟循环执行<=60
次因为目前的数据较少,延时队列加入的只有小部分。所以就在
command
里面直接执行更新操作了,后期如果队列多,且有比较耗时的操作,可考虑把耗时操作单独放置一个队列中。本方法只用于将数据塞进队列。
附上 shell
脚本 一分钟执行60次
#!/bin/bash
step=2 #间隔的秒数,不能大于60
for (( i = 0; i < 60; i=(i+step) )); do
echo $i # do something
sleep $step
done
来源:https://blog.csdn.net/qq_39059866/article/details/126470332
0
投稿
猜你喜欢
- 原来是在系统上出了问题.是2003的IIS出现了问题,因为是2003的系统,它对ASP的上传文件做出了200K的限制,解决问题方法如下 :
- 本文实例讲述了nodejs简单实现TCP服务器端和客户端的聊天功能。分享给大家供大家参考,具体如下:服务器端var net = requir
- Python 读取 .gz 文件读取.gz 文件需要使用gzip 包,如果没有安装可以自行在终端安装pip install gzipimpo
- 自动化测试执行过程中,难免会有错误/异常出现,比如测试脚本没有发现对应元素,则会立刻抛出NoSuchElementException异常。这
- 引入:if-else的作用,满足一个条件做什么,否则做什么。if-else语句语法结构if 判断条件:要执行的代码else:要执行的代码判断
- 1. 编码问题:遇到了几个字符串转换问题,总结如下:# str to bytes str.encode(s)# bytes to str b
- HTTP(HyperTextTransferProtocol)是超文本传输协议的缩写,它用于传送WWW方式的数据,关于HTTP协议的详细内容
- asp连接sql 第一种写法: 代码如下: MM_conn_STRING = "Driver={SQL Server};serv
- SQL登录时如果采用windows集成身份验证,登录框将会以“机器名\当前系统用户名”的格式显示登录名,而且登录名和密码都是灰色的,不允许用
- 可能某次不小心改了配置文件,导致无法打开jupyter,找了很多方法,都没从根本上解决问题。倒是发现启动的默认目录被改了,怀疑是这个问题。然
- 许多共享主机的服务提供商不允许运行你自己的服务进程,也不允许修改 httpd.conf 文件。 尽管如此,仍然有可能通过Web服务器产生的子
- 随着移动端的用户越来越多,传统的web系统架构无法兼容很多移动终端的正常使用。在工作中也会发现,现在很多的客户都有在手机、平板等移动终端上使
- 在浏览天极RSS订阅页面时,可以看到天极网为方便用户定制站点内容而设立的各个RSS频道。浏览者通过订阅不同的RSS(可同时订阅多个网站),就
- 环境:【wind2003[open Tftp server] + virtualbox:ubuntn10 server】tftp
- 一、背景分析对想要在视觉化环境下制作复杂网页的专业网页制作者来说,Dreamweaver已经渐渐在网页编辑工具中展露头角,成为专业人士编写网
- 说到客户端数据存储,可能第一时间想到的是cookies,这是一种网站常见的存储数据的方法。它的最大优点是兼容性好,几乎所有浏览器都具有这个功
- 1.substring_index函数的语法及其用法(1)语法:substring_index(string,sep,num)即substr
- 概要在列表,元组,实例,类,字典和函数中存在循环引用问题。有 __del__ 方法的实例会以健全的方式被处理。给新类型添加GC支持是很容易的
- 特别是linux系统,装了多个python,有时候找不到python的绝对路径,有时候装了个django,又找不到django安装到哪里了。
- 撰写时间:2017.5.23一维数组1.numpy初始化一维数组a = np.array([1,2,3]);print a.shape输出的