Python多线程中线程数量如何控制
作者:Python热爱者 发布时间:2021-01-12 04:05:05
标签:Python,线程,控制
前言
前段时间学习了python的多线程爬虫,当时爬取一个图片网站,开启多线程后,并没有限制线程的数量,也就是说,如果下载1000张图片,会一次性开启1000个子线程同时进行下载
现在希望控制线程数量:例如每次只下载5张,当下载完成后再下载另外5张,直至全部完成
查了一些资料,发现在python中,threading 模块有提供 Semaphore类 和 BoundedSemaphore类来限制线程数
官网给出例子如下:
信号量通常用于保护容量有限的资源,例如数据库服务器。在资源大小固定的任何情况下,都应使用有界信号量。在产生任何工作线程之前,您的主线程将初始化信号量:
maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)
产生后,工作线程在需要连接到服务器时会调用信号量的获取和释放方法:
改造之前的多线程爬虫
首先贴出原来的代码
# -*- coding:utf-8 -*-
import requests
from requests.exceptions import RequestException
import os, time
import re
from lxml import etree
import threading
lock = threading.Lock()
def get_html(url):
"""
定义一个方法,用于获取一个url页面的响应内容
:param url: 要访问的url
:return: 响应内容
"""
response = requests.get(url, timeout=10)
# print(response.status_code)
try:
if response.status_code == 200:
# print(response.text)
return response.text
else:
return None
except RequestException:
print("请求失败")
# return None
def parse_html(html_text):
"""
定义一个方法,用于解析页面内容,提取图片url
:param html_text:
:return:一个页面的图片url集合
"""
html = etree.HTML(html_text)
if len(html) > 0:
img_src = html.xpath("//img[@class='photothumb lazy']/@data-original") # 元素提取方法
# print(img_src)
return img_src
else:
print("解析页面元素失败")
def get_image_pages(url):
"""
获取所查询图片结果的所有页码
:param url: 查询图片url
:return: 总页码数
"""
html_text = get_html(url) # 获取搜索url响应内容
# print(html_text)
if html_text is not None:
html = etree.HTML(html_text) # 生成XPath解析对象
last_page = html.xpath("//div[@class='pages']//a[last()]/@href") # 提取最后一页所在href链接
print(last_page)
if last_page:
max_page = re.compile(r'(\d+)', re.S).search(last_page[0]).group() # 使用正则表达式提取链接中的页码数字
print(max_page)
print(type(max_page))
return int(max_page) # 将字符串页码转为整数并返回
else:
print("暂无数据")
return None
else:
print("查询结果失败")
def get_all_image_url(page_number):
"""
获取所有图片的下载url
:param page_number: 爬取页码
:return: 所有图片url的集合
"""
base_url = 'https://imgbin.com/free-png/naruto/'
image_urls = []
x = 1 # 定义一个标识,用于给每个图片url编号,从1递增
for i in range(1, page_number):
url = base_url + str(i) # 根据页码遍历请求url
try:
html = get_html(url) # 解析每个页面的内容
if html:
data = parse_html(html) # 提取页面中的图片url
# print(data)
# time.sleep(3)
if data:
for j in data:
image_urls.append({
'name': x,
'value': j
})
x += 1 # 每提取一个图片url,标识x增加1
except RequestException as f:
print("遇到错误:", f)
continue
# print(image_urls)
return image_urls
def get_image_content(url):
"""请求图片url,返回二进制内容"""
# print("正在下载", url)
try:
r = requests.get(url, timeout=15)
if r.status_code == 200:
return r.content
return None
except RequestException:
return None
def main(url, image_name):
"""
主函数:实现下载图片功能
:param url: 图片url
:param image_name: 图片名称
:return:
"""
semaphore.acquire() # 加锁,限制线程数
print('当前子线程: {}'.format(threading.current_thread().name))
save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
try:
file_path = '{0}/{1}.jpg'.format(save_path, image_name)
if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取
with open(file_path, 'wb') as f:
f.write(get_image_content(url))
f.close()
print('第{}个文件保存成功'.format(image_name))
else:
print("第{}个文件已存在".format(image_name))
semaphore.release() # 解锁imgbin-多线程-重写run方法.py
except FileNotFoundError as f:
print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", f)
raise
except TypeError as e:
print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", e)
class MyThread(threading.Thread):
"""继承Thread类重写run方法创建新进程"""
def __init__(self, func, args):
"""
:param func: run方法中要调用的函数名
:param args: func函数所需的参数
"""
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self):
print('当前子线程: {}'.format(threading.current_thread().name))
self.func(self.args[0], self.args[1])
# 调用func函数
# 因为这里的func函数其实是上述的main()函数,它需要2个参数;args传入的是个参数元组,拆解开来传入
if __name__ == '__main__':
start = time.time()
print('这是主线程:{}'.format(threading.current_thread().name))
urls = get_all_image_url(5) # 获取所有图片url列表
thread_list = [] # 定义一个列表,向里面追加线程
semaphore = threading.BoundedSemaphore(5) # 或使用Semaphore方法
for t in urls:
# print(i)
m = MyThread(main, (t["value"], t["name"])) # 调用MyThread类,得到一个实例
thread_list.append(m)
for m in thread_list:
m.start() # 调用start()方法,开始执行
for m in thread_list:
m.join() # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出
end = time.time()
print(end-start)
# get_image_pages("https://imgbin.com/free-png/Naruto")
将代码进行改造
1、下面的第8、9行表示调用 threading 的 BoundedSemaphore类,初始化信号量为5,把结果赋给变量 pool_sema
if __name__ == '__main__':
start = time.time()
print('这是主线程:{}'.format(threading.current_thread().name))
urls = get_all_image_url(5) # 获取所有图片url列表
thread_list = [] # 定义一个列表,向里面追加线程
# 更多Python相关视频、资料加群778463939免费获取
max_connections = 5 # 定义最大线程数
pool_sema = threading.BoundedSemaphore(max_connections) # 或使用Semaphore方法
for t in urls:
# print(i)
m = MyThread(main, (t["value"], t["name"])) # 调用MyThread类,得到一个实例
thread_list.append(m)
for m in thread_list:
m.start() # 调用start()方法,开始执行
for m in thread_list:
m.join() # 子线程调用join()方法,使主线程等待子线程运行完毕之后才退出
end = time.time()
print(end-start)
2、修改main()函数
(1)方法一:通过with语句实现,第9行添加 with pool_sema
使用 with 语句来获得一个锁、条件变量或信号量,相当于调用 acquire();离开 with 块后,会自动调用 release()
def main(url, image_name):
"""
主函数:实现下载图片功能
:param url: 图片url
:param image_name: 图片名称
:return:
"""
with pool_sema:
print('当前子线程: {}'.format(threading.current_thread().name))
save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
try:
file_path = '{0}/{1}.jpg'.format(save_path, image_name)
if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取
with open(file_path, 'wb') as f:
f.write(get_image_content(url))
f.close()
print('第{}个文件保存成功'.format(image_name))
else:
print("第{}个文件已存在".format(image_name))
except FileNotFoundError as f:
print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", f)
raise
except TypeError as e:
print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", e)
(2)方法二:直接使用 acquire()和 release()
下面的第8行调用 acquire(),第24行调用release()
def main(url, image_name):
"""
主函数:实现下载图片功能
:param url: 图片url
:param image_name: 图片名称
:return:
"""
pool_sema.acquire() # 加锁,限制线程数
# with pool_sema:
print('当前子线程: {}'.format(threading.current_thread().name))
save_path = os.path.dirname(os.path.abspath('.')) + '/pics/'
try:
file_path = '{0}/{1}.jpg'.format(save_path, image_name)
if not os.path.exists(file_path): # 判断是否存在文件,不存在则爬取
with open(file_path, 'wb') as f:
f.write(get_image_content(url))
f.close()
print('第{}个文件保存成功'.format(image_name))
else:
print("第{}个文件已存在".format(image_name))
pool_sema.release() # 解锁imgbin-多线程-重写run方法.py
except FileNotFoundError as f:
print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", f)
raise
except TypeError as e:
print("第{}个文件下载时遇到错误,url为:{}:".format(image_name, url))
print("报错:", e)
最终效果是一样的,每次启用5个线程,完成后再启动下一批
来源:https://blog.csdn.net/qdPython/article/details/121907420


