Python使用Redis实现作业调度系统(超简单)
作者:kongxx 发布时间:2023-04-22 03:57:59
概述
Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。
Redis从它的许多竞争继承来的三个主要特点:
Redis数据库完全在内存中,使用磁盘仅用于持久性。
相比许多键值数据存储,Redis拥有一套较为丰富的数据类型。
Redis可以将数据复制到任意数量的从服务器。
Redis 优势
异常快速:Redis的速度非常快,每秒能执行约11万集合,每秒约81000+条记录。
支持丰富的数据类型:Redis支持最大多数开发人员已经知道像列表,集合,有序集合,散列数据类型。这使得它非常容易解决各种各样的问题,因为我们知道哪些问题是可以处理通过它的数据类型更好。
操作都是原子性:所有Redis操作是原子的,这保证了如果两个客户端同时访问的Redis服务器将获得更新后的值。
多功能实用工具:Redis是一个多实用的工具,可以在多个用例如缓存,消息,队列使用(Redis原生支持发布/订阅),任何短暂的数据,应用程序,如Web应用程序会话,网页命中计数等。
步入主题:
Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。
下面是实现上的想法
MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。
MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。
channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。
Master代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MyMaster():
def __init__(self):
pass
def start(self):
MyServerResultHandleThread().start()
MyServerDispatchThread().start()
class MyServerDispatchThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
for i in range(1, 100):
channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
print("Dispatch job %s to %s" % (str(i), channel))
ret = r.publish(channel, str(i))
if ret == 0:
print("Dispatch job %s failed." % str(i))
time.sleep(5)
class MyServerResultHandleThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(CHANNEL_RESULT)
for message in p.listen():
if message['type'] != 'message':
continue
print("Received finished job %s" % message['data'])
if __name__ == "__main__":
MyMaster().start()
time.sleep(10000)
说明
MyMaster类 - master主程序,用来启动dispatch和resulthandler的线程
MyServerDispatchThread类 - 派发作业线程,产生作业并派发到计算节点
MyServerResultHandleThread类 - 作业运行结果处理线程,从channel里获取作业结果并显示
Slave代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MySlave():
def __init__(self):
pass
def start(self):
for i in range(1, 4):
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
class MyJobWorkerThread(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.channel = channel
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(self.channel)
for message in p.listen():
if message['type'] != 'message':
continue
print("%s: Received dispatched job %s " % (self.channel, message['data']))
print("%s: Run dispatched job %s " % (self.channel, message['data']))
time.sleep(2)
print("%s: Send finished job %s " % (self.channel, message['data']))
ret = r.publish(CHANNEL_RESULT, message['data'])
if ret == 0:
print("%s: Send finished job %s failed." % (self.channel, message['data']))
if __name__ == "__main__":
MySlave().start()
time.sleep(10000)
说明
MySlave类 - slave节点主程序,用来启动MyJobWorkerThread的线程
MyJobWorkerThread类 - 从channel里获取派发的作业并将运行结果发送回master
测试
首先运行MySlave来定义派发作业channel。
然后运行MyMaster派发作业并显示执行结果。
有关Python使用Redis实现作业调度系统(超简单),小编就给大家介绍这么多,希望对大家有所帮助!


猜你喜欢
- SQLserver 2014 AlwaysOn增强了原有的数据库镜像功能,使得先前的单一数据库故障转移变成以组(多个数据)为单位的故障转移。
- 1. 数组数组是 Golang 中的一种基本数据类型,用于存储固定数量的同类型元素。在 Golang 中,数组的长度是固定的,并且必须在定义
- 一、简介Locust 是一个易于使用,分布式,用户负载测试工具。它用于负载测试 web 站点(或其他系统),并计算出一个系统可以处理多少并发
- 本文为大家分享了Python实现自己下载音乐统计的具体代码,供大家参考,具体内容如下今天看Python实例,学习了如何对文件进行操作,突然想
- 围绕一门语言,学习它的文化精髓,能让你成为一名更优秀的程序员。如果你还没读过Python之禅(Zen of Python) ,那么打开Pyt
- 问:假如我的一个表里含有(a,b,c,d)和(a,b)形成组合键。我能在列值中写这个查询吗?例如: select a,c,d from my
- 今天因工作需要写了个小程序,用于在图片集中自动抽取需要的照片。该程序只是实现了基本功能,还有很多需要完善的地方,展示出来算是给自己鼓鼓气吧。
- 1、root函数格式root()功能描述返回一个路径串变量应用代码'sample string = c:\intels\jingca
- 本文实例分析了PHP中怎样防止SQL注入。分享给大家供大家参考。具体分析如下:一、问题描述:如果用户输入的数据在未经处理的情况下插入到一条S
- 最近做了一个项目其中有项目需求涉及到访问控制,在访问需要登录才能使用的页面或功能时,会弹出登录框:效果如下: 图 1-点击用户名时,如未登录
- 一、django的模板:在settings.py的文件中可以看到并设置这个模板。1.直接映射:通过建立的文件夹(templates)和文件(
- 故事的开始:.count()假设你有一个Notification Model类,保存的主要是所有的站内通知:class Notificati
- Dump ALL MySQL Databasesmysqldump --user=XXXXXXXX --password=XXXXXXX -
- import上一级目录的模块python中,import module会去sys.path搜索,sys.path是个列表,并且我们可以动态修
- 一、提前准备为了大家学习方便,我在这里面建立两张表并为其添加一些数据。一张水果表,一张供应商表。水果表 fruits表f_idf_namef
- For 循环可以遍历字符串,也可以遍历列表# for 循环# 语法特点:遍历操作,依次取集合容器中的几个值# for 临时变量
- 目的工作中遇到一个需求,通过需要通过网站查询船舶名称得到MMSI码,网站来自船讯网。分析请求根据以往爬虫的经验,打开F12,通过输入船舶名称
- 业务的开发时候有一个需求,需要对比当前时间段和去年同星期的时间段的数据,例如当前时间是2019-04-11,是今年的第十五周的周四,如何去取
- 一、Tensorlow结构import tensorflow as tfimport numpy as np#创建数据x_data = np
- MySQL的索引类型有普通索引(normal),唯一索引(unique)和全文索引(full text),合理使用索引可大大提升数据库的查询