手把手教你实现Python连接数据库并快速取数的工具
作者:Python编程学习圈 发布时间:2024-01-27 12:28:09
在数据生产应用部门,取数分析是一个很常见的需求,实际上业务人员需求时刻变化,最高效的方式是让业务部门自己来取,减少不必要的重复劳动,一般情况下,业务部门数据库表结构一般是固定的,根据实际业务将取数需求做成sql 脚本,快速完成数据获取---授人以渔的方式,提供平台或工具
那如何实现一个自助取数查询工具?
基于底层数据来开发不难,无非是将用户输入变量作为筛选条件,将参数映射到 sql 语句,并生成一个 sql 语句然后再去数据库执行
前言
最后再利用 QT 开发一个 GUI 界面,用户界面的点击和筛选条件,信号触发对应按钮与绑定的传参槽函数执行
具体思路:
1.数据库连接类
此处利用 pandas 读写操作 oracle 数据库
2.主函数模块
1)输入参数模块,外部输入条件参数,建立数据库关键字段映射
--注:读取外部 txt 文件,将筛选字段可能需要进行键值对转换
2)sql 语句集合模块,将待执行的业务 sql 语句统一存放到这里
3)数据处理函数工厂
4)使用多线程提取数据
一、数据库连接类
cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新
Pandas 是基于 NumPy 开发,为了解决数据分析任务的模块,Pandas 引入了大量库和一些标准的数据模型,提供了高效地操作大型数据集所需的方法类和函数
pandas 调用数据库主要有 read_sql_table,read_sql_query,read_sql 三种方式
本文主要介绍一下 Pandas 中 read_sql_query 方法的使用
1:pd.read_sql_query()
读取自定义数据,返还DataFrame格式,通过SQL查询脚本包括增删改查。
pd.read_sql_query(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None,chunksize=None)
sql:要执行的sql脚本,文本类型
con:数据库连接
index_col:选择返回结果集索引的列,文本/文本列表
coerce_float:非常有用,将数字形式的字符串直接以float型读入
parse_dates:将某一列日期型字符串转换为datetime型数据,与pd.to_datetime函数功能类似。
params:向sql脚本中传入的参数,官方类型有列表,元组和字典。用于传递参数的语法是数据库驱动程序相关的。
chunksize:如果提供了一个整数值,那么就会返回一个generator,每次输出的行数就是提供的值的大小
read_sql_query()中可以接受SQL语句,DELETE,INSERT INTO、UPDATE操作没有返回值(但是会在数据库中执行),程序会抛出SourceCodeCloseError,并终止程序。SELECT会返回结果。如果想继续运行,可以try捕捉此异常。
2:pd.read_sql_table()
读取数据库中的表,返还DataFrame格式(通过表名)
import pandas as pd
pd.read_sql_table(table_name, con, schema=None,index_col=None, coerce_float=True, parse_dates=None, columns=None,chunksize=None)
3:pd.read_sql()
读数据库通过SQL脚本或者表名
import pandas as pd
pd.read_sql(sql, con, index_col=None,coerce_float=True, params=None, parse_dates=None, columns=None, chunksize=None)
以下创建连接 oracel 数据库的连接类 Oracle_DB
主要提供 2 种操作数据的函数方法。
import cx_Oracle
# Pandas读写操作Oracle数据库
import pandas as pd
# 避免编码问题带来的乱码
import os
os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'
class Oracle_DB(object):
def __init__(self):
try:
# 连接oracle
# 方法1:sqlalchemy 提供的create_engine()
# from sqlalchemy import create_engine
# engine = create_engine('oracle+cx_oracle://username:password@ip:1521/ORCL')
# #方法2:cx_Oracle.connect()
self.engine = cx_Oracle.connect('username', 'password', 'ip:1521/database')
except cx_Oracle.Error as e:
print("Error %d:%s" % (e.args[0], e.args[1]))
exit()
# 查询部分信息
def search_one(self, sql,sparm):
try:
# #查询获取数据用sql语句
# 代传参数:sparm--查询指定字段参数
df = pd.read_sql_query(sql, self.engine,params=sparm)
self.engine.close()
except Exception as e:
return "Error " + e.args[0]
return df
# 查询全部信息
def search_all(self, sql):
try:
# #查询获取数据用sql语句
df = pd.read_sql_query(sql, self.engine)
self.engine.close()
except Exception as e:
return "Error " + e.args[0]
return df
二、数据提取主函数模块
cx_Oracle 是一个 Python 扩展模块,相当于 python 的 Oracle 数据库的驱动,通过使用所有数据库访问模块通用的数据库 API 来实现 Oracle 数据库的查询和更新。
1)外部输入参数模块
txt 文本中,就包含一列数据,第一行列名,读取的时候忽略第一行
#建立ID——编号字典
def buildid():
sqlid = """select * from b_build_info"""
db = Oracle_DB() # 实例化一个对象
b_build_info = db.search_all(sqlid)
ID_bUILDCODE = b_build_info.set_index("BUILDCODE")["ID"].to_dict()
return ID_bUILDCODE
#通过文本传入待导出数据清单
def read_task_list():
build_code=buildid()
tasklist=[]
is_first_line=True
with open("./b_lst.txt") as lst:
for line in lst:
if is_first_line:
is_first_line=False
continue
tasklist.append(build_code.get(line.strip('\n'))) #键值对转换
return tasklist
2)业务 sql 语句集合
注意in后面{0}不要加引号,这里传入为元组,params 参数传入sparm
= {'Start_time':'2021-04-01','End_time':'2021-05-01'},此处参数可根据需要改变
def sql_d(lst):
# 逐月数据
sql_d_energy_item_month = """select * from d_energy_item_month
where recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and recorddate < to_date(:End_time, 'yyyy-MM-dd')
and buildid in {0}
order by recorddate asc""".format(lst)
# 逐月数据
sql_d_energy_month = """select d.*,t.name from d_energy_month d join t_device_info t on d.branchid = t.id
where d.recorddate >= to_date(:Start_time, 'yyyy-MM-dd')
and d.recorddate < to_date(:End_time, 'yyyy-MM-dd')
and d.buildid = '{0}'
order by d.recorddate asc""".format(lst)
# 查询当日数据
sql_energy_item_hour_cheak = """select * from d_energy_item_hour
where trunc(sysdate)=trunc(recorddate)
order by recorddate asc""".format(lst)
sql_collection = [sql_d_energy_item_month, sql_d_energy_item_day, sql_d_energy_item_hour, sql_d_energy_month,
sql_d_energy_day, sql_d_energy_hour, sql_energy_hour_cheak]
#此处省略部分sql语句
return sql_collection
3)业务数据处理
业务数据处理流程,原始数据后处理,这里不作介绍:
def db_extranction(lst,sparm,sql_type):
"""sql_type--输入需要操作的sql业务序号"""
sql_=sql_d(lst)[sql_type] #输出sql语句
db = Oracle_DB() # 实例化一个对象
res=db.search_one(sql_,sparm)
# 数据处理加工
RES=Data_item_factory(res) #此处省略
# res = db.search_all(sql_d_energy_item_month)
print(RES)
return RES
多线程提取数据部分,这里 tasklist 列表多线程提取数据
import threading
# Pandas读写操作Oracle数据库
from tools.Data_Update_oracle import Oracle_DB
import pandas as pd
from concurrent import futures
if __name__ == '__main__':
#外部传入
tasklist= read_task_list()
print(tasklist)
# 输入时间查找范围参数,可手动修改
sparm = {'Start_time':'2021-04-01','End_time':'2021-05-01'}
lst = tuple(list(tasklist))
#业务类型序号,可手动修改
sql_type=0
#全部提取
db_extranction(lst,sparm,sql_type)
#多线程按字段分批提取
方法一:使用threading模块的Thread类的构造器创建线程
#threads=[threading.Thread(target=db_extranction,args=(lst,sparm,sql_type)) for lst in tasklist]
# [threads[i].start() for i in range(len(threads))]
方法二:使用python的concurrent库,这是官方基于 threading 封装,先安装该库
# with futures.ThreadPoolExecutor(len(tasklist)) as executor:
# executor.map([db_extranction(lst,sparm,sql_type) for lst in tasklist],tasklist)
到此整个数据库取数工具开发流程介绍完毕,就差最后一步分享给小伙伴使用了,做成 GUI 应用此处不做详细介绍,构建独立的 python 环境,快速发布你的应用
来源:https://segmentfault.com/a/1190000042289978


