swoole_process实现进程池的方法示例
作者:opso 发布时间:2024-06-05 15:40:23
swoole —— 重新定义PHP
swoole 的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process 的研究在swoole中显得尤为重要。
预备知识
IO多路复用
swoole 中的io多路复用表现为底层的 epoll进程模型,在C语言中表现为 epoll 函数。
epoll 模型下会持续监听自己名下的素有socket 描述符 fd
当触发了 socket 监听的事件时,epoll 函数才会响应,并返回所有监听该时间的 socket 集合
epoll 的本质是阻塞IO,它的优点在于能同事处理大量socket连接
Event loop 事件循环
swoole 对 epoll 实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)
Event loop 是一个Reactor线程,其中运行了一个epoll实例。
通过swoole_event_add将socket描述符的一个事件添加到epoll监听中,事件发生时将执行回调函数
不可用于fpm环境下,因为fpm在任务结束时可能会关掉进程。
swoole_process
基于C语言封装的进程管理模块,方便php来调用
内置管道、消息队列接口,方便实现进程间通信
我们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。
静态模式 即初始化固定的进程数,当来了一个请求时,从中选取一个进程来处理。
动态模式 指定最小、最大进程数,当请求量过大,进程数不超过最大限制时,新增线程去处理请求
接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通信、定时器等使用,实际情况使用封装好的swoole_server来实现task任务队列池会更方便。
假如有个定时投递的任务队列:
<?php
/**
* 动态进程池,类似fpm
* 动态新建进程
* 有初始进程数,最小进程数,进程不够处理时候新建进程,不超过最大进程数
*/
// 一个进程定时投递任务
/**
* 1. tick
* 2. process及其管道通讯
* 3. event loop 事件循环
*/
class processPool
{
private $pool;
/**
* @var swoole_process[] 记录所有worker的process对象
*/
private $workers = [];
/**
* @var array 记录worker工作状态
*/
private $used_workers = [];
/**
* @var int 最小进程数
*/
private $min_woker_num = 5;
/**
* @var int 初始进程数
*/
private $start_worker_num = 10;
/**
* @var int 最大进程数
*/
private $max_woker_num = 20;
/**
* 进程闲置销毁秒数
* @var int
*/
private $idle_seconds = 5;
/**
* @var int 当前进程数
*/
private $curr_num;
/**
* 闲置进程时间戳
* @var array
*/
private $active_time = [];
public function __construct()
{
$this->pool = new swoole_process(function () {
// 循环建立worker进程
for ($i = 0; $i < $this->start_worker_num; $i++) {
$this->createWorker();
}
echo '初始化进程数:' . $this->curr_num . PHP_EOL;
// 每秒定时往闲置的worker的管道中投递任务
swoole_timer_tick(1000, function ($timer_id) {
static $count = 0;
$count++;
$need_create = true;
foreach ($this->used_workers as $pid => $used) {
if ($used == 0) {
$need_create = false;
$this->workers[$pid]->write($count . ' job');
// 标记使用中
$this->used_workers[$pid] = 1;
$this->active_time[$pid] = time();
break;
}
}
foreach ($this->used_workers as $pid => $used)
// 如果所有worker队列都没有闲置的,则新建一个worker来处理
if ($need_create && $this->curr_num < $this->max_woker_num) {
$new_pid = $this->createWorker();
$this->workers[$new_pid]->write($count . ' job');
$this->used_workers[$new_pid] = 1;
$this->active_time[$new_pid] = time();
}
// 闲置超过一段时间则销毁进程
foreach ($this->active_time as $pid => $timestamp) {
if ((time() - $timestamp) > $this->idle_seconds && $this->curr_num > $this->min_woker_num) {
// 销毁该进程
if (isset($this->workers[$pid]) && $this->workers[$pid] instanceof swoole_process) {
$this->workers[$pid]->write('exit');
unset($this->workers[$pid]);
$this->curr_num = count($this->workers);
unset($this->used_workers[$pid]);
unset($this->active_time[$pid]);
echo "{$pid} destroyed\n";
break;
}
}
}
echo "任务{$count}/{$this->curr_num}\n";
if ($count == 20) {
foreach ($this->workers as $pid => $worker) {
$worker->write('exit');
}
// 关闭定时器
swoole_timer_clear($timer_id);
// 退出进程池
$this->pool->exit(0);
exit();
}
});
});
$master_pid = $this->pool->start();
echo "Master $master_pid start\n";
while ($ret = swoole_process::wait()) {
$pid = $ret['pid'];
echo "process {$pid} existed\n";
}
}
/**
* 创建一个新进程
* @return int 新进程的pid
*/
public function createWorker()
{
$worker_process = new swoole_process(function (swoole_process $worker) {
// 给子进程管道绑定事件
swoole_event_add($worker->pipe, function ($pipe) use ($worker) {
$data = trim($worker->read());
if ($data == 'exit') {
$worker->exit(0);
exit();
}
echo "{$worker->pid} 正在处理 {$data}\n";
sleep(5);
// 返回结果,表示空闲
$worker->write("complete");
});
});
$worker_pid = $worker_process->start();
// 给父进程管道绑定事件
swoole_event_add($worker_process->pipe, function ($pipe) use ($worker_process) {
$data = trim($worker_process->read());
if ($data == 'complete') {
// 标记为空闲
// echo "{$worker_process->pid} 空闲了\n";
$this->used_workers[$worker_process->pid] = 0;
}
});
// 保存process对象
$this->workers[$worker_pid] = $worker_process;
// 标记为空闲
$this->used_workers[$worker_pid] = 0;
$this->active_time[$worker_pid] = time();
$this->curr_num = count($this->workers);
return $worker_pid;
}
}
new processPool();
来源:https://opso.coding.me/2018/07/07/swoole-process/


