网络编程
位置:首页>> 网络编程>> 数据库>> 使用Python实现将多表分批次从数据库导出到Excel

使用Python实现将多表分批次从数据库导出到Excel

作者:幸福丶如此  发布时间:2024-01-13 06:17:27 

标签:Python,数据库,导出,Excel

一、应用场景

为了避免反复的手手工从后台数据库导出某些数据表到Excel文件、高效率到多份离线数据。

二、功能事项

支持一次性导出多个数据源表、自动获取各表的字段名。

支持控制批次的写入速率。例如:每5000行一个批次写入到excel。

支持结构相同的表导入到同一个Excel文件。可适用于经过水平切分后的分布式表。

三、主要实现

1、概览

A[创建类] -->|方法1| B(创建数据库连接)
A[创建类] -->|方法2| C(取查询结果集)
A[创建类] -->|方法3| D(利用句柄写入Excel)
A[创建类] -->|方法4| E(读取多个源表)

B(创建数据库连接) -->U(调用示例)
C(取查询结果集) -->U(调用示例)
D(利用句柄写入Excel) -->U(调用示例)
E(读取多个源表) -->U(调用示例)

2、主要方法

首先需要安装第三方库pymssql实现对SQLServer的连接访问,自定义方法__getConn()需要指定如下五个参数:服务器host、登录用户名user、登录密码pwd、指定的数据库db、字符编码charset。连接成功后,通过cursor()获取游标对象,它将用来执行数据库脚本,并得到返回结果集和数据总量。

创建数据库连接和执行SQL的源码:


def __init__(self,host,user,pwd,db):
   self.host = host
   self.user = user
   self.pwd = pwd
   self.db = db

def __getConn(self):
   if not self.db:
     raise(NameError,'没有设置数据库信息')
   self.conn = pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8')
   cur = self.conn.cursor()
   if not cur:
     raise(NameError,'连接数据库失败')
   else:
     return cur

3、方法3中写入Excel时,注意一定要用到Pandas中的公共句柄ExcelWriter对象writer。当数据被分批多次写入同一个文件时,如果直接使用to_excel()方法,则前面批次的结果集将会被后续结果覆盖。增加了这个公共句柄限制后,后面的写入会累加到前面写入的数据尾部行,而不是全部覆盖。

writer = pd.ExcelWriter(file)
df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)

分批次写入到目标Excel时的另一个要注意的参数是写入行startrow的设置。每次写入完成后需要重新指下一批次数据的初始位置值。每个批次的数据会记录各自的所属批次信息。

利用关键字参数**args 指定多个数据源表和数据库连接。


def exportToExcel(self, **args):
 for sourceTB in args['sourceTB']:    
   arc_dict = dict(
     sourceTB = sourceTB,
     path=args['path'],
     startRow=args['startRow'],
     isHeader=args['isHeader'],
     batch=args['batch']
   )
   print('\n当前导出的数据表为:%s' %(sourceTB))
   self.writeToExcel(**arc_dict)
 return 'success'

四、先用类MSSQL创建对象,再定义关键字参数args,最终调用方法导出到文件即完成数据导出。


#!/usr/bin/env python
# coding: utf-8

# 主要功能:分批次导出大数据量、结构相同的数据表到excel
# 导出多个表的数据到各自的文件,
# 目前问题:to_excel 虽然设置了分批写入,但先前的数据会被下一次写入覆盖,
# 利用Pandas包中的ExcelWriter()方法增加一个公共句柄,在写入新的数据之时保留原来写入的数据,等到把所有的数据都写进去之后关闭这个句柄
import pymssql
import pandas as pd
import datetime
import math

class MSSQL(object):
 def __init__(self,host,user,pwd,db):
   self.host = host
   self.user = user
   self.pwd = pwd
   self.db = db

def __getConn(self):
   if not self.db:
     raise(NameError,'没有设置数据库信息')
   self.conn = pymssql.connect(host=self.host, user=self.user, password=self.pwd, database=self.db, charset='utf8')
   cur = self.conn.cursor()
   if not cur:
     raise(NameError,'连接数据库失败')
   else:
     return cur

