Python定时从Mysql提取数据存入Redis的实现
作者:wzqnls 发布时间:2024-01-22 01:31:39
标签:Python,Mysql,Redis
设计思路:
1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来
2.然后实例化redis类,将数据简单解析后逐条传入redis队列
3.定时器设计每天凌晨12点开始跑
ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档。
# -*- coding:utf-8 -*-
import MySQLdb
import schedule
import time
import datetime
import random
import string
import redis
# get the data from mysql
class FromSql(object):
def __init__(self, conn):
self.conn = conn
def acquire(self):
cursor = self.conn.cursor()
try:
sql = "SELECT * FROM test WHERE TO_DAYS(NOW()) - TO_DAYS(t) <= 1"
cursor.execute(sql)
rs = cursor.fetchall()
#print (rs)
for eve in rs:
print('%s, %s, %s, %s' % eve)
copy_rs = rs
cursor.close()
return copy_rs
except Exception as e:
print("The error: %s" % e)
class RedisQueue(object):
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db= redis.Redis(**redis_kwargs)
self.key = '%s:%s' %(namespace, name)
def qsize(self):
return self.__db.llen(self.key)
def put(self, item):
self.__db.rpush(self.key, item)
def get(self, block=True, timeout=None):
if block:
item = self.__db.blpop(self.key, timeout=timeout)
else:
item = self.__db.lpop(self.key)
if item:
item = item[1]
return item
def get_nowait(self):
return self.get(False)
if __name__ == "__main__":
# connect mysqldb
conn_sql = MySQLdb.connect(
host = '127.0.0.1',
port = 3306,
user = 'root',
passwd = '',
db = 'test',
charset = 'utf8'
)
def job_for_redis():
get_data = FromSql(conn_sql)
data = get_data.acquire()
q = RedisQueue('test',host='localhost', port=6379, db=0)
for single_data in data:
for meta_data in single_data:
q.put(meta_data)
print(meta_data)
print("All data had been inserted.")
"""
try:
schedule.every().day.at("00:00").do(job_for_redis)
except Exception as e:
print('Error: %s'% e)
# finally:
# conn.close()
while True:
schedule.run_pending()
time.sleep(1)
"""
补充知识:python定时获取汇率存入数据库
python定时任务:
我们可以使用 轻量级的第三方模块schedule。首先先安装:pip install schedule
定时任务的的小测试:
import schedule
import time
def job():
print("I'm working...")
schedule.every(10).minutes.do(job) # 每隔10分钟执行一次任务
schedule.every().hour.do(job) # 每隔一小时执行一次任务
schedule.every().day.at("10:30").do(job) # 每天10:30执行一次任务
schedule.every(5).to(10).days.do(job) # 每5-10天执行一次任务
schedule.every().monday.do(job) # 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(job) # 每周三13:15执行一次任务
while True:
schedule.run_pending()
获取数据存入数据库:(格式可能不太对,还有一些符号。自己修改一下即可)
import pymysql
import schedule
import time
import requests
import pandas
from sqlalchemy import create_engine
#获取美元的所有外汇
def job():
content = '美元'
url = 'http://www.boc.cn/sourcedb/whpj/index.html' #外汇数据地址
html = requests.get(url).content.decode('utf-8')
index = html.index('<td>' + content + '</td>')
str = html[index:index+300]
result = re.findall('<td>(.*?)</td>',str)
print("币种:" + result[0])
print("现汇买入价:" + result[1])
print("现钞买入价:" + result[2])
print("现汇卖出价:" + result[3])
print("现钞卖出价:" + result[4])
print("中行结算价:" + result[5])
print("发布时间:" + result[6] + ' ' + result[7])
#本地地址 数据库账号 密码 数据库名
db = pymysql.connect('localhost','root','root','pinyougoudb')
cursor = db.cursor()
#sql语句
sql = "update tb_money set huiBuy = %s,chaoBuy = %s,huiSale = %s,chaoSale = %s,centerResult= %s,publishTime = '%s' where typeId = '%s'" % (result[1], result[2], result[3], result[4], result[5], result[6] + ' ' + result[7], result[0])
cursor.execute(sql)
db.commit()
print('success')
# 查询语句,将存入的数据查出来
# sqlalchemy 进行数据库初始化
engine = create_engine('mysql+pymysql://root:root@localhost:3306/pinyougoudb')
sql = '''select * from tb_money'''
# pandas 进行数据库读写
df = pandas.read_sql_query(sql,engine)
print(df)
db.commit()
# 每隔几分中刷新一次
#schedule.every(0.1).minutes.do(job)
#每天什么时候刷新
schedule.every().day.at("09:29").do(job)
schedule.every().day.at("09:30").do(job)
#一直循环 知道满足条件执行
while True:
schedule.run_pending()
来源:https://blog.csdn.net/wzqnls/article/details/51992693