猜你喜欢
- 首先我们来安装python1、首先进入网站下载:点击打开链接(或自己输入网址https://www.python.org/downloads
- 利用numpy和scipy,我们可以很容易根据欧拉角求出旋转矩阵,这里的旋转轴我们你理解成四元数里面的旋转轴 import nu
- 这个是用vue-cli生成的项目下使用比如有个路由跳转时需要带两个参数:<router-link to='/tr'&g
- 等啊等,约会都回来了,终于等到了Google放出今年的情人节Logo,原本下午四点就可以上线的这篇文章,为了等待Google谷歌美国总部的那
- 利用二进制反格雷码(bynary reflected Gray code)的方式生成n个元素的全组合,Cn1+Cn2+...+Cnn,如在利
- 一、在控制器中引用:use cache;二、基本方法及使用1、put() 键 值 有效时间(分钟)Cache::put('key1&
- 具体代码如下所示:#coding:utf8import random,wxdef password(event): a = [c
- 用goland打开别人的go项目。可能碰到下面的问题goland cannot find package "server/comm
- 前言:python多进程,经常在使用,却没有怎么系统的学习过,官网上面讲得比较细,结合自己的学习,整理记录下官网:https://docs.
- 1.查看Anaconda中有哪些虚拟环境及所在目录:在桌面搜索框中输入Anaconda Prompt并打开输入命令conda env lis
- SELECT ABS(DATEDIFF(dd,EndDate,BeginDate)) FROM dbo.WorkingPlan 其中,Wor
- Redisredis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串
- 开启MySQL的远程访问权限默认mysql的用户是没有远程访问的权限的,因此当程序跟数据库不在同一台服务器上时,我们需要开启mysql的远程
- 今天星期天,因数据库太慢,最后决定将数据库进行重新整理. (假定数据库名称为:DB_ste) 1、根据现在的数据库的脚本创建一个脚本文件(F
- 关于django celery的使用网上有很多文章,本文就不多做更多的说明。本文使用版本python==3.8.15Django==3.2.
- blur事件在元素失去焦点时触发。在一些jquery的教程、api手册等上面对blur事件,提供了一个错误的例子,就是关于p标签失去焦点的问
- 简介Flv.js 是 HTML5 Flash 视频(FLV)播放器,纯原生 JavaScript 开发,没有用到 Flash。由 bilib
- 网上关于各种降维算法的资料参差不齐,同时大部分不提供源代码。这里有个 GitHub 项目整理了使用 Python 实现了 11 种经典的数据
- 看一看自己写的类是否能符合这样的标准.要成为高手,我要走的路还很长.摘抄自《OOD 启示录》--Arthur J.Riel(1)所有数据都应
- 本文实例讲述了Python二叉树的镜像转换实现方法。分享给大家供大家参考,具体如下:问题描述操作给定的二叉树,将其变换为源二叉树的镜像。思路