python结合shell自动创建kafka的连接器实战教程
作者:时空无限 发布时间:2023-01-06 19:17:13
标签:python,kafka,shell
环境
cat /etc/redhat-release
CentOS Linux release 7.5.1804 (Core)
[root@localhost ~]# uname -a
Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
python -V
Python 2.7.5
安装连接oracle的python包
pip install cx_Oracle==7.3
获取oracle表信息
cat query_oracle.py
#!/usr/bin/env python
import cx_Oracle
import sys
import os
import csv
import traceback
file = open("oracle.txt", 'w').close()
user = "test"
passwd = "test"
listener = '10.0.2.15:1521/orcl'
conn = cx_Oracle.connect(user, passwd, listener)
cursor = conn.cursor()
sql = "select table_name from user_tables"
cursor.execute(sql)
LIST1=[]
while True:
row = cursor.fetchone()
if row == None:
break
for table in row:
#print table
LIST1.append(table)
LIST2=[]
for i in LIST1:
sql3 = "select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE from cols WHERE TABLE_name=upper('%s')" %i
cursor.execute(sql3)
cursor.execute(sql3)
row3 = cursor.fetchall()
for data in row3:
#LIST2.append(i)
LIST2.extend(list(data))
LIST2.append(i)
f=open('oracle.txt','a+')
print >> f,LIST2
LIST2=[]
#f=open('test.txt','a+')
#select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');
#select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');
去掉多余部分
cat auto.sh
#!/bin/bash
#python query_oracle.py |tr "," ' '|tr "'" ' '|tr "[" " "|tr "]" " "
#>oracle.txt
>oracle_tables.txt
cat oracle.txt |tr "[],'" " "|sed "s#[ ][ ]*# #g"|sed 's/^[ \t]*//g' >> oracle_tables.txt
cat oracle_tables.txt
SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE
SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIME
SNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE
shell 脚本处理表信息文件
cat connect.sh
#!/bin/bash
#获取临时文件的行数
FILE_NUM=$(cat oracle_tables.txt |egrep -v '#|^$'|wc -l)
#清空自动创建连接器的脚本
>create-connect.sh
#循环临时文件每一行
for i in `seq $FILE_NUM`
do
FILE_LINE=$(sed -n ${i}p oracle_tables.txt)
TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF-1)}')
COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')
REAL_COL_NUM=`expr $COL_NUM - 2`
#清空临时文件
>${TABLE_NAME}.txt
>${TABLE_NAME}.sql
#循环临时文件每行列名所在的列
for j in `seq 1 4 $REAL_COL_NUM`
do
k=`expr $j + 1`
m=`expr $j + 2`
n=`expr $j + 3`
COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})
COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})
COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})
COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})
#判断列的数据类型是否是NUMBER
if [ "$COL_DATA_TYPE" = "NUMBER" ]
then
#循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中
echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ${TABLE_NAME}.txt
else
#循环拼接SQL查询中的列名部分,追加到临时文件中
echo "$COL_NAME" >> ${TABLE_NAME}.txt
fi
done
#拼接完整的SQL语句,追加到临时文件中
echo "select $(cat ${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from $TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ${TABLE_NAME}.sql
#循环追加每个表对应的连接器到自动创建连接器的脚本中
cat >> create-connect.sh << EOF
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_$TABLE_NAME",
"mode": "{{ CONNECT_MODE }}",
"query": "$(cat ${TABLE_NAME}.sql)"
}
}' >/dev/null 2>&1
EOF
done
说明:脚本中{{ 变量名 }}部分的内容是获取ansible中的变量,这个脚本是和ansible结合使用的。
增强版处理表信息脚本
#!/bin/bash
#获取临时文件的行数
FILE_NUM=$(cat oracle_time_tables.txt |egrep -v '#|^$'|wc -l)
#清空创建连接器的脚本并追加echos函数
> create-jdbc-connect.sh
cat >> create-jdbc-connect.sh << EOF
#!/bin/bash
echos(){
case \$1 in
red) echo -e "\033[31m \$2 \033[0m";;
green) echo -e "\033[32m \$2 \033[0m";;
yellow) echo -e "\033[33m \$2 \033[0m";;
blue) echo -e "\033[34m \$2 \033[0m";;
purple) echo -e "\033[35m \$2 \033[0m";;
*) echo "\$2";;
esac
}
EOF
> create-jdbc-connect-time.sh
cat >> create-jdbc-connect-time.sh << EOF
#!/bin/bash
echos(){
case \$1 in
red) echo -e "\033[31m \$2 \033[0m";;
green) echo -e "\033[32m \$2 \033[0m";;
yellow) echo -e "\033[33m \$2 \033[0m";;
blue) echo -e "\033[34m \$2 \033[0m";;
purple) echo -e "\033[35m \$2 \033[0m";;
*) echo "\$2";;
esac
}
EOF
#创建表相关文件目录
mkdir -p ./TABLE_TIME
#循环临时文件每一行
for i in `seq $FILE_NUM`
do
FILE_LINE=$(sed -n ${i}p oracle_time_tables.txt)
TABLE_NAME=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk '{print $(NF)}')
COL_NUM=$(echo ${FILE_LINE}|sed 's/[ \t]*$//g'|awk -F "[ ]" '{print NF}')
REAL_COL_NUM=`expr $COL_NUM - 2`
#清空临时文件
>./TABLE_TIME/${TABLE_NAME}_time.txt
>./TABLE_TIME/${TABLE_NAME}_time.sql
>./TABLE_TIME/${TABLE_NAME}.sql
#循环临时文件每行列名所在的列
for j in `seq 1 4 $REAL_COL_NUM`
do
k=`expr $j + 1`
m=`expr $j + 2`
n=`expr $j + 3`
COL_NAME=$(echo $FILE_LINE|cut -d " " -f${j})
COL_DATA_TYPE=$(echo $FILE_LINE|cut -d " " -f${k})
COL_DATA_PRECISION=$(echo $FILE_LINE|cut -d " " -f${m})
COL_DATA_SCALE=$(echo $FILE_LINE|cut -d " " -f${n})
#判断列的数据类型是否是NUMBER
if [ "$COL_DATA_TYPE" = "NUMBER" ]
then
#循环拼接SQL查询中的CAST(* AS *) AS *部分,追加到临时文件中
echo "CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt
else
#循环拼接SQL查询中的列名部分,追加到临时文件中
echo "$COL_NAME" >> ./TABLE_TIME/${TABLE_NAME}_time.txt
fi
#判断是否存在hosts中定义的时间列,如果有就追加该列名进一个临时文件中
TIME_COL=({{ TABLE_TIME_COL }})
for TIME in ${TIME_COL[@]}
do
if [ "$COL_NAME" = "$TIME" ]
then
echo "$COL_NAME" > ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt
fi
done
done
#拼接完整的SQL语句,追加到临时文件中
if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]
then
#echo "select $(cat ./TABLE_TIME/${TABLE_NAME}.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)>=trunc(sysdate-2) and $(sed -n ${i}p oracle_tables.txt|cut -d ' ' -f$COL_NUM)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql
echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)>=trunc(sysdate-2) and $(cat ./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt)<trunc(sysdate-1)" >> ./TABLE_TIME/${TABLE_NAME}_time.sql
else
echo "select $(cat ./TABLE_TIME/${TABLE_NAME}_time.txt |tr "\n" ","|sed -e 's/,$/\n/') from {{ ORACLE_TABLES_USER }}.$TABLE_NAME" >> ./TABLE_TIME/${TABLE_NAME}.sql
fi
#循环追加每个表对应的连接器到自动创建连接器的脚本中
if [ -f "./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt" ]
then
cat >> create-jdbc-connect-time.sh << EOF
#创建表 $TABLE_NAME 连接器的命令如下
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_time_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_${TABLE_NAME}_INSERT",
"poll.interval.ms": "86400000",
"mode": "{{ CONNECT_MODE }}",
"numeric.mapping": "best_fit",
"query": "$(cat ./TABLE_TIME/${TABLE_NAME}_time.sql)"
}
}' >/dev/null 2>&1
#判断连接器是否创建成功
if [ \$? -eq 0 ]
then
echos green "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器成功"
else
echos red "\$(date +"%F %H:%M:%S") 创建jdbc_time_${TABLE_NAME} 连接器失败"
fi
EOF
else
cat >> create-jdbc-connect.sh << EOF
#创建表 $TABLE_NAME 连接器的命令如下
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_${TABLE_NAME}_INSERT",
"poll.interval.ms": "86400000",
"mode": "{{ CONNECT_MODE }}",
"numeric.mapping": "best_fit",
"query": "$(cat ./TABLE_TIME/${TABLE_NAME}.sql)"
}
}' >/dev/null 2>&1
#判断连接器是否创建成功
if [ \$? -eq 0 ]
then
echos green "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器成功"
else
echos red "\$(date +"%F %H:%M:%S") 创建jdbc_${TABLE_NAME} 连接器失败"
fi
EOF
fi
done
来源:https://blog.csdn.net/weixin_40548182/article/details/118422058
0
投稿
猜你喜欢
- csv(Comma-Separated Values)文件是什么?它是一种文件格式,一般也被叫做逗号分隔值文件,可以使用 Excel 软件或
- 1、一般CentOS默认安装了mariadb,所以先查看是否安装mariadb,如果安装就需要先卸载mariadbrpm -qa|grep
- 前言深度学习涉及很多向量或多矩阵运算,如矩阵相乘、矩阵相加、矩阵-向量乘法等。深层模型的算法,如BP,Auto-Encoder,CNN等,都
- 学习前言一起来看看Efficientdet的keras实现吧,顺便训练一下自己的数据。什么是Efficientdet目标检测算法最近,谷歌大
- PDF是我们经常会接触到的一种文件格式,文献、文档...很多都是PDF格式。它以格式稳定的优势,使得我们在打印、分享、传输过程中能够最优的保
- django启动我们在启动一个django项目的时候,无论你是在命令行执行还是在pycharm直接点击运行,其实都是执行'runse
- 本章将覆盖所有在Python中使用的基本I/O功能。有关更多函数,请参考标准Python文档。打印到屏幕上:产生输出最简单的方法
- 1,打开cmd安装PyQt5pip install pyqt52,PyQt5不再提供Qt Designer等工具,所以需要再安装pyqt5-
- 本文实例讲述了django框架cookie和session用法。分享给大家供大家参考,具体如下:首先知道http协议http协议它是无状态的
- 如何实现动态单行刷新,答案是——覆盖但是怎么实现覆盖呢关键在于不换行而且能回退到开始位置那么就要用到 \r这个东西就是让光标回退到
- 摘要主要是介绍python 的回调函数callback。什么是回调函数当程序运行是,一般情况下,应用程序会时常通过API调用库里所预先备好的
- 我就废话不多说了,大家还是直接看代码吧~<input type="text" maxlength="11
- 1.collatz序列编写一个名为 collatz()的函数,它 有一个名为 number 的参数。如果参数是偶数, 那么 collatz(
- 在开发中有需求在详情显示里外键字段内容,并且添加按钮弹窗内容,以及按钮跳转内容。以前并没有做过相似的开发,我们的后台是xadmin,当时正在
- 名词解释断号:比如,连续生成的编号,由于某种操作(通常为删除)后,产生不连续的编号,我们将这种不连续的编号称为断号。例如,数据库中有一个字段
- 堆排序堆是一种完全二叉树(是除了最后一层,其它每一层都被完全填充,保持所有节点都向左对齐),首先需要知道概念:最大堆问题,最大堆就是根节点比
- 介绍zip() 函数用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表。ps. 如果各个迭代器的元
- PyCharm2020 激活到 2100年一次激活,用到退休,就问你怕了没?超级硬核破解,从此不再找激活码,不再改HOST !!!!!Pyc
- 事件背景过年在家正好闲得没有太多事情,想起年前一个研发项目负责人反馈的问题:“老李啊,我们组一直在使用你这边的 gin
- 本文实例讲述了Python读取Pickle文件信息并计算与当前时间间隔的方法。分享给大家供大家参考,具体如下:python—–读取Pickl