用python简单实现mysql数据同步到ElasticSearch的教程
作者:jingxian 发布时间:2024-01-15 14:40:39
标签:python,mysql,同步,ElasticSearch
之前博客有用logstash-input-jdbc同步mysql数据到ElasticSearch,但是由于同步时间最少是一分钟一次,无法满足线上业务,所以只能自己实现一个,但是时间比较紧,所以简单实现一个
思路:
网上有很多思路用什么mysql的binlog功能什么的,但是我对mysql了解实在有限,所以用一个很呆板的办法查询mysql得到数据,再插入es,因为数据量不大,而且10秒间隔同步一次,效率还可以,为了避免服务器之间的时间差和mysql更新和查询产生的时间差,所以在查询更新时间条件时是和上一次同步开始时间比较,这样不管数据多少,更新耗时多少都不会少数据,因为原则是同步不漏掉任何数据,也可以程序多开将时间差和间隔时间差异化,因为用mysql中一个id当作es中的id,也避免了重复数据
使用:
只需要按照escongif.py写配置文件,然后写sql文件,最后直接执行mstes.py就可以了,我这个也是参考logstash-input-jdbc的配置形式
MsToEs
|----esconfig.py(配置文件)
|----mstes.py(同步程序)
|----sql_manage.py(数据库管理)
|----aa.sql(需要用到sql文件)
|----bb.sql(需要用到sql文件)
sql_manage.py:
# -*-coding:utf-8 -*-
__author__ = "ZJL"
from sqlalchemy.pool import QueuePool
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
import traceback
import esconfig
# 用于不需要回滚和提交的操作
def find(func):
def wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except Exception as e:
print(traceback.format_exc())
print(str(e))
return traceback.format_exc()
finally:
self.session.close()
return wrapper
class MysqlManager(object):
def __init__(self):
mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
self.engine = create_engine('mysql+pymysql://'+mysql_connection_string+'?charset=utf8', poolclass=QueuePool,
pool_recycle=3600)
# self.DB_Session = sessionmaker(bind=self.engine)
# self.session = self.DB_Session()
self.DB_Session = sessionmaker(bind=self.engine, autocommit=False, autoflush=True, expire_on_commit=False)
self.db = scoped_session(self.DB_Session)
self.session = self.db()
@find
def select_all_dict(self, sql, keys):
a = self.session.execute(sql)
a = a.fetchall()
lists = []
for i in a:
if len(keys) == len(i):
data_dict = {}
for k, v in zip(keys, i):
data_dict[k] = v
lists.append(data_dict)
else:
return False
return lists
# 关闭
def close(self):
self.session.close()
aa.sql:
select
CONVERT(c.`id`,CHAR) as id,
c.`code` as code,
c.`project_name` as project_name,
c.`name` as name,
date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time,
from `cc` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
bb.sql:
select
CONVERT(c.`id`,CHAR) as id,
CONVERT(c.`age`,CHAR) as age,
c.`code` as code,
c.`name` as name,
c.`project_name` as project_name,
date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time,
from `bb` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
esconfig.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
# sql 文件名与es中的type名一致
mysql = {
# mysql连接信息
"mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
# sql文件信息
"statement_filespath":[
# sql对应的es索引和es类型
{
"index":"a1",
"sqlfile":"aa.sql",
"type":"aa"
},
{
"index":"a1",
"sqlfile":"bb.sql",
"type":"bb"
},
],
}
# es的ip和端口
elasticsearch = {
"hosts":"127.0.0.1:9200",
}
# 字段顺序与sql文件字段顺序一致,这是存进es中的字段名,这里用es的type名作为标识
db_field = {
"aa":
("id",
"code",
"name",
"project_name",
"update_time",
),
"bb":
("id",
"code",
"age",
"project_name",
"name",
"update_time",
),
}
es_config = {
# 间隔多少秒同步一次
"sleep_time":10,
# 为了解决服务器之间时间差问题
"time_difference":3,
# show_json 用来展示导入的json格式数据,
"show_json":False,
}
mstes.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
def __init__(self):
try:
# 是否展示json数据在控制台
self.show_json = es_config.get("show_json")
# 间隔多少秒同步一次
self.sleep_time = es_config.get("sleep_time")
# 为了解决同步时数据更新产生的误差
self.time_difference = es_config.get("time_difference")
# 当前时间,留有后用
self.datetime_now = ""
# es的ip和端口
es_host = elasticsearch.get("hosts")
# 连接es
self.es = Elasticsearch(es_host)
# 连接mysql
self.mm = MysqlManager()
except :
print(traceback.format_exc())
def tongbu_es_mm(self):
try:
# 同步开始时间
start_time = time.time()
print("start..............",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
# 这个list用于批量插入es
actions = []
# 获得所有sql文件list
statement_filespath = mysql.get("statement_filespath",[])
if self.datetime_now:
# 当前时间加上时间差(间隔时间加上执行同步用掉的时间,等于上一次同步开始时间)再字符串格式化
# sql中格式化时间时年月日和时分秒之间不能空格,不然导入es时报解析错误,所以这里的时间格式化也统一中间加一个T
self.datetime_now = time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time()-(self.sleep_time+self.time_difference)))
else:
self.datetime_now = "1999-01-01T00:00:00"
if statement_filespath:
for filepath in statement_filespath:
# sql文件
sqlfile = filepath.get("sqlfile")
# es的索引
es_index = filepath.get("index")
# es的type
es_type = filepath.get("type")
# 读取sql文件内容
with open(sqlfile,"r") as opf:
sqldatas = opf.read()
# ::datetime_now是一个自定义的特殊字符串用于增量更新
if "::datetime_now" in sqldatas:
sqldatas = sqldatas.replace("::datetime_now",self.datetime_now)
else:
sqldatas = sqldatas
# es和sql字段的映射
dict_set = db_field.get(es_type)
# 访问mysql,得到一个list,元素都是字典,键是字段名,值是数据
db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
if db_data_list:
# 将数据拼装成es的格式
for db_data in db_data_list:
action = {
"_index": es_index,
"_type": es_type,
"@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
"_source": db_data
}
# 如果没有id字段就自动生成
es_id = db_data.get("id", "")
if es_id:
action["_id"] = es_id
# 是否显示json再终端
if self.show_json:
print(action)
# 将拼装好的数据放进list中
actions.append(action)
# list不为空就批量插入数据到es中
if len(actions) > 0 :
helpers.bulk(self.es, actions)
except Exception as e:
print(traceback.format_exc())
else:
end_time = time.time()
print("end...................",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
self.time_difference = end_time-start_time
finally:
# 报错就关闭数据库
self.mm.close()
def main():
tb = TongBu()
# 间隔多少秒同步一次
sleep_time = tb.sleep_time
# 死循环执行导入数据,加上时间间隔
while True:
tb.tongbu_es_mm()
time.sleep(sleep_time)
if __name__ == '__main__':
main()
0
投稿
猜你喜欢
- 情人节快乐!这个节日怎么会少了浪漫的玫瑰花!用Python的turtle库绘图是很简单的,画了一个玫瑰花,下面奉上源码:源码:'
- 背景测试的出现是为了避免项目中出现重大事故测试是避免事故的最后一道屏障测试单元测试的覆盖率在一定程度上而言,决定了代码的质量单元测试通过测试
- DECLARE @LocalDate DATETIME, @UTCDate DATETIME, @LocalDate2 DATETIME S
- django的表单系统,分2种基于django.forms.Form的所有表单类的父类基于django.forms.ModelForm,可以
- 1.统计我们可以使用date_format()函数格式化时间,然后进行分组操作例如有一个学生表,结构如下idnameageheightgen
- 如何做一个只搜索本网站的引擎? 用下面两个文件即可实现:searchfiles.html &l
- 如下所示:File–>Settings–>Editor–> Color Scheme–>Language Defau
- HTML代码: <div class="float" id="float"> 我是个腼腆
- jQuery 将马上发布 1.4 正式版,代码也从 googlecode 上迁移到了 github. jQuery 是我接触的第一个 JS
- 1.下载dlib安装包安装dlib真是费劲,dlib下载地址:http://dlib.net/files/我下载的是dlib-19.14.z
- django restframework 导入excel内容,可以查看另外一篇文章一、基础环境web架构:前后端分离,前端使用vue,后端使
- 全局变量与局部变量# num1是全局变量num1 = 1# num2是局部变量def func():num2 = 2在函数外(且不在函数里)
- tf.nn.bidirectional_dynamic_rnn()函数:def bidirectional_dynamic_rnn( &nb
- 本文实例讲述了JS生成一维码(条形码)功能的方法。分享给大家供大家参考,具体如下:1、js代码:(function() { if (!exp
- context 有什么作用context 主要用来在goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。
- 字典dict1 = {'name':'han','age':18,'class
- 只能输入中文/** * 22.验证汉字 * 表达式 ^[\u4e00-\u9fa5]{0,}$ * 描述 只能汉字 * 匹配的例子 清清月儿
- 这是一条颠覆常规的插入方法,一条INSERT语句可以完成向多张表的插入任务。小小地展示一下这种插入方法。1.创建表T并初始化测试数据,此表作
- 你知道吗?实际上Python早在20世纪90年代初就已经诞生,可是火爆时间却并不长,就小编本人来说,也是前几年才了解到它。据统计,目前Pyt
- 窗口背景主要包括,背景色与背景图片,设置窗口背景有三种方法使用QSS设置窗口背景使用QPalette设置窗口背景实现PainEvent,使用