猜你喜欢
- 我使用的是tensorflow-gpu (1.2.1)和Theano (0.9.0),2个4G显存Nvidia Quadro M2000 G
- 视频读取视频读取,主要利用VideoCapture类下的方法打开视频并获取视频中的帧,具体示例如下:#include<iostream
- 面试题:索引的作用?首先建立一张数据库表:create table single_table(id int not auto_increme
- PHP mysqli_select_db() 函数更改连接的默认数据库:删除数据库<?php // 假定数据库用户名:root,密码:
- 我们在前面已经分别介绍了如何在spring Boot中使用JPA以及如何在Spring Boot中输出REST资源。那么关于数据库访问还有一
- 下面的文章主要描述的是如何正确通过rpm包安装、对MySQL进行配置与卸载,以下就有详细内容对配置MySQL与卸载MySQL(和PHP搭配之
- python的set和其他语言类似, 是一个无序不重复元素集, 基本功能包括关系测试和消除重复元素. 集合对象还支持union(联合), i
- pandas的DataFrame对象,本质上是二维矩阵,跟常规二维矩阵的差别在于前者额外指定了每一行和每一列的名称。这样内部数据抽取既可以用
- 一、前言 JDK(Java Development Kit )是一切java应用程序的基础,可以说,所有的java应用程序是构建
- 基本思想:归并排序是一种典型的分治思想,把一个无序列表一分为二,对每个子序列再一分为二,继续下去,直到无法再进行划分为止。然后,就开始合并的
- JavaScript onkeypress 事件用户按下或按住一个键盘按键时会触发 onkeypress 事件。注意:onkeypress
- 模块的作用是:允许从任何文件里得到任何一行或几行,并且使用缓存进行优化。有几个API接口linecache.getlines(filenam
- 1.汇率换算程序案例描述设计一个汇率换算器程序,其功能是将外币换算成人民币,或者相反案例分析分析问题:分析问题的计算部分;确定问题:将问题划
- 前言pandas 是基于 Numpy 构建的含有更高级数据结构和工具的数据分析包类似于 Numpy 的核心是 ndarray,pandas
- 今天想用python做个demo,含两个子图的动态gif,代码如下:import matplotlib.pyplot as pltimpor
- 这个可能是容易被忽略的问题,首选我们要清楚:MySQL中,AND的执行优先级高于OR。也就是说,在没有小括号()的限制下,总是优先执行AND
- mysq 正确清理binlog日志前言:MySQL中的binlog日志记录了数据库中数据的变动,便于对数据的基于时间点和基于位置的恢复,但是
- 模板引擎的思想是来源于MVC(Model View Controller)模型,即模型层、视图层、控制器层。在Web端,模型层为数据库的操作
- 目录CanalMaxwellDatabus阿里云的数据传输服务DTSCanal定位:基于数据库增量日志解析,提供增量数据订阅&消费,
- 刚入职新公司,等了好几天,今天公司给发了新电脑,就要开始进行开发环境的安装了。在软件(JDK,GIT,IDEA,MYSQL,Navicate