网络编程
位置:首页>> 网络编程>> php编程>> PHP简易延时队列的实现流程详解

PHP简易延时队列的实现流程详解

作者:i_zane  发布时间:2023-05-29 23:02:48 

标签:PHP,延时,队列

需求说明

  • 当用户申请售后,商家未在n小时内处理,系统自动进行退款。

  • 商家拒绝后,用户可申请客服介入,客服x天内超时未处理,系统自动退款。

  • 用户收到货物,x天自动确认收货

  • 等等需要延时操作的流程……

设计思路

  • 设计一张队列表,记录所有队列的参数,执行状态,重试次数

  • 将创建队列的id 存于redis 中,使用zset有序集合。按照时间戳进行排序

  • 使用croontab定时任务每分钟执行一次

实现

新建队列表

CREATE TABLE `delay_queue` (
 `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
 `params` varchar(512) DEFAULT NULL,
 `message` varchar(255) DEFAULT '' COMMENT '执行结果',
 `ext_string` varchar(255) DEFAULT '' COMMENT '扩展字符串,可用于快速检索。取消该队列',
 `retry_times` int(2) DEFAULT '0' COMMENT '重试次数',
 `status` int(2) NOT NULL DEFAULT '1' COMMENT '1 待执行, 10 执行成功, 20 执行失败,30取消执行',
 `created` datetime DEFAULT NULL,
 `modified` datetime DEFAULT NULL,
 PRIMARY KEY (`id`),
 KEY `ext_idx` (`ext_string`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

部分队列的操作方法,新增队列、取消队列、队列执行成功、队列执行失败、队列重试【重试时间间隔抄的微信支付的异步通知时间】

class DelayQueueService
{
   // 重试时间,最大重试次数 15
   private static $retryTimes = [
       15, 15, 30, 3 * 60, 10 * 60, 20 * 60, 30 * 60, 30 * 60, 30 * 60, 60 * 60,
       3 * 60 * 60, 3 * 60 * 60, 3 * 60 * 60, 6 * 60 * 60, 6 * 60 * 60,
   ];
/**
* @description 增加队列至redis
* @param $queueId
* @param int $delay 需要延迟执行的时间。单位秒
* @return void
*/
public function addDelayQueue($queueId, int $delay)
{
   $time = time() + $delay;
   $redis = RedisService::getInstance();
   $redis->zAdd("delay_queue_job", $time, $queueId);
}
// 取消redis 队列
public function cancelDelayQueue($ext)
{
   $row = $query->getRow(); // 使用ext_string 快速检索到相应的记录
   if ($row) {
       $redis = RedisService::getInstance();
       $redis->zRem('delay_queue_job', $row->id);
       $row->status = DelayQueueTable::STATUS_CANCEL;
       $table->save($row);
   }
}
/**
* @description 执行成功
* @return void
*/
public static function success($id, $message = null)
{
   $table->update([
       'status' => DelayQueueTable::STATUS_SUCCESS,
       'message' => $message ?? '',
       'modified' => date('Y-m-d H:i:s'),
   ], [
       'id' => $id,
   ]);
}
/**
* @description 执行失败
* @return void
*/
public static function failed($id, $message = null)
{
   $table->updateAll([
       'status' => DelayQueueTable::STATUS_FAILED,
       'message' => $message ?? '',
       'modified' => date('Y-m-d H:i:s'),
   ], [
       'id' => $id,
   ]);
}
/**
* @description 失败队列重试,最大重试15次
* @param $id
* @return void
*/
public static function retry($id)
{
   $info = self::getById($id);
   if (!$info) {
       return;
   }
   $retryTimes = ++$info['retry_times'];
   if ($retryTimes > 15) {
       return;
   }
   $entity = [
       'params' => $info['params'],
       'ext_string' => $info['ext_string'],
       'retry_times' => $retryTimes,
   ];
   $queueId = $table->save($entity);
   self::addDelayQueue($queueId, self::$retryTimes[$retryTimes - 1]);
}
}

在命令行进行任务的运行

public function execute(Arguments $args, ConsoleIo $io)
{
   $startTimestamp = strtotime("-1 days");
   $now = time();
   $redis = RedisService::getInstance();
   $queueIds = $redis->zRangeByScore('delay_queue_job', $startTimestamp, $now);
   if ($queueIds) {
       foreach ($queueIds as $id) {
           $info = // 按照队列id 获取相应的信息
           if ($info['status'] === DelayQueueTable::STATUS_PADDING) {
               $params = unserialize($info['params']); // 创建记录的时候,需要试用serialize 将类名,方法,参数序列化
               $class = $params['class'];
               $method = $params['method'];
               $data = $params['data'];
               try {
                   call_user_func_array([$class, $method], [$data]);
                   $redis->zRem('delay_queue_job', $id);
                   $msg = date('Y-m-d H:i:s') . " [info] success: $id";
                   DelayQueueService::success($id, $msg);
                   $io->success($msg);
               } catch (Exception $e) {
                   $msg = date('Y-m-d H:i:s') . " [error] {$e->getMessage()}";
                       DelayQueueService::failed($id, $msg);
                       // 自定义异常code,不进行队列重试
                       if (10000 != $e->getCode()) {
                           DelayQueueService::retry($id);
                       }
                       $io->error($msg);
               }
           }
       }
   }
}

最后说点

  • 我这边的系统对实时性要求不高,所以直接使用的是linuxcrond 服务,每分钟运行一次。如需精确到秒级,可写一个shell,一分钟循环执行<=60

  • 因为目前的数据较少,延时队列加入的只有小部分。所以就在command 里面直接执行更新操作了,后期如果队列多,且有比较耗时的操作,可考虑把耗时操作单独放置一个队列中。本方法只用于将数据塞进队列。

附上 shell 脚本 一分钟执行60次

#!/bin/bash
step=2 #间隔的秒数,不能大于60
for (( i = 0; i < 60; i=(i+step) )); do
  echo $i # do something
  sleep $step
done

来源:https://blog.csdn.net/qq_39059866/article/details/126470332

0
投稿

猜你喜欢

  • SQL Server 2000安装问题集锦1、先把SQL Server卸载(卸载不掉也没有关系,继续下面的操作)2、把Microsoft S
  • 如今,基本每个网站都会需要到Tab切换展示内容的滑动门效果应用,这种效果可以在更少的页面空间内,展示更多的网站内容,节约空间,方便用户集中操
  • 前几天有个人退群了。起因很简单,他问了一个问题,没人回答,于是说要退群,后来我看到了,给了个链接,说这个问题已经说过好多遍了,于是他就退了。
  •   ASP调用WEBSERVICE----INDEX----1. soap请求方式2. post请求方式3.&
  • XPath(XML Path language)是一种处理XML文档段的语言。XSLT(Extensible Stylesheet Lang
  • 一、若出现404错误,自动跳转到所在目录的首页;二、若当前页本身是目录首页,则自动跳转至上一级目录的默认首页。自定义404页面代码如下:&l
  • 假如页面上有很多条记录,很多情况下,对这些信息按照字母表降序排序会比传统的升序排序显示效率更高。采用你熟悉的ORDER BY 子句,你可以很
  • Oracle游标分为显示游标和隐式游标。显示游标(Explicit Cursor):在PL/SQL程序中定义的、用于查询的游标称作显示游标。
  • 代码如下:--CAST 和 CONVERT 函数 Percentage DECLARE @dec decimal(5,3), @var va
  • SQLserver 2000中出现“指定的服务并未以已安装的服务存在" 解决方案一、将计算机名改成大写。二、将sql server
  • 所谓产品其实最终展现在用户面前的只是界面而已,所谓界面绝大多数时候只包括两个部分:图片、文字。重视界面上的每一个像素和每一个文字是UED的基
  • 在CSS初级教程中我们仅仅考虑了HTML选择符──以HTML标签形式出现。你当然可以用类选择符class和标识选择符id来定义自己的选择符。
  • 我需要查询从现在算起五天前的日期。按照商业习惯,这五天应该不包含星期六和星期天。专家回答:对于许多跟商业日期有关的情况,最好的解决方案是使用
  • 你可以通过自定义函数接口 (UDF)来添加函数。自定义函数被编译为目标文件,然后用CREATE FUNCTION 和DROP FUNCTIO
  • 这里其实并不需要其它的什么函数来支持,只需要使用MYSQL提供的一些SQL语句就可以了。这里为了简单起见,以MYSQL的系统表USER为例,
  • 我们来编写一个,引用时用:<!--#include Virtual="page.inc"-->语句即可:pa
  • Session 对象 可以使用 Session 对象存储特定用户会话所需的信息。这样,当用户在应用程序的 Web 页之间跳转时,存储在 Se
  • 需要的软件phpStudy 用来导入一个数据库api-server 数据库功能可以开启一个服务器,让开发环境可以使用生产环境的网址请求安装
  • 前言:1、上几次讨论右键禁止等问题的时候,有网友问那里有键值表KeyCode,我今天写了一个javascript,以飨各位有需要者。2、适用
  • 用QQ聊过天的朋友都对它的自动隐藏窗口功能爱不释手,它可以使窗口显得清爽整洁而且富有动感,笔者的几个朋友都想在自己的网页中加入类似的东东,经
手机版 网络编程 asp之家 www.aspxhome.com