python3 queue多线程通信
作者:lilongsy??????? 发布时间:2022-09-20 08:41:05
标签:python,queue,多线程,通信
queue分类
python3 queue分三类:
先进先出队列
后进先出的栈
优先级队列
他们的导入方式分别是:
from queue import Queue
from queue import LifoQueue
from queue import
具体方法见下面引用说明。
例子一、生产消费模式
Queue
对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。 当使用队列时,协调生产者和消费者的关闭问题可能会有一些麻烦。一个通用的解决方法是在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行。
例如:
from queue import Queue
from threading import Thread
# 用来表示终止的特殊对象
_sentinel = object()
# A thread that produces data
def producer(out_q):
for i in range(10):
print("生产")
out_q.put(i)
out_q.put(_sentinel)
# A thread that consumes data
def consumer(in_q):
while True:
data = in_q.get()
if data is _sentinel:
in_q.put(_sentinel)
break
else:
print("消费", data)
# Create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
结果:
本例中有一个特殊的地方:消费者在读到这个特殊值之后立即又把它放回到队列中,将之传递下去。这样,所有监听这个队列的消费者线程就可以全部关闭了。 尽管队列是最常见的线程间通信机制,但是仍然可以自己通过创建自己的数据结构并添加所需的锁和同步机制来实现线程间通信。最常见的方法是使用 Condition
变量来包装你的数据结构。下边这个例子演示了如何创建一个线程安全的优先级队列。
import heapq
import threading
class PriorityQueue:
def __init__(self):
self._queue = []
self._count = 0
self._cv = threading.Condition()
def put(self, item, priority):
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item))
self._count += 1
self._cv.notify()
def get(self):
with self._cv:
while len(self._queue) == 0:
self._cv.wait()
return heapq.heappop(self._queue)[-1]
例子二、task_done和join
使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,你没有办法知道接收数据的线程是什么时候接收到的数据并开始工作的。不过队列对象提供一些基本完成的特性,比如下边这个例子中的task_done()
和 join()
:
from queue import Queue
from threading import Thread
class Producer(Thread):
def __init__(self, q):
super().__init__()
self.count = 5
self.q = q
def run(self):
while self.count > 0:
print("生产")
if self.count == 1:
self.count -= 1
self.q.put(2)
else:
self.count -= 1
self.q.put(1)
class Consumer(Thread):
def __init__(self, q):
super().__init__()
self.q = q
def run(self):
while True:
print("消费")
data = self.q.get()
if data == 2:
print("stop because data=", data)
# 任务完成,从队列中清除一个元素
self.q.task_done()
break
else:
print("data is good,data=", data)
# 任务完成,从队列中清除一个元素
self.q.task_done()
def main():
q = Queue()
p = Producer(q)
c = Consumer(q)
p.setDaemon(True)
c.setDaemon(True)
p.start()
c.start()
# 等待队列清空
q.join()
print("queue is complete")
if __name__ == '__main__':
main()
结果:
例子三、多线程里用queue
设置俩队列,一个是要做的任务队列todo_queue
,一个是已经完成的队列done_queue
。
每次执行线程,先从todo_queue
队列里取出一个值,然后执行完,放入done_queue
队列。
如果todo_queue
为空,就退出。
import logging
import logging.handlers
import threading
import queue
log_mgr = None
todo_queue = queue.Queue()
done_queue = queue.Queue()
class LogMgr:
def __init__(self, logpath):
self.LOG = logging.getLogger('log')
loghd = logging.handlers.RotatingFileHandler(logpath, "a", 0, 1)
fmt = logging.Formatter("%(asctime)s %(threadName)-10s %(message)s", "%Y-%m-%d %H:%M:%S")
loghd.setFormatter(fmt)
self.LOG.addHandler(loghd)
self.LOG.setLevel(logging.INFO)
def info(self, msg):
if self.LOG is not None:
self.LOG.info(msg)
class Worker(threading.Thread):
global log_mgr
def __init__(self, name):
threading.Thread.__init__(self)
self.name = name
def run(self):
while True:
try:
task = todo_queue.get(False)
if task:
log_mgr.info("HANDLE_TASK: %s" % task)
done_queue.put(1)
except queue.Empty:
break
return
def main():
global log_mgr
log_mgr = LogMgr("mylog")
for i in range(30):
todo_queue.put("data"+str(i))
workers = []
for i in range(3):
w = Worker("worker"+str(i))
workers.append(w)
for i in range(3):
workers[i].start()
for i in range(3):
workers[i].join()
total_num = done_queue.qsize()
log_mgr.info("TOTAL_HANDLE_TASK: %d" % total_num)
exit(0)
if __name__ == '__main__':
main()
输出日志文件结果:
来源:https://blog.51cto.com/lilongsy/5454381
0
投稿
猜你喜欢
- 之前在做数据分析的过程中,需要对数据进行实时的写入,比如对新生成的数据写入之前已经生成的txt或csv文件中。现在想想其实很简单,所以做一个
- 如下所示:from openpyxl import workbookfrom openpyxl import load_workbookfr
- 对于python,一切事物都是对象,程序中存储的所有数据都是对象,对象基于类创建计算机能处理的远不止数值,还可以处理文本、图形、音频、视频、
- 1、官网下载地址在官网找到你想安装的版本 官网地址:https://www.python.org/并且选择下载windows版本目前最新的版
- 今天在学习dubbo-go的时候,下载了dubbo-go的example,依赖的包太多了,之前都是手动下载某个依赖的包,现在手动一个一个 g
- 本文为大家分享了python查看微信消息撤回的具体代码,供大家参考,具体内容如下1.安装itchatitchat是一个开源的python微信
- 废话不多说,直接开始拉~~~我们总共有 6 只海龟,颜色不同,它们以随机长度移动。首先,我们应该通过输入乌龟的颜色来押注乌龟。第一个越线的乌
- 本文实例讲述了Flask框架Flask-Login用法。分享给大家供大家参考,具体如下:Flask-Login插件中带了6种信号,可以基于其
- 近期因工作需要,需对几十万条商品和订单数据进行初步的数据分析,本来尝试过用Excel,但是数据量一旦超过10万条,Excel和电脑的性能瓶颈
- 在照着Tensorflow官网的demo敲了一遍分类器项目的代码后,运行倒是成功了,结果也不错。但是最终还是要训练自己的数据,所以尝试准备加
- 废话不多说原因:在Anaconda下打包的很多不必要的模块进去,导致最终的exe文件过于庞大。解决办法:要用纯净的python来打包即可避免
- 将有安全问题的SQL过程删除.比较全面.一切为了安全!删除了调用shell,注册表,COM组件的破坏权限MS SQL SERVER2000使
- 代码检测textarea内填写的长度,未填写时提示需要重新填写,少于15字符时提示需要长于15字符,成功时显示所填写建议。<scrip
- QThread是Qt的线程类中最核心的底层类。由于PyQt的的跨平台特性,QThread要隐藏所有与平台相关的代码要使用的QThread开始
- 效果图:(灰色区域可拖动)代码如下:<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0
- mysql可以通过下面语句判断是否支持分区:SHOW VARIABLES LIKE '%partition%';如果输出:h
- 描述tan() 返回x弧度的正弦值。语法以下是 tan() 方法的语法:import mathmath.tan(x)注意:tan()是不能直
- 一:购物车管理功能1.添加商品(不重复添加)、2.删除商品(购物车中有的才能删除)、3.查看购物车4.退出系统产品列表products =
- 前言本文主要给大家介绍的是关于Python中表达式x += y和x = x+y 区别的相关内容,分享出来供大家参考学习,下面来看看详细的介绍
- 什么是pyscriptpyscript由来自 Anaconda 的团队开发,包括 Peter Wang、Fabio Pliger 和 Phi