Python 获取 datax 执行结果保存到数据库的方法
作者:薛定谔的DBA 发布时间:2024-01-26 23:32:08
标签:Python,datax,结果,保存,数据库
执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行:
其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表。
#!/bin/bash
source /etc/profile
user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"
jobfile=(
job_table_a.json
job_table_b.json
)
for filename in ${jobfile[@]}
do
echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}
echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done
# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# egrep '任务|速度|总数|job_start|job_finish' /opt/datax/job/log/
datax 执行日志:
job_start: 2018-08-08 01:13:28 job_table_a.json
任务启动时刻 : 2018-08-08 01:13:28
任务结束时刻 : 2018-08-08 01:14:49
任务总计耗时 : 81s
任务平均流量 : 192.82KB/s
记录写入速度 : 1998rec/s
读出记录总数 : 159916
读写失败总数 : 0
job_finish: 2018-08-08 01:14:49 job_table_a.json
job_start: 2018-08-08 01:14:49 job_table_b.json
任务启动时刻 : 2018-08-08 01:14:50
任务结束时刻 : 2018-08-08 01:15:01
任务总计耗时 : 11s
任务平均流量 : 0B/s
记录写入速度 : 0rec/s
读出记录总数 : 0
读写失败总数 : 0
job_finish: 2018-08-08 01:15:01 job_table_b.json
接下来读取这些信息保存到数据库,在数据库中创建表:
CREATE TABLE `datax_job_result` (
`log_file` varchar(200) DEFAULT NULL,
`job_file` varchar(200) DEFAULT NULL,
`start_time` datetime DEFAULT NULL,
`end_time` datetime DEFAULT NULL,
`seconds` int(11) DEFAULT NULL,
`traffic` varchar(50) DEFAULT NULL,
`write_speed` varchar(50) DEFAULT NULL,
`read_record` int(11) DEFAULT NULL,
`failed_record` int(11) DEFAULT NULL,
`job_start` varchar(200) DEFAULT NULL,
`job_finish` varchar(200) DEFAULT NULL,
`insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
定时执行以下文件,因为 datax 作业 1 点执行,为了获取一天内最新生产的日志,脚本中取 82800内生产的日志文件,及23 小时内生产的那个最新日志。所以一天内任何时间执行都可以。此文件也是定时每天执行(判断 datax 作业完成后执行)
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1
import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt
def save_to_db(df):
engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8")
df.to_sql("datax_job_result", engine, index=False, if_exists='append')
def get_the_latest_file(path):
t0 = dt.datetime.utcfromtimestamp(0)
d2 = (dt.datetime.now() - t0).total_seconds()
d1 = d2 - 82800
for (dirpath, dirnames, filenames) in os.walk(path):
for filename in sorted(filenames, reverse = True):
if filename.endswith(".log"):
f = os.path.join(dirpath,filename)
ctime = os.stat(f)[-1]
if ctime>=d1 and ctime <=d2:
return f
def get_job_result_from_logfile(path):
result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish'])
log_file = get_the_latest_file(path)
index = 0
content = open(log_file, "r")
for line in content:
result.loc[index, 'log_file'] = log_file
if re.compile(r'job_start').match(line):
result.loc[index, 'job_file'] = line.split(' ')[4].strip()
result.loc[index, 'job_start'] = line,
elif re.compile(r'任务启动时刻').match(line):
result.loc[index, 'start_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
elif re.compile(r'任务结束时刻').match(line):
result.loc[index, 'end_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()
elif re.compile(r'任务总计耗时').match(line):
result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s','')
elif re.compile(r'任务平均流量').match(line):
result.loc[index, 'traffic'] = line.split(':')[1].strip()
elif re.compile(r'记录写入速度').match(line):
result.loc[index, 'write_speed'] = line.split(':')[1].strip()
elif re.compile(r'读出记录总数').match(line):
result.loc[index, 'read_record'] = line.split(':')[1].strip()
elif re.compile(r'读写失败总数').match(line):
result.loc[index, 'failed_record'] = line.split(':')[1].strip()
elif re.compile(r'job_finish').match(line):
result.loc[index, 'job_finish'] = line,
index = index + 1
else:
pass
save_to_db(result)
get_job_result_from_logfile("/opt/datax/job/log")
来源:https://blog.csdn.net/kk185800961/article/details/81506457


猜你喜欢
- 疫情数据程序源码// An highlighted blockimport requestsimport jsonclass epidemi
- 一、偏好资源的积累利用DreamWeaver 4制作网页会应用到许多各种类型的要素,比如色彩、图片、模板、脚本等。利用站点资源面板将这些东东
- Math.abs(x):可返回数的绝对值Math.ceil(x):向上取整Math.floor(x):向下取整Math.max(x,y):最
- 用 xlrd 模块读取 Excelxlrd 安装cmd 中输入pip install xlrd 即可安装 xlrd 模块若失败请自行百度”p
- 在写脚本的过程中,除了发送form表单参数之外,我们还会发送json格式的参数。那么碰见json格式要怎么发送呢,这篇我们来解决这个问题。直
- 例1、一个简单存储过程游标实例DELIMITER $$ DROP PROCEDURE IF EXISTS getUserInfo $$CRE
- 编译和解释的区别是什么?编译器是把源程序的每一条语句都编译成机器语言,并保存成二进制文件,这样运行时计算机可以直接以机器语言来运行此程序,速
- 这句代码在IE9之前曾被称为世界上最短的IE判定代码。代码虽短但确包含了不少javascript基础知识在里面。var ie&nb
- JavaScript 的成功让人津津乐道,为 Web 网页编写 JavaScript 代码已经是所有 Web 设计师的基本功,这门有趣的语言
- 1、首先访问http://www.python.org/download/去下载最新的python版本。2、安装下载包,一路next。3、为
- PHP mysqli_stmt_init() 函数初始化声明并返回 mysqli_stmt_prepare() 使用的对象:<?php
- python的hashlib库中提供的hexdigest返回长度32的字符串。直接通过digest返回的16字节,有不可打印字符。问题来了,
- 对于map()它的原型是:map(function,sequence),就是对序列sequence中每个元素都执行函数function操作。
- OOM全称"Out Of Memory",即内存溢出。内存溢出已经是软件开发历史上存在了近40年的“老大难”问题。在操作
- 没配置的可以看一下我上一篇 地址开启小皮数据库text 数据库表 student 字段 student_no name age sex效果图
- 使用Python可视化Pygal包来生成可缩放的矢量图形文件!对于在尺寸不同的屏幕上显示图标,它们将自动缩放以适合观看者的屏幕,如果以在线的
- ASP是Web上的客户机/服务器结构的中间层,虽然它使用脚本语言(Java Script,VB Script等)编写,程序代码在服务器上运行
- Kubernetes的控制器模式是其非常重要的一个设计模式,整个Kubernetes定义的资源对象以及其状态都保存在etcd数据库中,通过a
- 远控终端的本质1、服务端(攻击者)传输消息 ----> socket连接 ----> 客户端(被攻击者)接收消息2、客户端执行消
- PDOStatement::nextRowsetPDOStatement::nextRowset — 在一个多行集语句句柄中推进到下一个行集