Python 中创建 PostgreSQL 数据库连接池
作者:Yanbin Blog 发布时间:2024-01-19 22:33:37
习惯于使用数据库之前都必须创建一个连接池,即使是单线程的应用,只要有多个方法中需用到数据库连接,建立一两个连接的也会考虑先池化他们。连接池的好处多多,
1) 如果反复创建连接相当耗时,
2) 对于单个连接一路用到底的应用,有连接池时避免了数据库连接对象传来传去,
3) 忘记关连接了,连接池幸许还能帮忙在一定时长后关掉,当然密集取连接的应用势将耗尽连接,
4) 一个应用打开连接的数量是可控的
接触到 Python
后,在使用 PostgreSQL
也自然而然的考虑创建连接池,使用时从池中取,用完后还回去,而不是每次需要连接时创建一个物理的。Python
连接 PostgreSQL
是主要有两个包, py-postgresql
和 psycopg2
, 而本文的实例将使用后者。
Psycopg
在 psycopg2.pool
模块中提供了两个连接池的实现在,它们都继承自 psycopg2.pool.AbstractConnectionPool,
该抽象类的基本方法是
getconn(key=None):
获取连接putconn(conn, key=None, close=False):
归还连接closeall():
关闭连接池中的所有连接
两个连接池的实现类是
psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwars)
: 给单线程应用用的psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwars)
: 多线程时更安全,其实就是在getconn()
和putconn()
时加了锁来控制
所以最安全保险的做法还是使用 ThreadedConnectionPool
, 在单线程应用中, SimpleConnectionPool
也不见得比 ThreadedConnectionPool
效率高多少。
下面来看一个具体的连接池实现,其中用到了 Context Manager
, 使用时结合 with
键字更方便,用完后不用显式的调用 putconn()
归还连接
db_helper.py
from psycopg2 import pool
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
import atexit
class DBHelper:
def __init__(self):
self._connection_pool = None
def initialize_connection_pool(self):
db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn)
@contextmanager
def get_resource(self, autocommit=True):
if self._connection_pool is None:
self.initialize_connection_pool()
conn = self._connection_pool.getconn()
conn.autocommit = autocommit
cursor = conn.cursor(cursor_factory=RealDictCursor)
try:
yield cursor, conn
finally:
cursor.close()
self._connection_pool.putconn(conn)
def shutdown_connection_pool(self):
if self._connection_pool is not None:
self._connection_pool.closeall()
db_helper = DBHelper()
@atexit.register
def shutdown_connection_pool():
db_helper.shutdown_connection_pool()
from psycopg2 import pool
from psycopg2 . extras import RealDictCursor
from contextlib import contextmanager
import atexit
class DBHelper :
def __init__ ( self ) :
self . _connection_pool = None
def initialize_connection_pool ( self ) :
db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
self . _connection_pool = pool . ThreadedConnectionPool ( 1 , 3 , db_dsn )
@ contextmanager
def get_resource ( self , autocommit = True ) :
if self . _connection_pool is None :
self . initialize_connection_pool ( )
conn = self . _connection_pool . getconn ( )
conn . autocommit = autocommit
cursor = conn . cursor ( cursor_factory = RealDictCursor )
try :
yield cursor , conn
finally :
cursor . close ( )
self . _connection_pool . putconn ( conn )
def shutdown_connection_pool ( self ) :
if self . _connection_pool is not None :
self . _connection_pool . closeall ( )
db_helper = DBHelper ( )
@ atexit . register
def shutdown_connection_pool ( ) :
db_helper . shutdown_connection_pool ( )
几点说明:
只在第一次调用
get_resource()
时创建连接池,而不是在from db_helper import db_helper
引用时就创建连接池Context Manager
返回了两个对象,cursor
和connection
, 需要用connection
管理事物时用它默认时
cursor
返回的记录是字典,而非数组默认时连接为自动提交
最后的
@atexit.register
那个ShutdownHook
可能有点多余,在进程退出时连接也被关闭,TIME_WAIT
时间应该会稍长些
使用方式:
如果不用事物
from db_helper import db_helper
with db_helper.get_resource() as (cursor, _):
cursor.execute('select * from users')
for record in cursor.fetchall():
... process record, record['name'] ...
from db_helper import db_helper
with db_helper . get_resource ( ) as ( cursor , _ ) :
cursor . execute ( 'select * from users' )
for record in cursor . fetchall ( ) :
. . . process record , record [ 'name' ] . . .
如果需要用到事物
with db_helper.get_resource(autocommit=False) as (cursor, _):
try:
cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
cursor.execute('delete from orders where user_id = %s', (1,))
conn.commit()
except:
conn.rollback()
with db_helper . get_resource ( autocommit = False ) as ( cursor , _ ) :
try :
cursor . execute ( 'update users set name = %s where id = %s' , ( 'new_name' , 1 ) )
cursor . execute ( 'delete from orders where user_id = %s' , ( 1 , ) )
conn . commit ( )
except :
conn . rollback ( )
在写作本文时,查看 psycopg 的官网时,发现 Psycopg 3.0
正式版在 2021-10-13 日发布了( Psycopg 3.0 released ), 更好的支持 async。在 Psycopg2 2.2 版本时就开始支持异步了。而且还注意到 Psycopg
的主要部分是用 C 实现的,才使得它效率比较高,也难怪经常用 pip install psycopg2
安装不成功,而要用 pip install psycopg2-binary
来安装的原因。
在创建连接池时加上参数 keepalivesXxx
能让服务器及时断掉死链接,否则在 Linux
下默认要 2 个小时后才断开。死链接的情况发生在客户端异常退出(如断电)时先前建立的链接就变为死链接了。
pool.ThreadedConnectionPool(1, 3, db_dsn, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
PostgreSQL
服务端会对连接在空闲 tcp_keepalives_idle
秒后,主动发送tcp_keepalives_count 个 tcp_keeplive
侦测包,每个侦探包在 tcp_keepalives_interval
秒内都没有回应,就认为是死连接,于是切断它。
来源:https://www.tuicool.com/articles/Zzmeqyi
猜你喜欢
- 目录一.定义二.命名方法2.1小驼峰命名法2.2大驼峰命名法2.3下划线命名法三.命名规则3.1标识符3.2关键字四.使用方法4.1单变量赋
- PDOStatement::fetchObjectPDOStatement::fetchObject — 获取下一行并作为一个对象返回。(P
- 在DOS界面运行python的py文件我用的Notepad++编写代码,编写完后需要在DOS界面运行打开DOS界面按键盘上的WIN+R,输入
- 1、正则表达式包括两部分①定义正则表达式的规则;②正则表达式的模式(i/g/m);2、声明正则表达式① 字面量声明: var reg = /
- 使用python实现单向循环链表,供大家参考,具体内容如下单向循环链表将所有的链接在一起,每一个节点分为数据存储区和链接区,数据区存储数据,
- 我的操作系统为centos6.51 首先选择django要使用什么数据库。django1.10默认数据库为sqlite3,本人想
- DataGrip支持几乎所有主流的关系数据库产品,如DB2、Derby、H2、MySQL、Oracle、PostgreSQL、SQL Ser
- python提供了两个非常重要的功能来处理python程序在运行中出现的异常和错误。你可以使用该功能来调试python程序。1.异常处理:
- 前言本文将深入研究 preg_replace /e 模式下的代码执行问题,其中包括 preg_replace 函数的执行过程分析、正则表达式
- 之前用的Python2,连接MySQL用的是MySQLdb。现在换成python3.x了,由于 MySQLdb 模块还不支持 Python3
- 对批量WAV音频进行等长分割对WAV格式的音频以相同长度进行分割。import osimport waveimport numpy as n
- 前言很多文章在谈及曲线平滑的时候,习惯使用拟合的概念,我认为这是不恰当的。平滑后的曲线,一定经过原始的数据点,而拟合曲线,则不一定要经过原始
- Python中进行Base64编码和解码要用base64模块,代码示例:#-*- coding: utf-8 -*-import base6
- 首先,这次讲解的tansforms功能,通俗地讲,类似于在计算机视觉流程里的图像预处理部分的数据增强。transforms的原理:说明:图片
- python中函数定义参数有四种形式:def fun1(a,b,c): passdef fun2(a=1,b=2,c=3): &
- 识别快递单号这次跟老师做项目,这项目大概是流水线上识别快递上的快递单号。首先我尝试了解条形码的基本知识 百度百科:条形码 条形码(barco
- 如果要在应用程序中周期性地进行某项操作,比如周期性地检测主机的CPU值,则需要用到QTimer定时器,QTimer类提供了重复的和单次的定时
- 在我们修改过页面的某些数据后,通过想要把页面刷新一下,看看修改后的结果。由于vue的存在,页面是不会自动刷新的,需要我们手动进行操作。在vu
- 计算信息熵的公式:n是类别数,p(xi)是第i类的概率假设数据集有m行,即m个样本,每一行最后一列为该样本的标签,计算数据集信息熵的代码如下
- 一、创建飞书机器人自定义飞书机器人操作步骤,具体详见飞书官方文档:《机器人 | 如何在群聊中使用机器人?》二、调用飞书发送消息自定义机器人添