猜你喜欢
- 前言最近有人在Twisted邮件列表中提出诸如"为任务紧急的人提供一份Twisted介绍"的需求。值得提前透露的是,这个
- 说下思路吧:原图->灰度->根据像素亮度-映射到指定的字符序列中->输出。字符越多,字符变化稠密。效果会更好。如果根据灰度
- 问题概述:有时候在使用print函数输出时,往往需要不断地切换字符串和变量,操作起来很不方便,需要不断地打引号和逗号。比如:firstNam
- 直接上代码:# -*- coding: utf-8 -*- import Queue import threadingimport urll
- 在我们想要对不同变量进行判断的时候,会分析其中的之间的联系。这种理念同样也被用在实例生活中,最常见到的是做一个地理的热力图。很多人对画热力图
- 本章我们要制作一个俄罗斯方块游戏。Tetris译注:称呼:方块是由四个小方格组成的俄罗斯方块游戏是世界上最流行的游戏之一。是由一名叫Alex
- 一、简介我们在这里采用Python中的matplotlib来实现曲线图形的绘制。matplotlib是著名的python绘图库,它提供了一整
- 1. 二维(多维)数组降为一维数组方法1: reshape()+concatenate 函数,这个方法是间接法,利用 reshape() 函
- Sqlserver数据库分页查询一直是Sqlserver的短板,闲来无事,想出几种方法,假设有表ARTICLE,字段ID、YEAR...(其
- 一、文章前言此文主要实现识别人体的轮廓范围,与背景进行分离并保存效果图,适用于拍照背景替换及透明背景的人像图(png格式)转换。二、具体流程
- 利用PHP中的thinkphp5进行项目开发,将view一道项目跟目录下进入到/www/wwwroot/xxxx/application/c
- qqbot 是一个用 python 实现的、基于腾讯 SmartQQ 协议的 QQ 机器人框架,可运行在 Linux 、 Windows 和
- 本文实例讲述了python使用socket远程连接错误处理方法。分享给大家供大家参考。具体如下:import socket, syshost
- 目录1.简介2.如何解决3.虚线框类代码4.测试UI界面如下图所示5.拖动时的效果图如下所示1.简介看到很多才学QT的人都会问为啥无边框拖动
- 首先,创建一个存储过程 get_clob: t_name:要查询的表名;f_name:要查询的字段名;u_id:表的主键,查询条件;l_po
- 1、基于字典的创建规划问题上篇中介绍了使用 LpVariable 对逐一定义每个决策变量,设定名称、类型和上下界,类似地对约束条件也需要逐一
- 一、系统资源使用限制的必要性探讨对于一个脚本,最基础的限制是要限制单进程实例以保证了不会存在多个进程实例、在运行程序主体逻辑前检测系统资源剩
- 一般情况下:if(2 > 10){alert("不正确!");} 此比较不会是想要的结果:它相当于2 >1,
- MySQL函数CONCAT、CONCAT_WS、GROUP_CONCAT1.concat()函数CONCAT 函数用于将两个字符串连接为一个
- 一、手指触屏,利用touchstart和touchend计算前后滑动距离,判断是上拉还是下滑。二、js中距离:pageY、clientY、o