Python ORM数据库框架Sqlalchemy的使用教程详解
作者:胡安民-独行者 发布时间:2024-01-13 20:02:53
对象关系映射(Object Relational Mapping,简称ORM)模式是一种为了解决面向对象与关系数据库存在的互不匹配的现象的技术。面向对象的开发方法是当今企业级应用开发环境中的主流开发方法,关系数据库是企业级应用环境中永久存放数据的主流数据存储系统。对象和关系数据是业务实体的两种表现形式,业务实体在内存中表现为对象,在数据库中表现为关系数据。内存中的对象之间存在关联和继承关系,而在数据库中,关系数据无法直接表达多对多关联和继承关系。因此,对象-关系映射(ORM)系统一般以中间件的形式存在,主要实现程序对象到关系数据库数据的映射。
学过java的hibernate框架的那么这个很好上手,非常简单 ,他有两种模式一种纯orm另一种模式是支持原生sql这两种可以混合使用
优点:
简洁易读:将数据表抽象为对象(数据模型),更直观易读
可移植:封装了多种数据库引擎,面对多个数据库,操作基本一致,代码易维护
更安全:有效避免SQL注入
缺点: 虽然性能稍稍不及原生SQL,但是操作数据库真的很方便!
官网: https://www.sqlalchemy.org/
概念和数据类型
概念
常见数据类型
安装
pip install SQLAlchemy
pip install mysqlclient # 安装自己的数据库客户端(可以是mysql 可以是oracle)
连接
from sqlalchemy import create_engine
engine = create_engine("mysql://user:password@hostname/dbname?charset=uft8",
echo=True,
pool_size=8,
pool_recycle=60*30
)
创建好了Engine的同时,Pool和Dialect也已经创建好了,但是此时并没有真正与数据库连接,等到执行具体的语句.connect()等时才会连接到数据库。
echo: 当设置为True时会将orm语句转化为sql语句打印,一般debug的时候可用
pool_size: 连接池的大小,默认为5个,设置为0时表示连接无限制
pool_recycle: 设置时间以限制数据库多久没连接自动断开
创建数据库表类(模型)
ORM的重要特点就是操作类来操作数据库,现在我们来创建一个类,以常见的用户表举例:
from sqlalchemy import Column, Integer, String
from src.database.SQLalchemyFast import SQLalchemyFast
class UserDB(SQLalchemyFast.Base):
__tablename__ = "User" # __tablename__ 声明表名
# primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
email = Column(String(64), doc="邮箱")
def __init__(self, name=None, email=None):
self.name = name
self.email = email
def __str__(self):
return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)
上面的SQLalchemyFast.Base是我自己封装的Base ,用于统一管理所有模型类,可以将Python类和数据库表关联映射起来。数据库表模型类通过__tablename__和表关联起来,Column表示数据表的列
生成数据库表
Base = declarative_base()
Base.metadata.create_all(engine)
会自动创建表,如果存在则忽略,执行以上代码,就会发现在db中创建了users表。 前提必须有模型类继承了Base
会话
会话就和打电话一样,打一次电话就是一个会话,就相当于和数据库交互一次就是一个会话,一个会话可以运行多个或单个语句,会话结束必须关闭
sqlalchemy中使用session用于创建程序和数据库之间的会话,所有对象的载入和保存都需要通过session对象 。
通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源:
from sqlalchemy.orm import sessionmaker
# 创建session
DbSession = sessionmaker(bind=engine)
session = DbSession()
session的常见操作方法包括:
flush:预提交,提交到数据库文件,还未写入数据库文件中 (没事用)
commit:提交了一个事务
rollback:回滚
close:关闭
增删改查
增
add_user = UserDB("test", "test123@qq.com")
session.add(add_user)
session.commit()
session.add()将会把Model加入当前session维护的持久空间(可以从session.dirty看到)中,直到commit时提交到数据库。
查
users = session.query(UserDB).filter(UserDB.id=1).all()
for item in users:
print(item.name)
改
session.query(Users).filter(UserDB.id=1).update({'name': "Jack"})
删
session.query(UserDB).filter(UserDB.name == "test").delete()
session.commit()
执行裸sql
session.execute(text(sql), params)
session.commit()
sql: select * from User where id = :id and name = :name
params: {"id":1,"name":"张三"}`
参数名必须和sql语句中的参数名一致
with关闭会话
DbSession = sessionmaker(bind=engine)
with DbSession() as conn:
# 代码
conn.commit()
sql建造者模式
需要导入的包
from sqlalchemy import delete, update, text, select, join, desc, asc
sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
where(text("id = :id and name = :name")).\
group_by(UserDB.id,UserDB.name).\
having(text("id = :id and name = :name")).\
order_by(desc("id"),asc("name")).\
offset(1).limit(10).\
params(id=1, name="张三")
print(sql)
以上sql放入到execute里直接就能跑了
多表联查(只支持内查询和左查询和全查询)
sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
join(BookDB,UserDB.id == BookDB.id).\
join(alias(BookDB,"b"),text("b.id == b.id"),isouter=True).\
join(alias(BookDB,"e"),text("e.id == e.id"),full=True). \
where(text("id = :id and name = :name"))
print(sql)
封装的工具
数据库配置文件database.properties
url=mysql://root:root@106.12.174.220:3306/demo?charset=utf8
echo=True # 是否打印sql语句
pool_size=10 # 连接池大小
pool_recycle=1800 # 连接池回收时间
pool_timeout=30 # 连接池超时时间
isolation_level=READ_COMMITTED # 事务隔离级别
工具
from sqlalchemy import create_engine, delete, update, text, alias
from sqlalchemy.future import select
from sqlalchemy.orm import declarative_base, sessionmaker
from src.file.FileReadAndWrite import FileReadAndWrite
from src.log.Log import Log
class SQLalchemyFast(object):
Base = declarative_base()
"""
功能: SQLalchemy工具
"""
def __init__(self, dbFile):
file = FileReadAndWrite.readPropertiesFile(dbFile)
self.engine = create_engine(
url=file['url'],
echo=bool(file['echo']),
pool_size=int(file['pool_size']),
pool_recycle=int(file['pool_recycle']),
pool_timeout=int(file['pool_timeout']),
isolation_level=file['isolation_level'],
)
SQLalchemyFast.Base.metadata.create_all(self.engine) # 创建表,如果表存在则不创建(必须对象继承Base)
# 创建会话
def createSession(self):
Session = sessionmaker(bind=self.engine)
return Session()
# 添加一条数据
def addData(self, object):
with self.createSession() as conn:
conn.add(object)
conn.commit()
# 添加多条数据
def addDataList(self, objectList):
with self.createSession() as conn:
conn.add_all(objectList)
conn.commit()
# 删除主键id的数据
def deleteDataById(self, cla, id):
with self.createSession() as conn:
conn.query(cla).filter(cla.id == id).delete()
conn.commit()
# 删除指定数据(where是并且的关系,不支持or和其他复杂查询)
def deleteDataWhere(self, cla, *where):
with self.createSession() as conn:
stmt = delete(cla).where(*where)
conn.execute(stmt)
conn.commit()
# 清空表
def truncateTable(self, cla):
with self.createSession() as conn:
conn.query(cla).delete()
conn.commit()
# 更新指定主键id的数据
def updateDataById(self, cla, id, data):
"""
:param cla: 类(表)
:param id: 主键id
:param data: {'key': "value",...} key为表中的字段名,value为要修改的值
:return:
"""
with self.createSession() as conn:
stmt = update(cla).where(cla.id == id).values(data)
result = conn.execute(stmt)
conn.commit()
return result
# 更新指定条件的数据 (where是并且的关系,不支持or和其他复杂查询)
def updateDataWhere(self, cla, data, *where):
"""
:param cla: 类(表)
:param data: {'key': "value",...} key为表中的字段名,value为要修改的值
:param where: 过滤条件
:return:
"""
with self.createSession() as conn:
stmt = update(cla).where(*where).values(data)
conn.execute(stmt)
conn.commit()
# 查询全部数据
def queryDataAll(self, cla):
with self.createSession() as conn:
result = conn.query(cla).all()
return result
# 查询主键id的数据
def queryDataById(self, cla, id):
with self.createSession() as conn:
result = conn.query(cla).filter(cla.id == id).first()
return result
# 查询指定数据,不支持分组查询(因为聚合后的数据无法转换成对象)
def queryDataWhere(self, cla,aliasName=None, column=None, where=None,
join=None, on=None, left=None, full=None,
order="", limit="", offset="", distinct=None, params=None):
with self.createSession() as conn:
stmt = select(cla)
if aliasName:
stmt = select(alias(cla,aliasName))
if column:
stmt = stmt.with_only_columns(text(column)) .select_from(cla)
if join is not None and on is not None:
if left:
stmt = stmt.join(join, text(on), isouter=True)
elif full:
stmt = stmt.join(join, text(on), full=True)
else:
stmt = stmt.join(join, text(on))
if where:
stmt = stmt.where(text(where))
if order:
stmt = stmt.order_by(text(order))
if limit:
stmt = stmt.limit(limit)
if offset:
stmt = stmt.offset(offset)
if distinct:
stmt = stmt.distinct()
result = conn.execute(stmt,params).all()
result= [row[0] for row in result]
return result
# 创建事物(运行多条sql语句 ,function(conn)是一个函数,里面包含多条sql语句,需要使用原生的sqlalchemy)
def createTransaction(self, function):
with self.createSession() as conn:
conn.begin()
try:
function(conn)
conn.commit()
except Exception as e:
Log.logError(e)
conn.rollback()
# 执行sql语句(包括增删改查,和存储过程...只要是sql语句都可以执行)
def executeSql(self, sql, params=None):
"""
:param sql: sql语句 如: "select * from User where id = :id and name = :name "
:param params: 参数 例如: {"id":1,"name":"张三"} 注意:参数名必须和sql语句中的参数名一致
发送多个参数时,参数名必须以列表的形式传入,例如: {"id":["1","2"],"name":"张三"}
"INSERT INTO some_table (x, y) VALUES (:x, :y)" 参数可以是 [{"x": 11, "y": 12}, {"x": 13, "y": 14}]
:return:
"""
with self.createSession() as conn:
result = conn.execute(text(sql), params)
conn.commit()
return result
# 执行构建sql语句
def executeSqlBuild(self, sql):
with self.createSession() as conn:
result = conn.execute(sql)
conn.commit()
return result
测试实体
from sqlalchemy import Column, Integer, String
from src.database.SQLalchemyFast import SQLalchemyFast
class UserDB(SQLalchemyFast.Base):
__tablename__ = "User" # __tablename__ 声明表名
# primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
email = Column(String(64), doc="邮箱")
def __init__(self, name=None, email=None):
self.name = name
self.email = email
def __str__(self):
return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)
from sqlalchemy import Column, Integer, String
from src.database.SQLalchemyFast import SQLalchemyFast
class BookDB(SQLalchemyFast.Base):
__tablename__ = "Book" # __tablename__ 声明表名
# primary_key为主键,autoincrement为自增, doc 为注释但是不会在数据库中生成
id = Column(Integer, primary_key=True,autoincrement=True,doc="主键")
name = Column(String(64), unique=True, doc="用户名") # unique=True 为唯一约束会在数据库中生成索引
email = Column(String(64), doc="邮箱")
def __init__(self, name=None, email=None):
self.name = name
self.email = email
def __str__(self):
return "UserDB(id=%s,name=%s,email=%s)" % (self.id, self.name, self.email)
验证代码
import unittest
from sqlalchemy import delete, update, text, select, join, desc, asc, alias
from src.database.BookDB import BookDB
from src.database.SQLalchemyFast import SQLalchemyFast
from src.database.UserDB import UserDB
from src.file.FileTool import FileTool
class SQLalchemyFastTest(unittest.TestCase):
# 测试添加数据
def test_add(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
db.addData(UserDB("name1", "123456789"))
db.addData(UserDB("name2", "123456789"))
db.addData(UserDB("name3", "123456789"))
db.addData(UserDB("name4", "123456789"))
# 测试添加多条数据
def test_addAll(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
db.addDataList([UserDB("name111", "123456789"), UserDB("name211", "123456789")])
# 测试删除数据
def test_deleteDataById(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
db.deleteDataById(UserDB, 1)
# 测试条件删除数据
def test_deleteWhere(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
db.deleteDataWhere(UserDB, UserDB.name == "name1", UserDB.email == "123456789")
# 测试更新数据
def test_update(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
db.updateDataById(UserDB, 10, {"name": "name31", "email": "123456789"})
# 测试条件更新数据
def test_updateFilter(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
db.updateDataWhere(UserDB, {"name": "name33", "email": "123456789"}, UserDB.name == "name2",
UserDB.email == "1231")
# 测试查询数据
def test_queryDataAll(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
data_all = db.queryDataAll(UserDB)
for data in data_all:
print(data)
# 测试查询指定id数据
def test_queryDataById(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
data = db.queryDataById(UserDB, 10)
print(data)
# 测试条件查询数据(不支持分组查询和链表查询)
def test_queryDataWhere(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
data_all = db.queryDataWhere(UserDB,
where="name like CONCAT(:name,'%')",
order="id desc",
offset=1,
limit=3,
params={"name": "name"})
# db.queryDataWhere(UserDB,
# where="name like CONCAT(:name,'%')",
# order="id desc",
# offset=1,
# limit=3,
# params={"name": "name"})
# db.queryDataWhere(UserDB,aliasName="a",
# join=alias(BookDB,"b"),on="a.id == b.id",
# where="a.name like CONCAT(:name,'%')",
# params={"name": "name"})
for data in data_all:
print(data)
# 测试创建事物
def test_createTransaction(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
def test1(conn):
conn.add(UserDB("name111", "123456789"))
conn.add(UserDB("name211", "123456789"))
# raise Exception("test122")
# conn.add(UserDB("name333", "123456789"))
# conn.add(UserDB("name444", "123456789"))
db.createTransaction(test1)
# 测试执行sql(执行失败会回滚的(存储过程,函数))
def test_executeSql(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
# data_all = db.executeSql("select * from User")
# data_all = db.executeSql("select * from User where name like CONCAT(:name,'%')", params={"name":"name"})
# for data in data_all:
# print(data)
# 创建存储过程
# db.executeSql("CREATE PROCEDURE `test_procedure` \
# (IN `in_name` VARCHAR(255), IN `in_email` VARCHAR(255)) \
# BEGIN \
# INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
# END")
# 调用存储过程
# db.executeSql("call test_procedure(:name, :email)", params={"name": "name555", "email": "email12131"})
# 创建函数
# db.executeSql("CREATE FUNCTION `test_function`( `in_name` VARCHAR(255), `in_email` VARCHAR(255)) \
# RETURNS INT(11) \
# BEGIN \
# DELETE FROM `User`; \
# INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
# INSERT INTO `User` (`name`, email) VALUES (in_name, in_email); \
# RETURN 1; \
# END")
# 调用函数
# data_all = db.executeSql("select test_function(:name, :email)", params={"name": "name5551", "email": "email12131"})
# 测试sql构造
def test_executeSqlBuild(self):
path = FileTool.getProjectPath(subpath="src/database/database.properties")
db = SQLalchemyFast(path)
# sql = select(UserDB.id,UserDB.name).select_from(UserDB).\
# join(BookDB,UserDB.id == BookDB.id)
# print(sql)
# db.executeSqlBuild(sql)
来源:https://blog.csdn.net/weixin_45203607/article/details/127406768


猜你喜欢
- 创建测试dataframe:>>> import pandas as pd>>> df = pd.Dat
- <? //很好用的PHP数据库类,三、四句代码搞定一个表的操作,无论这个表字段有多复杂。 //此类多次大量用在大型网站程序的开发上,效
- 用read_csv读数据遇到分隔符问题的两种解决方式import pandas as pd1.更改read_csv函数中的传参&ld
- 本文实例为大家分享了python抓取网页中链接的静态图片的具体代码,供大家参考,具体内容如下# -*- coding:utf-8 -*- #
- 1.turtle库turtle库是Python语言中一个很流行的绘制图像的函数库,想象一个小乌龟,在一个横轴为x、纵轴为y的坐标系原点,(0
- python是支持多线程的, 主要是通过thread和threading这两个模块来实现的,本文主要给大家分享python实现多线程网页爬虫
- 1)当我们拿到一个题目时,首先会根据已经知道的条件,进行数据的初步整理和分析。相当于填写出9宫格里,所有的“确定项”,以及标记“可能选项”。
- 目录深度遍历递归用栈来遍历磁盘广度遍历磁盘用队列遍历磁盘深度遍历递归import osdef get_files(path):
- 正则表达式正则表达式是一种强大的字符串操作工具。它是一种领域特定语言 (DSL),不管是 Python 还是在大多数现代编程语言中都是作为库
- 了解如何在sublime编辑器中安装python软件包,以实现自动完成等功能,并在sublime编辑器本身中运行build。安装Sublim
- 虽然在win2003配置PHP有点非主流,但你还是要会怎么弄。你也可以将本文的虚拟机看成是服务器,宿主机看成是客户端。不像Linux系统,由
- 面包屑导航可以将浏览过的页面记录下来,方便很快速的跳转回某一个页面,本文介绍了几种自己封装面包屑组件的方式。一、为什么需要面包屑?当网页进行
- 1.问题描述当我们在实用ElementUI组件完成项目的时候可能会遇到这样的需求,比如:新建一个活动,需要定义活动的时间范围;因此我们在新建
- parseFloat()方法的定义和用法:parseFloat()方法可以解析一个字符串,并返回一个浮点数。注:如果字符串中的第一个字符不能
- 柱形图bar()函数绘制柱形图import matplotlib.pyplot as plx = [1,2,3,4,5,6,7]y = [1
- 本文主要介绍的关于Python切片赋值的相关内容,分享出来供大家参考学习,下面来一起看看详细的介绍:昨天有同学问了我这么个问题:t = [1
- CSS hack是指我们为了兼容各浏览器,而使用的特别的css定义技巧。这是国外摘来的一张CSS hack列表,显示了各浏览器对css ha
- 1 丰富的二维动画/图形和视音频表现 Rich 2D animation/graphics with audio and video这点毋庸
- Juan Pablo De Gregorio 的 原文很多人都问我如何为一本杂志、一份报纸、一张海报、一份简报或是一份出版物选择
- #@project = facepalm#@file = main#@author = Maoliang Ran#@create_time