Python实现简单多线程任务队列
作者:hebedich 发布时间:2022-07-29 13:21:43
最近我在用梯度下降算法绘制神经网络的数据时,遇到了一些算法性能的问题。梯度下降算法的代码如下(伪代码):
def gradient_descent():
# the gradient descent code
plotly.write(X, Y)
一般来说,当网络请求 plot.ly 绘图时会阻塞等待返回,于是也会影响到其他的梯度下降函数的执行速度。
一种解决办法是每调用一次 plotly.write 函数就开启一个新的线程,但是这种方法感觉不是很好。 我不想用一个像 cerely(一种分布式任务队列)一样大而全的任务队列框架,因为框架对于我的这点需求来说太重了,并且我的绘图也并不需要 redis 来持久化数据。
那用什么办法解决呢?我在 python 中写了一个很小的任务队列,它可以在一个单独的线程中调用 plotly.write函数。下面是程序代码。
from threading import Thread
import Queue
import time
class TaskQueue(Queue.Queue):
首先我们继承 Queue.Queue 类。从 Queue.Queue 类可以继承 get 和 put 方法,以及队列的行为。
def __init__(self, num_workers=1):
Queue.Queue.__init__(self)
self.num_workers = num_workers
self.start_workers()
初始化的时候,我们可以不用考虑工作线程的数量。
def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))
我们把 task, args, kwargs 以元组的形式存储在队列中。*args 可以传递数量不等的参数,**kwargs 可以传递命名参数。
def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
我们为每个 worker 创建一个线程,然后在后台删除。
下面是 worker 函数的代码:
def worker(self):
while True:
tupl = self.get()
item, args, kwargs = self.get()
item(*args, **kwargs)
self.task_done()
worker 函数获取队列顶端的任务,并根据输入参数运行,除此之外,没有其他的功能。下面是队列的代码:
我们可以通过下面的代码测试:
def blokkah(*args, **kwargs):
time.sleep(5)
print “Blokkah mofo!”
q = TaskQueue(num_workers=5)
for item in range(1):
q.add_task(blokkah)
q.join() # wait for all the tasks to finish.
print “All done!”
Blokkah 是我们要做的任务名称。队列已经缓存在内存中,并且没有执行很多任务。下面的步骤是把主队列当做单独的进程来运行,这样主程序退出以及执行数据库持久化时,队列任务不会停止运行。但是这个例子很好地展示了如何从一个很简单的小任务写成像工作队列这样复杂的程序。
def gradient_descent():
# the gradient descent code
queue.add_task(plotly.write, x=X, y=Y)
修改之后,我的梯度下降算法工作效率似乎更高了。如果你很感兴趣的话,可以参考下面的代码。
from threading import Thread
import Queue
import time
class TaskQueue(Queue.Queue):
def __init__(self, num_workers=1):
Queue.Queue.__init__(self)
self.num_workers = num_workers
self.start_workers()
def add_task(self, task, *args, **kwargs):
args = args or ()
kwargs = kwargs or {}
self.put((task, args, kwargs))
def start_workers(self):
for i in range(self.num_workers):
t = Thread(target=self.worker)
t.daemon = True
t.start()
def worker(self):
while True:
tupl = self.get()
item, args, kwargs = self.get()
item(*args, **kwargs)
self.task_done()
def tests():
def blokkah(*args, **kwargs):
time.sleep(5)
print "Blokkah mofo!"
q = TaskQueue(num_workers=5)
for item in range(10):
q.add_task(blokkah)
q.join() # block until all tasks are done
print "All done!"
if __name__ == "__main__":
tests()


猜你喜欢
- 本文实例为大家分享了Python3连接MySQL模拟转账的具体实现代码,供大家参考,具体内容如下# coding:utf8import sy
- 1.绘制面积图面积图常用于描述某指标随时间的变化程度。其面积也通常可以有一定的含义。绘制面积图使用的是plt.stackplot()方法。以
- 其实很简单,一般的数组去重可以直接用 new Set() 方法即可,但是数组对象的话,比较复杂,不能直接用,我们可以采取间接的方法来去重un
- 以前讲过利用phantomjs做爬虫抓网页 https://www.jb51.net/article/55789.htm 是配合选择器做的利
- 我们每天接触到各类应用,如社交、在线文档、直播等,后端都需要使用WebSocket技术提供实时通信能力。本文介绍如何使用Golang实现实时
- 本文实例讲述了php获取文章内容第一张图片的方法。分享给大家供大家参考,具体如下:<?php$temp=mt_rand(1,4);$p
- python 定时器默认定时器只执行一次,第一个参数单位S,几秒后执行import threadingdef fun_timer(): pr
- 在Python中,变量是没有类型的,这和以往看到的大部分编辑语言都不一样。在使用变量的时候,不需要提前声明,只需要给这个变量赋值即可。但是,
- 主要采用的技术点Python + Numpy + PIL在正文代码开始前,大家先看看最初原图和转换手绘风图片前后对比。当然了,我先查了手绘的
- FTP服务的主动模式和被动模式在开始之前,先聊一下FTP的主动模式和被动模式,两者的区别 , 用两张图来表示可能会更加清晰一些:主动模式:主
- 本文实例讲述了Python简单实现安全开关文件的两种方式。分享给大家供大家参考,具体如下:以下代码经Python3.3测试。方式1:try:
- 前言Multiprocessing.Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一
- <?php header(“Content-Type:text/html;charset=utf-8″); if (isset($_G
- 前言第一次用mysql,打开mysql的图形化界面要连接时,出现2003错误。究其原因,可能是mysql的服务没有启动。本文章主要围绕这个解
- isdecimal()方法检查字符串是否仅由十进制字符组成。此方法只存在于unicode对象。注意:要定义一个字符串为Unico
- 我们先从一个常见的Python编程错误开始说起,我已经见过非常多的程序员犯过这种错误了:def do_not_raise(user_defi
- 我的项目环境:平台:Windows10语言环境:python3.7编译器:PyCharmPyTorch版本:1.11.0PyG版本:2.1.
- 比如下面一个listbinfo = ['lao','wang','python']我们通过h
- 这篇文章主要介绍了基于python调用psutils模块过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值
- 利用序列产生主键值。 序列(Sequence)是一种可以被多个用户使用的用于产生一系列唯一数字的数据库对象。序列定义存储在数据字典中,通过提