def executeQuery(self,sql):
   cur = self.__getConn()
   cur.execute(sql)
   # 获取所有数据集
   # fetchall()获取结果集中的剩下的所有行
   # 如果数据量太大,是否需要分批插入
   resList, rowcount = cur.fetchall(),cur.rowcount
   self.conn.close()
   return (resList, rowcount)

# 导出单个数据表到excel
 def writeToExcel(self,**args):
   sourceTB = args['sourceTB']
   columns = args.get('columns')
   path=args['path']
   fname=args.get('fname')
   startRow=args['startRow']
   isHeader=args['isHeader']
   N=args['batch']

# 获取指定源数据列
   if columns is None:
     columns_select = ' * '
   else:
     columns_select = ','.join(columns)

if fname is None:
     fname=sourceTB+'_exportData.xlsx'

file = path + fname
   # 增加一个公共句柄,写入新数据时,保留原数据
   writer = pd.ExcelWriter(file)

sql_select = 'select '+ columns_select + ' from '+ sourceTB
   fetch_data, rowcount = self.executeQuery(sql_select)
   # print(rowcount)

df_fetch_data = pd.DataFrame(fetch_data)
   # 一共有roucount行数据,每N行一个batch提交写入到excel
   times = math.floor(rowcount/N)
   i = 1
   rs_startrow = 0
   # 当总数据量 > 每批插入的数据量时
   print(i, times)
   is_while=0
   while i <= times:
     is_while = 1
     # 如果是首次,且指定输入标题,则有标题
     if i==1:
       # isHeader = True
       startRow = 1
     else:
       # isHeader = False
       startRow+=N
     # 切片取指定的每个批次的数据行 ,前闭后开
     # startrow: 写入到目标文件的起始行。0表示第1行,1表示第2行。。。
     df_fetch_data['batch'] = 'batch'+str(i)
     df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)
     print('第',str(i),'次循环,取源数据第',rs_startrow,'行至',i*N,'行','写入到第',startRow,'行')
     print('第',str(i),'次写入数据为:',df_fetch_data[rs_startrow:i*N])
     # 重新指定源数据的读取起始行
     rs_startrow =i * N
     i+=1

# 写入文件的开始行数
   # 当没有做任何循环时,仍然从第一行开始写入
   if is_while == 0:
     startRow = startRow
   else:
     startRow+=N
   df_fetch_data['batch'] = 'batch'+str(i)
   print('第{0}次读取数据,从第{1}行开始,写入到第{2}行!'.format(str(i), str(rs_startrow), str(startRow)))
   print('第',str(i),'写入数据为:',df_fetch_data[rs_startrow:i*N])
   df_fetch_data[rs_startrow:i*N].to_excel(writer, header=isHeader, index=False, startrow=startRow)

# 注: 这里一定要saver()将数据从缓存写入磁盘!!!!!!!!!!!!!!!!!!!!!1
   writer.save()

start_time=datetime.datetime.now()
 # 导出结构相同的多个表到同一样excel
 def exportToExcel(self, **args):
   for sourceTB in args['sourceTB']:    
     arc_dict = dict(
       sourceTB = sourceTB,
       path=args['path'],
       startRow=args['startRow'],
       isHeader=args['isHeader'],
       batch=args['batch']
     )
     print('\n当前导出的数据表为:%s' %(sourceTB))
     self.writeToExcel(**arc_dict)

return 'success'
   start_time=datetime.datetime.now()

if __name__ == "__main__":
 ms = MSSQL(host="localhost",user="test",pwd="test",db="db_jun")

args = dict(
  sourceTB = ['tb2', 'tb1'],# 待导出的表
  path='D:\\myPC\\Python\\',# 导出到指定路径
  startRow=1,#设定写入文件的首行,第2行为数据首行
  isHeader=False,# 是否包含源数据的标题
  batch=5
 )
 # 导出多个文件
 ms.exportToExcel(**args)

来源:https://blog.csdn.net/m0_37886429/article/details/94965255

0
投稿

猜你喜欢

手机版 网络编程 asp之家 www.aspxhome.com