猜你喜欢
- 很喜欢Python这门语言。在看过语法后学习了Django 这个 Web 开发框架。算是对 Python 有些熟悉了。不过对里面很多东西还是
- 我最近也在研究MySQL性能优化的路上,那么今天也算个学习笔记吧!在小伙伴们开发的项目中,对于MySQL排查问题找出性能瓶颈来说,最容易发现
- 1,exists和in的理解(参考https://www.jb51.net/article/28922.htm) exists:如果子查询中
- 前言在前边的几篇文章中已经基本分享完了编译器前端的一些工作,后边的几篇主要是关于编译器对抽象语法树进行分析和重构,然后完成一系列的优化,其中
- Python爬虫可以爬取的东西有很多,Python爬虫怎么学?简单的分析下:如果你仔细观察,就不难发现,懂爬虫、学习爬虫的人越来越多,一方面
- View in Browser功能不生效问题安装玩view in browser插件后,在文档中点击邮件的view in browser 不
- 科讯5.0 标签和之前版本变化不大,如果用老版本的科讯,可以参考这个标签使用。相关文章:新云4.0 模板通用标签说明 标签清单:======
- 今天,我们使用面部标记和 OpenCV 检测视频流中的眨眼次数。为了构建我们的眨眼检测器,我们将计算一个称为眼睛纵横比 (EAR) 的指标,
- 什么是 PiniaPinia (西班牙语中的菠萝),本质上依然是一个状态管理的库,用于跨组件、页面进行状态共享.pinia 与 vuex 的
- 最近需要用python根据收集到的数据进行绘图,决定使用rrd数据库,然后配合rrdtool来绘图,故学习一下rrdtool的用法。用法如下
- 在平时开发过程中,经常遇到需要在数据中获取特定的元素的信息,如到达目的地最近的车站,橱窗里面最贵的物品等等。怎么办?看下面方法一: 利用数组
- 库的管理1、库的管理创建、修改、删除1、库的创建CREATE DATABASE UF NOT EXISTS books;2、库的修改库名一般
- 这个javascript划词搜索功能,在很多地方我们都会看到,特别是装了浏览器插件的时候,当我们用鼠标选择一段文字的时候,就会出现搜索提示,
- 本文实例讲述了微信小程序列表渲染功能之列表下拉刷新及上拉加载的实现方法。分享给大家供大家参考,具体如下:微信小程序为2017年1月9日打下了
- 前言随时随地发现新鲜事!微博带你欣赏世界上每一个精彩瞬间,了解每一个幕后故事。分享你想表达的,让全世界都能听到你的心声!今天我们通过pyth
- 前言:Python内置对SMTP的支持,可以发送纯文本邮件、HTML邮件以及带附件的邮件。Python对SMTP支持有smtplib和ema
- 详解微信小程序中的页面代码中的模板的封装 最近在进行微信小程序中的页面开发,其实在c++或者说是js中都
- 1.预备知识python语言,scrapy爬虫基础,json模块,flask后端2.抓取CSDN数据接口使用谷歌抓包工具抓取CSDN搜索引擎
- 思路利用scapy库,在这个库的基础下能够做很多的事情,python读取pcap包网上一找一大把将读取出来的pcap包改一个名字,然后写回,
- 如果不配置.gitignore的文件,带push代码的时候就会把一写不必要的文件push到远程仓库,如.idea文件。如果不小心出现此文件在