Python使用Redis实现作业调度系统(超简单)
作者:kongxx 发布时间:2023-04-22 03:57:59
概述
Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。
Redis从它的许多竞争继承来的三个主要特点:
Redis数据库完全在内存中,使用磁盘仅用于持久性。
相比许多键值数据存储,Redis拥有一套较为丰富的数据类型。
Redis可以将数据复制到任意数量的从服务器。
Redis 优势
异常快速:Redis的速度非常快,每秒能执行约11万集合,每秒约81000+条记录。
支持丰富的数据类型:Redis支持最大多数开发人员已经知道像列表,集合,有序集合,散列数据类型。这使得它非常容易解决各种各样的问题,因为我们知道哪些问题是可以处理通过它的数据类型更好。
操作都是原子性:所有Redis操作是原子的,这保证了如果两个客户端同时访问的Redis服务器将获得更新后的值。
多功能实用工具:Redis是一个多实用的工具,可以在多个用例如缓存,消息,队列使用(Redis原生支持发布/订阅),任何短暂的数据,应用程序,如Web应用程序会话,网页命中计数等。
步入主题:
Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。
下面是实现上的想法
MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。
MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。
channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。
Master代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MyMaster():
def __init__(self):
pass
def start(self):
MyServerResultHandleThread().start()
MyServerDispatchThread().start()
class MyServerDispatchThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
for i in range(1, 100):
channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
print("Dispatch job %s to %s" % (str(i), channel))
ret = r.publish(channel, str(i))
if ret == 0:
print("Dispatch job %s failed." % str(i))
time.sleep(5)
class MyServerResultHandleThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(CHANNEL_RESULT)
for message in p.listen():
if message['type'] != 'message':
continue
print("Received finished job %s" % message['data'])
if __name__ == "__main__":
MyMaster().start()
time.sleep(10000)
说明
MyMaster类 - master主程序,用来启动dispatch和resulthandler的线程
MyServerDispatchThread类 - 派发作业线程,产生作业并派发到计算节点
MyServerResultHandleThread类 - 作业运行结果处理线程,从channel里获取作业结果并显示
Slave代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime
import time
import threading
import random
import redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0
CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
CHANNEL_RESULT = 'CHANNEL_RESULT'
class MySlave():
def __init__(self):
pass
def start(self):
for i in range(1, 4):
MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
class MyJobWorkerThread(threading.Thread):
def __init__(self, channel):
threading.Thread.__init__(self)
self.channel = channel
def run(self):
r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
p = r.pubsub()
p.subscribe(self.channel)
for message in p.listen():
if message['type'] != 'message':
continue
print("%s: Received dispatched job %s " % (self.channel, message['data']))
print("%s: Run dispatched job %s " % (self.channel, message['data']))
time.sleep(2)
print("%s: Send finished job %s " % (self.channel, message['data']))
ret = r.publish(CHANNEL_RESULT, message['data'])
if ret == 0:
print("%s: Send finished job %s failed." % (self.channel, message['data']))
if __name__ == "__main__":
MySlave().start()
time.sleep(10000)
说明
MySlave类 - slave节点主程序,用来启动MyJobWorkerThread的线程
MyJobWorkerThread类 - 从channel里获取派发的作业并将运行结果发送回master
测试
首先运行MySlave来定义派发作业channel。
然后运行MyMaster派发作业并显示执行结果。
有关Python使用Redis实现作业调度系统(超简单),小编就给大家介绍这么多,希望对大家有所帮助!
猜你喜欢
- SQL Server 2005的新功能为动态管理对象,它们是在指定时间返回某个数据库实例的特殊状态信息的数据库视图或函数。这些对象允许数据库
- 这篇文章主要介绍了微信小程序顶部导航栏可滑动并选中放大,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋
- 在我们设计网页的时候,总会遇到一些不愉快的事情,最常见的莫过于在后台添加内容后才发现显示的页面被撑开,导致网页极度不美观。以前大
- 持续更新一些常用的Tensor操作,比如List,Numpy,Tensor之间的转换,Tensor的拼接,维度的变换等操作。其它Tensor
- 本文实例讲述了Python中list以及list与array的相互转换实现方法。分享给大家供大家参考,具体如下:python中的list是一
- 问题你想定义跟actor模式中类似“actors”角色的任务解决方案actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。 事实
- asp抓取网页。偶要实现实实更新天气预报。利用了XMLHTTP组件,抓取网页的指定部分。很多小偷查询都是使用这个方法来实现的。需要分件htm
- 今天有个服务器速度变得非常慢,用工具一看,IIS堵塞进程达到100多个,肯定有问题,下面进入排查(聚友习惯把过程写出来,如果不需要过程,请直
- 功能1: 爬取西拉ip代理官网上的代理ip环境:python3.8+pycharm库:requests,lxml浏览器:谷歌IP地址:htt
- 第一列按照goodsid局部分组,然后在分组后的记录中按照audittime升序排序得到序号,从而显示某商品得第几次变迁。 第二列是取该商品
- JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,它基于ECMAScript的一个子集。 JSON
- 背景:今天同事写代码,用python读取一个四五百兆的文件,然后做一串逻辑上很直观的处理。结果处理了一天还没有出来结果。问题出在哪里呢?解决
- 而随着设备硬件配置的不断提升,对中小型应用程序来说,对算法的空间复杂度的要求也宽松了不少。不过,在当今 Web2.0 时代,对应用程序的时间
- 前言在做项目的时候一些配置文件都会写在settings配置文件中,今天在研究"州的先生"开源文档写作系统-MrDoc的时
- 前言:前面提到了Python中的数值型内置数据类型,接下来呢我们就着重介绍一下字符串类型。在Python中字符串是一个有序的字符集合,没有独
- 看下面的例子就会明白了: print '|','*'.ljust(10),'|' print
- python中不存在所谓的传值调用,一切传递的都是对象的引用,也可以认为是传址。一、可变对象和不可变对象Python在heap中分配的对象分
- 本文实例讲述了Django框架静态文件使用/中间件/禁用ip功能。分享给大家供大家参考,具体如下:静态文件一、静态文件的使用静态文件:网页中
- 一、算术运算符运算符+-*/%**(幂)求次方//(取整除,向下取整)如:9//2 =4二、比较运算符运算符==!=<>(不等于
- 1.函数array()功能:创建一个数组变量格式:array(list)参数:list为数组变量中的每个数值列,中间用逗号间隔例子:<