python分布式编程实现过程解析
作者:Mars.wang 发布时间:2023-11-10 21:13:48
标签:python,分布,式,编程
分布式编程的难点在于:
1.服务器之间的通信,主节点如何了解从节点的执行进度,并在从节点之间进行负载均衡和任务调度;
2.如何让多个服务器上的进程访问同一资源的不同部分进行执行
第一部分涉及到网络编程的底层细节
第二个问题让我联想到hdfs的一些功能。
首先分布式进程还是解决的是单机单进程无法处理的大数据量大计算量的问题,希望能加通过一份代码(最多主+从两份)来并行执行一个大任务。
这就面临两个问题,首先将程序分布到多台服务器,其次将输入数据分配给多台服务器。
第一个问题相对比较简单,毕竟程序一般不会太长,即便是超级jar包的spark程序,也不过百兆。
但数据里不同,如今企业级别的数据动辄GB、TB,如果在分布式程序执行之前首先要进行大容量数据的转移,显然是不可取的。
这时候我们就需要一个中央共享数据源,所有服务器都可以对这个数据源进行并行存取(块block),这就已经非常接近hdfs的功能。
因为在hdfs中,集群中的多台服务器共享同一个hdfs,每台机器访问hdfs就像访问本地数据一样(还是稍微慢一点);
计算任务执行完之后,每台服务器还可以将自己的计算结果写回hdfs,每台服务器的结果被存储成了结果目录中的小文件。
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
来源:https://www.cnblogs.com/wangbin2188/p/9923190.html


猜你喜欢
- 想弄个截屏工具,整理一下学生错题什么的,原来用的方法是:先运行QQ,再监听键盘热键(“ctrl+alt+a”)。后来发现有些问题:需要先上Q
- 一.雅黑设计理念 雅黑字体是为微软公司设计的屏幕显示汉字。它具有个性独特、结体优美、识别性强、块状效果好、显示清晰等优点。在当今数字化时代更
- Python程序中,在进程和进程之间是不共享全局变量的数据的。我们来看一个例子:from multiprocessing import Pr
- 数据合并是数据处理过程中的必经环节,pandas作为数据分析的利器,提供了四种常用的数据合并方式,让我们看看如何使用这些方法吧!1.conc
- 第一次碰到这个问题的时候,确实不知道该怎么办,后来请教了一个大神,加上自己的理解,才了解是什么意思,这个东西写python的会经常用到,而且
- 在找工作的时候,我们会选择上网查询招聘的信息,或者是通过一些招聘会进行现场面试。但由于信息更新不及时,有一些岗位会出现下架的情况,如果我们不
- 这个自定义损失函数的背景:(一般回归用的损失函数是MSE, 但要看实际遇到的情况而有所改变)我们现在想要做一个回归,来预估某个商品的销量,现
- 学习目标:对二值图进行分析,设定最大最小面积区间保留该面积区间内的区域示例代码//src为二值图,minArea、maxArea为面积阈值,
- 一、pytorch finetuning 自己的图片进行训练这种读取图片的方式用的是torch自带的 ImageFolder,读取的文件夹必
- 今天在给一个img加链接时发现的<body> <a href="#1" >&
- 一、性能度量性能度量目的是对学习期的泛华能力进行评估,性能度量反映了任务需求,在对比不同算法的泛华能力时,使用不同的性能度量往往会导致不同的
- 目录问题注意总结问题如何在一张表上对多个表进行外键关联from django.db import modelsclass Appliance
- 问题提出最近,使用tqdm模块,对于大文件的阅读进行进度监控。然而我发现有个问题,即在tqdm模块使用一定没错的情况下,进度条死活打印不出来
- 前言:二分法也就是二分查找,它是一种效率较高的查找方法假如公司新来了一个人,叫张三,他是你们公司第47个人,过了一段时间后,有些人呢看张三不
- mysql 误删除ibdata1之后如何恢复如果误删除了在线服务器中mysql innodb相关的数据文件ibdata1以及日志文件 ib_
- 某日,一同学给小的发了 Github 源码,说是可以轻松查到删除自己的微信好友,于是就开始了作死之路。Github 源码请看:0x5e/we
- 前言官方手册:https://dev.mysql.com/doc/refman/5.7/en/server-logs.html不管是哪个数据
- 在dreamweaver4中,你可以存储你自己设定的图片,链接,flash影片,颜色表,模板等等,组成这个站点的资产,这就是Assets面板
- #coding:utf8import reimport urllibdef getHTML(url):
- 函数的作用域python中的作用域分4种情况:L:local,局部作用域,即函数中定义的变量;E:enclosing,嵌套的父级函数的局部作