python实现MySQL指定表增量同步数据到clickhouse的脚本
作者:xiaoxiao树0 发布时间:2024-01-28 04:03:18
标签:python,MySQL,clickhouse,增量同步
python实现MySQL指定表增量同步数据到clickhouse,脚本如下:
#!/usr/bin/env python3
# _*_ coding:utf8 _*_
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (DeleteRowsEvent,UpdateRowsEvent,WriteRowsEvent,)
import clickhouse_driver
import configparser
import os
configfile='repl.ini'
########## 配置文件repl.ini 操作 ##################
def create_configfile(configfile,log_file,log_pos):
config = configparser.ConfigParser()
if not os.path.exists(configfile):
config['replinfo'] = {'log_file':log_file,'log_pos':str(log_pos)}
with open(configfile,'w+') as f:
config.write(f)
### repl.ini 写操作 ##################
def write_config(configfile,log_file,log_pos):
config = configparser.ConfigParser()
config.read(configfile)
config.set('replinfo','log_file',log_file)
config.set('replinfo','log_pos',str(log_pos))
if os.path.exists(configfile):
with open(configfile,'w+') as f:
config.write(f)
else:
create_configfile(configfile)
### 配置文件repl.ini 读操作 ##################
def read_config(configfile):
config = configparser.ConfigParser()
config.read(configfile)
# print(config['replinfo']['log_file'])
# print(config['replinfo']['log_pos'])
return (config['replinfo']['log_file'],int(config['replinfo']['log_pos']))
############# clickhouse 操作 ##################
def ops_clickhouse(db,table,sql):
column_type_dic={}
try:
client = clickhouse_driver.Client(host='127.0.0.1',\
port=9000,\
user='default',\
password='clickhouse')
# sql="select name,type from system.columns where database='{0}' and table='{1}'".format(db,table)
client.execute(sql)
except Exception as error:
message = "获取clickhouse里面的字段类型错误. %s" % (error)
# logger.error(message)
print(message)
exit(1)
MYSQL_SETTINGS = {'host':'127.0.0.1','port':13306,'user':'root','passwd':'Root@0101'}
only_events=(DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent)
def main():
## 每次重启时,读取上次同步的log_file,log_pos
(log_file,log_pos) = read_config(configfile)
# print(log_file+'|'+ str(log_pos))
print('-----------------------------------------------------------------------------')
stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS, resume_stream=True, blocking=True, \
server_id=10,
only_tables='t_repl', only_schemas='test', \
log_file=log_file,log_pos=log_pos, \
only_events=only_events, \
fail_on_table_metadata_unavailable=True, slave_heartbeat=10)
try:
for binlogevent in stream:
for row in binlogevent.rows:
## delete操作
if isinstance(binlogevent, DeleteRowsEvent):
info = dict(row["values"].items())
# print("DELETE FROM `%s`.`%s` WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
# print("ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key]) )
sql="ALTER TABLE `%s`.`%s` DELETE WHERE %s = %s ;" %(binlogevent.schema ,binlogevent.table,binlogevent.primary_key,info[binlogevent.primary_key])
## update 操作
elif isinstance(binlogevent, UpdateRowsEvent):
info_before = dict(row["before_values"].items())
info_after = dict(row["after_values"].items())
# info_set = str(info_after).replace(":","=").replace("{","").replace("}","")
info_set = str(info_after).replace(":", "=").replace("{", "").replace("}", "").replace("'","")
# print("UPDATE `%s`.`%s` SET %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key] ) )
# print("ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key] ) )
sql = "ALTER TABLE %s.%s UPDATE %s WHERE %s = %s ;"%(binlogevent.schema,binlogevent.table,info_set,binlogevent.primary_key,info_before[binlogevent.primary_key] )
## insert 操作
elif isinstance(binlogevent, WriteRowsEvent):
info = dict(row["values"].items())
# print("INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) ) )
sql = "INSERT INTO %s.%s(%s)VALUES%s ;"%(binlogevent.schema,binlogevent.table , ','.join(info.keys()) ,str(tuple(info.values())) )
ops_clickhouse('test', 't_repl',sql )
# 当前log_file,log_pos写入配置文件
write_config(configfile, stream.log_file, stream.log_pos)
except Exception as e:
print(e)
finally:
stream.close()
if __name__ == "__main__":
main()
'''
BinLogStreamReader()参数
ctl_connection_settings:集群保存模式信息的连接设置
resume_stream:从位置或binlog的最新事件或旧的可用事件开始
log_file:设置复制开始日志文件
log_pos:设置复制开始日志pos(resume_stream应该为true)
auto_position:使用master_auto_position gtid设置位置
blocking:在流上读取被阻止
only_events:允许的事件数组
ignored_events:被忽略的事件数组
only_tables:包含要观看的表的数组(仅适用于binlog_format ROW)
ignored_tables:包含要跳过的表的数组
only_schemas:包含要观看的模式的数组
ignored_schemas:包含要跳过的模式的数组
freeze_schema:如果为true,则不支持ALTER TABLE。速度更快。
skip_to_timestamp:在达到指定的时间戳之前忽略所有事件。
report_slave:在SHOW SLAVE HOSTS中报告奴隶。
slave_uuid:在SHOW SLAVE HOSTS中报告slave_uuid。
fail_on_table_metadata_unavailable:如果我们无法获取有关row_events的表信息,应该引发异常
slave_heartbeat:(秒)主站应主动发送心跳连接。这也减少了复制恢复时GTID复制的流量(在许多事件在binlog中跳过的情况下)。请参阅mysql文档中的MASTER_HEARTBEAT_PERIOD以了解语义
'''
知识点扩展:
MySQL备份-增量同步
mysql增量同步主要使用binlog文件进行同步,binlog文件主要记录的是数据库更新操作相关的内容。
1. 备份数据的意义
针对不同业务,7*24小时提供服务和数据的重要性不同。
数据库数据是比较核心的数据,对企业的经营至关重要,数据库备份显得尤为重要。
2. 备份数据库
MySQL数据库自带的备份命令 `mysqldump`,基本使用方法:
语法:`mysqldump -u username -p password dbname > filename.sql`
执行备份命令
`mysqldump -uroot -pmysqladmin db_test > /opt/mysql_bak.sql`
查看备份内容
`grep -v "#|\*|--|^$" /opt/mysql_bak.sql`
来源:https://blog.csdn.net/u010719917/article/details/114085351
0
投稿
猜你喜欢
- Nonetype和空值是不一致的,可以理解为Nonetype为不存在这个参数,空值表示参数存在,但是值为空判断方式如下:if hostip
- 前言:为了获取一定高级操作,如:微信模板消息(xiao,xin)推送,把消息推送给用户,或者是获取用户授权信息都需要用到access tok
- 1.下载 Microsoft SQL Server 2019 Red Hat 存储库配置文件:#curl -o /etc/yum.repos
- 一、http请求1、http请求方式:get和postget一般用于获取/查询资源信息,在浏览器中直接输入url+请求参数点击enter之后
- 相比于逻辑回归,在很多情况下,SVM算法能够对数据计算从而产生更好的精度。而传统的SVM只能适用于二分类操作,不过却可以通过核技巧(核函数)
- 一、介绍Django特点:具有完整的封装,开发者可以高效率的开发项目,Django将大部分的功能进行了封装,开发者只需要调用即可,如此,大大
- 单线程实现单线程实现道理比较简单,这里尝试Soket连接3389,连接成功说明端口开放,否则说明没有开远程服务。随便修改了一下就ok了,代码
- 这篇论坛文章(赛迪网技术社区)主要介绍了数据仓库基本报表制作过程中的SQL写法,详细内容请参考下文:在数据仓库的基本报表制作过程中,通常会使
- 到目前为止,我已经开发了两个HTML编辑器了,一个用在公司的CMS项目,另一个用在这个Blog(TidyEditor,暂时没有单独发布)。下
- PyTorch nn.Module类的简介torch.nn.Module类是所有神经网络模块(modules)的基类,它的实现在torch/
- 本文实例讲述了Python pandas自定义函数的使用方法。分享给大家供大家参考,具体如下:自定义函数的使用import numpy as
- TEMPLATESDjango 1.8的新特性一个列表,包含所有在Django中使用的模板引擎的设置。列表中的每一项都是一个字典,包含某个引
- 简介:本文介绍了图像检索的三种实现方式,均用python完成,其中前两种基于直方图比较,哈希法基于像素分布。 检索方式是:提前导入图片库作为
- 前言在pyhton的学习中,相信大家通常都会碰到第三方库的安装问题,这个问题对于很多初学者而言头疼不已。这里我做一些简单的总结,如何正确高效
- 前一阵,我在为朋友编写一个源代码监控程序的时候,发现了一个 Python 领域非常简单好用的图形界面库。说起图形界面库,你可能会想到 TkI
- 比如我们有一张school表,里面有一个字段county_name,现在我们要查询county_name字段中包含a-w字母和数字以外字符的
- 在项目中遇到了个json数据需要解析,利用Python脚本尝试分享给大家如下:import osimport pandas as pdimp
- 本文实例讲述了Python堆排序原理与实现方法。分享给大家供大家参考,具体如下:在这里要事先说明一下我也是新手,很多东西我了解不是很深入,写
- 这句代码在IE9之前曾被称为世界上最短的IE判定代码。代码虽短但确包含了不少javascript基础知识在里面。var ie&nb
- MySQL在5.1引入了一个rename database操作,但在MySQL5.1.23后又不支持这个命令。可以说是一个实验性的功能,没有