Python定义一个Actor任务
作者:David Beazley 发布时间:2022-05-30 09:24:50
问题
你想定义跟actor模式中类似“actors”角色的任务
解决方案
actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。 事实上,它天生的简单性是它如此受欢迎的重要原因之一。 简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。 actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。
结合使用一个线程和一个队列可以很容易的定义actor,例如:
from queue import Queue
from threading import Thread, Event
# Sentinel used for shutdown
class ActorExit(Exception):
pass
class Actor:
def __init__(self):
self._mailbox = Queue()
def send(self, msg):
'''
Send a message to the actor
'''
self._mailbox.put(msg)
def recv(self):
'''
Receive an incoming message
'''
msg = self._mailbox.get()
if msg is ActorExit:
raise ActorExit()
return msg
def close(self):
'''
Close the actor, thus shutting it down
'''
self.send(ActorExit)
def start(self):
'''
Start concurrent execution
'''
self._terminated = Event()
t = Thread(target=self._bootstrap)
t.daemon = True
t.start()
def _bootstrap(self):
try:
self.run()
except ActorExit:
pass
finally:
self._terminated.set()
def join(self):
self._terminated.wait()
def run(self):
'''
Run method to be implemented by the user
'''
while True:
msg = self.recv()
# Sample ActorTask
class PrintActor(Actor):
def run(self):
while True:
msg = self.recv()
print('Got:', msg)
# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()
这个例子中,你使用actor实例的 send()
方法发送消息给它们。 其机制是,这个方法会将消息放入一个队里中, 然后将其转交给处理被接受消息的一个内部线程。 close()
方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。 用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。 ActorExit
异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求 (异常被get()方法抛出并传播出去)。
如果你放宽对于同步和异步消息发送的要求, 类actor对象还可以通过生成器来简化定义。例如:
def print_actor():
while True:
try:
msg = yield # Get a message
print('Got:', msg)
except GeneratorExit:
print('Actor terminating')
# Sample use
p = print_actor()
next(p) # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()
讨论
actor模式的魅力就在于它的简单性。 实际上,这里仅仅只有一个核心操作 send()
. 甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。 例如,你可以以元组形式传递标签消息,让actor执行不同的操作,如下:
class TaggedActor(Actor):
def run(self):
while True:
tag, *payload = self.recv()
getattr(self,'do_'+tag)(*payload)
# Methods correponding to different message tags
def do_A(self, x):
print('Running A', x)
def do_B(self, x, y):
print('Running B', x, y)
# Example
a = TaggedActor()
a.start()
a.send(('A', 1)) # Invokes do_A(1)
a.send(('B', 2, 3)) # Invokes do_B(2,3)
a.close()
a.join()
作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数, 并且通过一个特殊的Result对象返回结果:
from threading import Event
class Result:
def __init__(self):
self._evt = Event()
self._result = None
def set_result(self, value):
self._result = value
self._evt.set()
def result(self):
self._evt.wait()
return self._result
class Worker(Actor):
def submit(self, func, *args, **kwargs):
r = Result()
self.send((func, args, kwargs, r))
return r
def run(self):
while True:
func, args, kwargs, r = self.recv()
r.set_result(func(*args, **kwargs))
# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
worker.close()
worker.join()
print(r.result())
最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。 例如,一个类actor对象的 send()
方法可以被编程让它能在一个套接字连接上传输数据 或通过某些消息中间件(比如AMQP、ZMQ等)来发送。
来源:https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p10_defining_an_actor_task.html


猜你喜欢
- 这篇文章主要介绍了python StringIO如何在内存中读写str,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学
- 本文实例讲述了python config文件的读写操作。分享给大家供大家参考,具体如下:1、设置配置文件[mysql]host = 1234
- 在Soundbreak我们每天24小时不间断地播放实况音频和视频,所以对于MySQL的新增的复制特性,我们不能做出很令人信服的测试。通过测试
- 本文实例讲述了Zend Framework动作助手Redirector用法。分享给大家供大家参考,具体如下:Redirector 提供另一种
- 本人刚开始学习python,看了一段时间视频教程之后,决定做一个小游戏来巩固一下知识点,就做了一个文字版飞行棋,暂不具备图形界面。把代码贴出
- 这篇是Nicholas讨论如果防止脚本失控的第二篇,主要讨论了如何重构嵌套循环、递归,以及那些在函数内部同时执行很多子操作的函数。基本的思想
- 代码在ext里的src\core\ext.js下 最新的ext3.0beat1的代码如下: ua = navigator.userAgent
- pycharm安装cv2模块安装失败和无法使用的解决步骤一我们先到这个网址:cv2下载地址去下载与自己python版本号和电脑位数对应的op
- 尽管XML还处在开发阶段,其标准正在由W3C组织制定,但是已经有许多公司表示全力支持XML,并开发了不少XML工具。Adobe公司的Fram
- 一,原图和效果图二,代码//#########################产生随机颜色#########################
- 前言:在 Web 页面经常会有各种事件发生,事件发生后需要进行一些特定处理,即执行特定的函数或者语句。这就需要对事件进行监听,监听事件的常见
- substr(string ,1,3) 函数解读:取string 中重左往右取3字长的字符串。结果为:str从右往左呢?应该有另一个函数来提
- 前言使用 Python 的时候,我们知道 list 是一个长度可变对的数组, 可以通过 insert,append 和 extend 轻易的
- MySQL中有许多操作符和函数可以返回字符串。本节回答这个问题:返回的字符串使用什么字符集和 校对规则?对于简单的函数,即接收字符串输入然后
- 将有安全问题的SQL过程删除.比较全面.一切为了安全!删除了调用shell,注册表,COM组件的破坏权限MS SQL SERVER2000使
- 本文实例讲述了Java常用正则表达式验证类。分享给大家供大家参考,具体如下:package com.fsti.icop.util.regex
- 在将数据库从MSSQL迁移到MySQL的过程中,基于业务逻辑的要求,需要在MySQL的自增列插入0值。在MSSQL中是这样完成的: stri
- 用来制作甘特图的专业工具也不少,常见的有:Microsoft Office Project、GanttProject、WARCHART XG
- 1、自动化代码中,用到了哪些设计模式?单例设计模式工厂模式PO设计模式数据驱动模式面向接口编程设计模式2、什么是断言( Assert) ?断
- 1. 使用 in 和 not inin 和 not in 在 Python 中是很常用的关键字,我们将它们归类为 成员运算符。使用这两个成员