每天迁移MySQL历史数据到历史库Python脚本
作者:oeleven123456789 发布时间:2024-01-27 20:43:00
标签:Python,迁移,数据
本文实例为大家分享了Python每天迁移MySQL历史数据到历史库的具体代码,供大家参考,具体内容如下
#!/usr/bin/env python
# coding:utf-8
__author__ = 'John'
import MySQLdb
import sys
import datetime
import time
class ClassMigrate(object):
    """Move one day of rows from a source MySQL table into an archive table.

    The day handled is ``today`` minus ``--time_interval`` days, matched on
    the ``--date_col`` column.  Rows are copied in primary-key ranges of
    ``step_size``, the copy is verified with a ``count_min_max`` checksum,
    and the source rows are then removed either by chunked DELETEs
    (``--delete_strategy=delete``) or by dropping the partition holding
    them (``--delete_strategy=drop``).

    Process exit codes: 0 help shown, 1 bad command line, 77 verification
    or partition-checksum mismatch, 88 success, 254 migration or removal
    did not complete, 255 destination table could not be created.
    """

    # (command-line option, attribute it sets); every option is required.
    _REQUIRED_OPTIONS = (
        ('--source', 'source_host'),
        ('--dest', 'dest_host'),
        ('--delete_strategy', 'deleteStrategy'),
        ('--primary_key', 'pk'),
        ('--date_col', 'date_col'),
        ('--time_interval', 'interval'),
    )

    @staticmethod
    def _parse_endpoint(value):
        """Split ``host:port/db:table/user/password`` into its six parts.

        Returns (host, port as int, db, table, user, password).  Everything
        after the third ``/`` is the password, so it may itself contain ``/``.
        Raises ValueError if the value is malformed.
        """
        host_port, db_tab, user, password = value.split('/', 3)
        host, port = host_port.split(':', 1)
        db, tab = db_tab.split(':', 1)
        return host, int(port), db, tab, user, password

    def _get_argv(self):
        """Parse ``sys.argv`` into connection, table and strategy attributes.

        ``-h``/``--help`` prints usage and exits 0.  No arguments, an unknown
        option, a malformed value or a missing required option prints usage
        and exits 1.
        """
        self.usage = """
usage():
python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
--dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
--delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
"""
        args = sys.argv[1:]
        if not args:
            print(self.usage)
            sys.exit(1)
        if args[0] in ('--help', '-h'):
            print(self.usage)
            sys.exit()
        try:
            for arg in args:
                # split on the first '=' only, so values (passwords) may contain '='
                key, _, value = arg.partition('=')
                if key == '--source':
                    (self.source_host, self.source_port, self.source_db,
                     self.source_tab, self.source_user,
                     self.source_password) = self._parse_endpoint(value)
                elif key == '--dest':
                    (self.dest_host, self.dest_port, self.dest_db,
                     self.dest_tab, self.dest_user,
                     self.dest_password) = self._parse_endpoint(value)
                elif key == '--delete_strategy':
                    if value not in ('delete', 'drop'):
                        raise ValueError('invalid --delete_strategy: %s' % value)
                    self.deleteStrategy = value
                elif key == '--primary_key':
                    self.pk = value
                elif key == '--date_col':
                    self.date_col = value
                elif key == '--time_interval':
                    # must be an integer: it is spliced into `INTERVAL -N day`
                    self.interval = int(value)
                else:
                    raise ValueError('unknown option: %s' % key)
        except (ValueError, IndexError) as e:
            print(str(e))
            print(self.usage)
            sys.exit(1)
        # fail now rather than after data has already been copied
        missing = [opt for opt, attr in self._REQUIRED_OPTIONS
                   if not hasattr(self, attr)]
        if missing:
            print('missing option(s): %s' % ', '.join(missing))
            print(self.usage)
            sys.exit(1)

    def __init__(self):
        """Parse the command line and open autocommit connections to both servers."""
        self._get_argv()
        self.sourcedb_conn_str = MySQLdb.connect(
            host=self.source_host, port=self.source_port, user=self.source_user,
            passwd=self.source_password, db=self.source_db, charset='utf8')
        self.sourcedb_conn_str.autocommit(True)
        self.destdb_conn_str = MySQLdb.connect(
            host=self.dest_host, port=self.dest_port, user=self.dest_user,
            passwd=self.dest_password, db=self.dest_db, charset='utf8')
        self.destdb_conn_str.autocommit(True)
        # destination table is created LIKE this table, on the destination connection
        self.template_tab = self.source_tab + '_template'
        # width of each primary-key range copied / deleted per statement
        self.step_size = 20000
        # set by verify_total_cnt, and by delete_data / drop_daily_partition
        self._migCompleteState = False
        self._deleteCompleteState = False
        # filled in by get_min_max and verify_total_cnt
        self.source_cnt = ''
        self.source_min_id = ''
        self.source_max_id = ''
        self.source_checksum = ''
        # previously misspelled `dest_cn`; do() reads dest_cnt even if verification never ran
        self.dest_cnt = ''
        # reference date; the migrated day is `interval` days before it
        self.today = time.strftime("%Y-%m-%d")

    def _date_range_cond(self):
        """Return the WHERE predicate selecting rows whose date_col lies on the
        day ``interval`` days before ``today``, from 00:00:00 to 23:59:59."""
        day = ("DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d')"
               % (self.today, self.interval))
        return ("%s >= CONCAT(%s, ' 00:00:00') and %s <= CONCAT(%s, ' 23:59:59')"
                % (self.date_col, day, self.date_col, day))

    def _count_min_max_sql(self, table, where=None):
        """Build ``select count(*), min(pk), max(pk)`` over *table* (-1 when empty)."""
        sql = ("select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s"
               % (self.pk, self.pk, table))
        if where:
            sql += " where " + where
        return sql

    @staticmethod
    def _checksum(row):
        """Format a (count, min_id, max_id) row as the string 'count_min_max'."""
        return '%s_%s_%s' % (row[0], row[1], row[2])

    @staticmethod
    def _print_elapsed(starttime):
        """Print the wall time since *starttime* as seconds.microseconds."""
        elapsed = datetime.datetime.now() - starttime
        # zero-pad microseconds so 1.005 s is not printed as "1.5000"
        print("Elapsed :%d.%06d seconds <br>" % (elapsed.seconds, elapsed.microseconds))

    def sourcedb_query(self, sql, sql_type):
        """Run *sql* on the source connection.

        Returns every row for ``sql_type='select'``, the affected-row count
        for ``'dml'``, True for anything else, or False (after printing the
        error) when the statement fails.
        """
        cr = None
        try:
            cr = self.sourcedb_conn_str.cursor()
            cr.execute(sql)
            if sql_type == 'select':
                return cr.fetchall()
            if sql_type == 'dml':
                return cr.rowcount
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            # cr is still None when cursor() itself raised
            if cr is not None:
                cr.close()

    def destdb_query(self, sql, sql_type, values=''):
        """Run *sql* on the destination connection.

        ``'select'`` returns every row; ``'insertmany'`` inserts each tuple
        in *values* and returns the number of rows inserted (0 for an empty
        batch); anything else returns True.  Returns False (after printing
        the error) when the statement fails.
        """
        cr = None
        try:
            cr = self.destdb_conn_str.cursor()
            if sql_type == 'select':
                cr.execute(sql)
                return cr.fetchall()
            if sql_type == 'insertmany':
                if not values:
                    # executemany() returns without executing for an empty batch
                    return 0
                cr.executemany(sql, values)
                # the cursor's rowcount covers every INSERT executemany issued;
                # connection.affected_rows() only reports the last one
                return cr.rowcount
            cr.execute(sql)
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False
        finally:
            if cr is not None:
                cr.close()

    def create_table_from_source(self):
        """Create a table on the destination from the source's SHOW CREATE TABLE.

        Unused: the data is migrated into an ARCHIVE-engine table, so copying
        the source DDL is unsuitable; kept for other purposes.  The created
        table keeps the source table's name.  Returns True if the DDL ran.
        """
        try:
            rows = self.sourcedb_query("show create table %s;" % self.source_tab, 'select')
            if not rows:
                return False
            create_str = rows[0][1].replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)
            return bool(self.destdb_query(create_str, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def create_table_from_template(self):
        """Create dest_tab LIKE template_tab on the destination if it does not exist.

        Returns True on success, False otherwise.
        """
        try:
            sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
            return bool(self.destdb_query(sql, 'ddl'))
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def get_min_max(self):
        """Count the source rows to migrate and record their primary-key range.

        Sets source_cnt, source_min_id, source_max_id and source_checksum.
        Returns True when at least one row matches, False when none match or
        the query fails.
        """
        try:
            print("\nStarting Migrate at -- %s <br>" % datetime.datetime.now())
            sql = self._count_min_max_sql(self.source_tab, self._date_range_cond())
            rows = self.sourcedb_query(sql, 'select')
            if not rows:
                # sourcedb_query has already printed the database error
                return False
            row = rows[0]
            self.source_cnt, self.source_min_id, self.source_max_id = row
            self.source_checksum = self._checksum(row)
            if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
                print("There is 0 record in source table been matched! <br>")
                return False
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def migrate_2_destdb(self):
        """Copy the day's source rows into dest_tab, one pk range at a time.

        Returns True once every range has been attempted (individual range
        failures are printed; verify_total_cnt detects them), False when
        nothing matched or an unexpected error occurred.
        """
        try:
            if not self.get_min_max():
                return False
            cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
            # one placeholder per source column; `desc` lists columns in the
            # same order that `select *` returns them
            placeholders = ",".join(["%s"] * len(cols))
            insert_sql = "insert into %s values (%s)" % (self.dest_tab, placeholders)
            date_cond = self._date_range_cond()
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = ("select * from %s where %s >= %d and %s < %d and %s"
                       % (self.source_tab, self.pk, k, self.pk, k + self.step_size, date_cond))
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                results = self.sourcedb_query(sql, 'select')
                if results is False:
                    print("Select failed!! <br>")
                else:
                    rows = self.destdb_query(insert_sql, 'insertmany', results)
                    # `is False`: a range with no rows legitimately inserts 0
                    if rows is False:
                        print("Insert failed!! <br>")
                    else:
                        print("Inserted %s rows. <br>" % rows)
                self._print_elapsed(starttime)
                k += self.step_size
            print("\nInsert complete at -- %s <br>" % datetime.datetime.now())
            return True
        except Exception as e:
            print(str(e) + "<br>")
            return False

    def verify_total_cnt(self):
        """Compare the destination's count/min/max for the day with the source's.

        Sets dest_cnt; sets _migCompleteState on a match, exits with status
        77 on a mismatch.
        """
        try:
            sql = self._count_min_max_sql(self.dest_tab, self._date_range_cond())
            rows = self.destdb_query(sql, 'select')
            if not rows:
                # error already printed; leave _migCompleteState False
                return
            row = rows[0]
            self.dest_cnt = row[0]
            dest_checksum = self._checksum(row)
            print("source_checksum: %s, dest_checksum: %s <br>" % (self.source_checksum, dest_checksum))
            if (self.source_cnt == self.dest_cnt and self.dest_cnt != 0
                    and self.source_checksum == dest_checksum):
                self._migCompleteState = True
                print("Verify successfully !!<br>")
            else:
                print("Verify failed !!<br>")
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def drop_daily_partition(self):
        """Drop the source partition holding the migrated day, after re-checking it.

        Only runs after a successful verification.  The partition's own
        count/min/max must equal the migrated checksum, otherwise exits 77.
        """
        if not self._migCompleteState:
            return
        try:
            # NOTE(review): EXPLAIN PARTITIONS is deprecated in MySQL 5.7 and
            # rejected by 8.0; row column 3 is `partitions` - confirm server version.
            sql = ("explain partitions select * from %s where %s"
                   % (self.source_tab, self._date_range_cond()))
            plan = self.sourcedb_query(sql, 'select')
            if not plan:
                return
            partition_name = plan[0][3]
            if not partition_name:
                print("No partition matched the migrated day! <br>")
                return
            sql = self._count_min_max_sql("%s partition (%s)" % (self.source_tab, partition_name))
            rows = self.sourcedb_query(sql, 'select')
            if not rows:
                return
            row = rows[0]
            checksum = self._checksum(row)
            if row[0] == 0 or row[1] == -1 or row[2] == -1:
                print("There is 0 record in source PARTITION been matched! <br>")
            elif checksum == self.source_checksum:
                drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
                dropped = self.sourcedb_query(drop_par_sql, 'ddl')
                print(drop_par_sql + " <br>")
                if dropped:
                    print("\nDrop partition complete at -- %s <br>" % datetime.datetime.now())
                    self._deleteCompleteState = True
                else:
                    print("Drop partition failed.. <br>")
            else:
                print("The partition %s checksum failed !! Drop failed !!" % partition_name)
                sys.exit(77)
        except Exception as e:
            print(str(e) + "<br>")

    def delete_data(self):
        """Delete the migrated day's rows from the source, one pk range at a time.

        Only runs after a successful verification.  Sets _deleteCompleteState
        only if every range's DELETE succeeded.
        """
        if not self._migCompleteState:
            return
        try:
            date_cond = self._date_range_cond()
            failed = False
            k = self.source_min_id
            while k <= self.source_max_id:
                sql = ("delete from %s where %s >= %d and %s < %d and %s"
                       % (self.source_tab, self.pk, k, self.pk, k + self.step_size, date_cond))
                print("\n%s <br>" % sql)
                starttime = datetime.datetime.now()
                rows = self.sourcedb_query(sql, 'dml')
                # `is False`: a range with no rows legitimately deletes 0
                if rows is False:
                    failed = True
                    print("Delete failed!! <br>")
                else:
                    print("Deleted %s rows. <br>" % rows)
                self._print_elapsed(starttime)
                # pause between ranges - presumably to limit load on the source; confirm
                time.sleep(1)
                k += self.step_size
            print("\nDelete complete at -- %s <br>" % datetime.datetime.now())
            if not failed:
                self._deleteCompleteState = True
        except Exception as e:
            print(str(e) + "<br>")

    def do(self):
        """Run create -> migrate -> verify -> remove, print a summary, and exit.

        Exits 255 if the destination table cannot be created, 88 when the
        source rows were removed, otherwise 254.
        """
        if not self.create_table_from_template():
            print("Create table failed ! Exiting. . .")
            sys.exit(255)
        if self.migrate_2_destdb():
            self.verify_total_cnt()
            if self._migCompleteState:
                if self.deleteStrategy == 'drop':
                    self.drop_daily_partition()
                else:
                    self.delete_data()
        print("\n<br>")
        print("=====" * 5 + '<br>')
        print("source_total_cnt: %s <br>" % self.source_cnt)
        print("dest_total_cnt: %s <br>" % self.dest_cnt)
        print("=====" * 5 + '<br>')
        if self._deleteCompleteState:
            print("\nFinal result: Successfully !! <br>")
            sys.exit(88)
        print("\nFinal result: Failed !! <br>")
        sys.exit(254)
# Run only when executed as a script: constructing ClassMigrate connects to
# both databases and do() always ends with sys.exit, so importing this module
# must not trigger it.
if __name__ == "__main__":
    f = ClassMigrate()
    f.do()
来源:https://blog.csdn.net/oEleven123456789/article/details/51850169


猜你喜欢
- 一、适配器模式适配器,顾名思义是一种万能的接口,达到万能转换的效果。适配器模式,定义一个适配器类,并且在该类中定义了适配器接口,这些适配接口
- 在后台处理数据时,前台页面同时计数显示进度条Progress Bar使用了layer来显示用法:1。引用<script lang
- 目录1、发送get请求2、发送post请求3、发送https请求4、文件上传5、文件下载6、timeout超时7、鉴权7.1、auth参数鉴
- 随着 Node.js v8 的发布,Node.js 已原生支持 async/await 函数,Web 框架 Koa 也随之发布了 Koa 2
- 最近我因为要安装 Firebug 1.4 导致我不得不安装了 Firefox 3.5 ,所以很不小心地接触到了Wordpress后台那漂亮的
- 下面给大家介绍Java正则表达式验证固定电话号码符合性,具体代码如下所示:/** * 验证固定电话号码的合法性 * @author jy *
- LCase:转成小写 UCase:转成大写 下面是ASP中的代码,可以直接演示效果的。 代码如下:<% dim s
- 本文实例讲述了php+jQuery实现的 * 导航栏下拉菜单显示效果。分享给大家供大家参考,具体如下:首先看看效果图:1.数据配置文件 db.
- 代码:import tensorflow as tfsess = tf.Session()check_point_path = 'v
- 1. Express简介express是一个基于node.js平台的极简,灵活的web应用开发框架,它提供一系列强大的特征,帮助你创建各种w
- 本文实例讲述了Python实现批量将word转html并将html内容发布至网站的方法。分享给大家供大家参考。具体实现方法如下:#codin
- 今晚在Mac OS中搭建web服务器时遇到一点冲突,逛了几个论坛和网站,都说得太片面。先列出最正确的搭建步骤:(无论你是任何操作系统,或者任
- 前言在我们实际开发中,经常需要将一组数据存储起来,以便使用。如果学习了其他的语言可能知道数组(Array)这个数据结构,它就可以将多个数据进
- torch.nn 是专门为神经网络设计的模块化接口,nn构建于autograd之上,可以用来定义和运行神经网络nn.Module 是nn中重要
- 一、新建一个用户老板:给我新建一个用户joytom,密码设置为123321,并任意远程主机都能访问,五分钟完成,实现不了就给我提桶走人!小王
- python简单的学生信息管理系统-文件版,供大家参考,具体内容如下功能如下主函数部分增加学生信息修改学生信息删除学生信息查询学生显示所有学
- 正常在使用百度地图时,我们可以通过BMap的实例对象提供的方法计算距离:var map = new BMap.Map('map_ca
- 本文为大家分享了python八皇后问题的解决方法,供大家参考,具体内容如下题目:给定一个 N*N 正方形棋盘,在上面放置 N个棋子,又叫皇后
- 闭包: 闭包就是能够读取其他函数内部变量的函数。def test1(k, b): def test1_1(x): …
- 本文实例讲述了Java读取文件及基于正则表达式的获取电话号码功能。分享给大家供大家参考,具体如下:1、正则表达式正则表达式,又称 正规表示法