Python自定义主从分布式架构实例分析
作者:RQSLT 发布时间:2022-09-24 01:28:29
标签:Python,主从分布式,架构
本文实例讲述了Python自定义主从分布式架构。分享给大家供大家参考,具体如下:
环境:Win7 x64,Python 2.7,APScheduler 2.1.2。
原理图如下:
代码部分:
(1)、中心节点:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 中心节点(主要功能是分配任务)
import SocketServer, socket, Queue
CenterIP = '127.0.0.1' #中心节点IP
CenterListenPort = 9999 #中心节点监听端口
CenterClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #中心节点用于发送网络消息的socket
TaskQueue = Queue.Queue() #任务队列
#获取任务队列
def GetTaskQueue():
for i in range(1, 11):
TaskQueue.put(str(i))
#CenterServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print(data)
if data.startswith('wait'):
vec = data.split(':')
if len(vec) != 3:
print('Error: len(vec) != 3')
else:
nodeIP = vec[1]
nodeListenPort = vec[2]
nodeID = nodeIP + ':' + nodeListenPort
if not TaskQueue.empty():
task = TaskQueue.get()
print('send task ' + task + ' to ' + nodeID)
CenterClient.sendto('task:' + task, (nodeIP, int(nodeListenPort)))
else:
print('TaskQueue is empty!')
GetTaskQueue() #获取任务队列
CenterServer = SocketServer.UDPServer((CenterIP, CenterListenPort), MyUDPHandler)
print('Listen port ' + str(CenterListenPort) + ' ...')
CenterServer.serve_forever()
(2)、任务节点:
#encoding=utf-8
#author: walker
#date: 2014-12-03
#function: 任务节点(请求/接收/执行任务)
import time, socket, SocketServer
from apscheduler.scheduler import Scheduler
CenterIP = '127.0.0.1' #中心节点IP
CenterListenPort = 9999 #中心节点监听端口
NodeIP = socket.gethostbyname(socket.gethostname()) #任务节点自身IP
NodeClient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #任务节点用于发送网络消息的socket
#任务:发送网络信息
def jobSendNetMsg():
msg = ''
if NodeServer.TaskState == 'wait':
msg = 'wait:' + NodeIP + ':' + str(NodeListenPort)
elif NodeServer.TaskState == 'exec':
msg = 'exec:' + NodeIP + ':' + str(NodeListenPort)
print(msg)
NodeClient.sendto(msg, (CenterIP, CenterListenPort))
#添加并启动定时任务
def InitTimer():
sched = Scheduler()
sched.add_interval_job(jobSendNetMsg, seconds=1)
sched.start()
#执行任务
def ExecTask(task):
print('ExecTask ' + task + ' ...')
time.sleep(2)
print('ExecTask ' + task + ' over')
#NodeServer的回调函数,在接受到udp报文是触发
class MyUDPHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0].strip()
socket = self.request[1]
print('recv data: ' + data)
if data.startswith('task'):
vec = data.split(':')
if len(vec) != 2:
print('Error: len(vec) != 2')
else:
task = vec[1]
self.server.TaskState = 'exec'
ExecTask(task)
self.server.TaskState = 'wait'
InitTimer()
NodeServer = SocketServer.UDPServer(('', 0), MyUDPHandler)
NodeServer.TaskState = 'wait' #(exec/wait)
NodeListenPort = NodeServer.server_address[1]
print('NodeListenPort:' + str(NodeListenPort))
NodeServer.serve_forever()
希望本文所述对大家Python程序设计有所帮助。
0
投稿
猜你喜欢
- <?php $path = "D:\\in.txt"; try //检测异常 { file_open($path)
- 前言在做项目的时候一些配置文件都会写在settings配置文件中,今天在研究"州的先生"开源文档写作系统-MrDoc的时
- 窗口标题pygame.display.set_caption(title, icontitle=None)'''•
- 正态分布(Normal distribution)又成为高斯分布(Gaussian distribution)若随机变量X服从一个数学期望为
- yaml简介1.yaml [ˈjæməl]: Yet Another Markup Language :另一种标记语言。
- 介绍psutil能够轻松实现获取系统运行的进程和系统利用率。导入模块import psutils获取系统性能信息CPU信息使用cpu_tim
- 在Python所有的数据结构中,list具有重要地位,并且非常的方便,这篇文章主要是讲解list列表的高级应用,基础知识可以查看博客。 此文
- 高动态范围成像一、引言如今,大多数数字图像和成像设备每通道使用 8 位整数表示灰度,因此将设备的动态范围限制在两个数量级(实际上是 256
- 区块链区块链是在计算机网络的节点之间共享数据的分类账(分布式数据库)。作为数据库,区块链以电子格式储存信息。区块链的创新之处在于它保证了数据
- 这个javascript划词搜索功能,在很多地方我们都会看到,特别是装了浏览器插件的时候,当我们用鼠标选择一段文字的时候,就会出现搜索提示,
- 一、merge(合并)的语法:pd.merge(left, right, how='inner', on=None, lef
- 看到有人用的PJBlog使用的是自动填写验证码,这样其实也不使用验证码基本上没有什么区别,很容易被 * ,因此在参照许多修改的基础上,找到
- EF Core 是一个ORM(对象关系映射),它使 .NET 开发人员可以使用 .NET对象操作数据库,避免了像ADO.NET访问数据库的代
- 学习目的:掌握下拉列表框的用法,并理解AutoPostBack属性; 理解IsPoskBack及用法; 初识DataTable的
- 本文实例讲述了Flask框架各种常见装饰器。分享给大家供大家参考,具体如下:效果类似django的process_request的装饰器@a
- datetime模块用于是date和time模块的合集,datetime有两个常量,MAXYEAR和MINYEAR,分别是9999和1.da
- 本文用于利用Pytorch实现神经网络的分类!!!1.训练神经网络分类模型import torchfrom torch.autograd i
- 一、先来看看效果接口请求返回的数据:二、官方登录流程图三、小程序登录流程梳理:1、小程序端调用wx.login2、判断用户是否授权3、小程序
- 一、数字类型内置方法1.1 整型的内置方法作用描述年龄、号码、id号定义方式x = 10x = int('10')x = i
- defer介绍defer是golang的一个特色功能,被称为“延迟调用函数”。当外部函数返回后执行defer。类似于其他语言的 try… c