Scala数据库连接池的简单实现
作者:ncqingchuan1976 发布时间:2023-07-14 14:19:37
在使用JDBC的时候,数据库据连接是非常宝贵的资源。为了复用这些资源,可以将连接保存在一个队列中。当需要的时候可以从队列中取出未使用的连接。如果没有可用连接,则可以在一定时间内等待,直到队列中有可用的连接,否则将抛出异常。
以下是DataSoucre的代码,DataSoucre负责对连接的管理以及分发,同时设置队列的大小,等待时间,连接的账号、密码等。
核心方法为getConenction()方法。且实现AutoCloseable接口,以便后面可以使用using方法自动关闭资源。队列中的连接为封装了conenction的DbConnection类。
package pool
import scala.util.control.Breaks._
import scala.collection.mutable.ArrayBuffer
import java.{util => ju}
import scala.collection.mutable.Buffer
import scala.util.control.Breaks
class DataSource(
val driverName: String,
val url: String,
val user: String,
val password: String,
val minSize: Integer = 1,
val maxSize: Integer = 10,
val keepAliveTimeout: Long = 1000
) extends AutoCloseable {
if (minSize < 0 || minSize > maxSize || keepAliveTimeout < 0) {
throw new IllegalArgumentException("These arguments are Illegal")
}
Class.forName(driverName)
private val pool: Buffer[DbConnection] = ArrayBuffer[DbConnection]()
private val lock: ju.concurrent.locks.Lock = new ju.concurrent.locks.ReentrantLock(true)
for (i <- 0 until minSize) {
pool += new DbConnection(url, user, password)
}
def getConenction(): DbConnection = {
val starEntry = System.currentTimeMillis()
Breaks.breakable {
while (true) {
lock.lock()
try {
for (con <- pool) {
if (!con.used) {
con.used = true
return con;
}
}
if (pool.size < maxSize) {
var con = new DbConnection(url, user, password) { used = true }
pool.append(con)
return con
}
} finally {
lock.unlock()
}
if (System.currentTimeMillis() - starEntry > keepAliveTimeout) {
break()
}
}
}
throw new IllegalArgumentException("Connection Pool is empty")
}
def close(): Unit = {
lock.lock()
try {
if (pool != null) {
pool.foreach(t => t.innerConnection.close())
pool.clear()
}
} finally {
lock.unlock()
}
}
}
以下是Dbconnection类,该类提供了三个方法且实现了AutoCloseable接口
BeginTransaction:开启事务,并返回封装了的DbTransaction类
close:将连接释放
CreateCommand:创建DbCommand类,该类是负责操作连接的类,比如提交sql,读取数据等
package pool
import java.sql.Connection
import java.sql.DriverAction
import java.sql.DriverManager
class DbConnection(
val url: String,
val user: String,
val password: String
) extends AutoCloseable {
private[pool] var used: Boolean = false
private[pool] val innerConnection: Connection = DriverManager.getConnection(url, user, password)
def close(): Unit = {
if (used) {
used = false
}
}
def BeginTransaction(isolationLevel: Int = IsolationLevel.TRANSACTION_READ_COMMITTED): DbTransaction = {
if (innerConnection.getAutoCommit()) {
innerConnection.setAutoCommit(false)
}
innerConnection.setTransactionIsolation(isolationLevel)
new DbTransaction(this)
}
def CreateCommand(): DbCommand = {
new DbCommand(this)
}
}
以下是DbCommand类的代码,该类负责操作数据库。如ExecuteResultSet,ExecuteScalar等。
ExecuteScalar:查询数据库并返回第一行第一个值的方法。
ExecuteResultSet:该方法有两个重载方法。
参数为callBack: ResultSet => Unit的方法,提供了一个回调函数,解析数据的操作可以在回调中实现。
无参的版本则通过反射直接将ResultSet通过字段位置映射,转换成你需要的类型。
package pool
import java.sql.CallableStatement
import java.sql.ResultSet
import java.sql.SQLType
import java.sql.Statement
import java.sql.Types
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.implicitConversions
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
import Dispose.using
import java.{util => ju}
class DbCommand(val connection: DbConnection, var commandText: String = null, val queryTimeout: Integer = 30) extends AutoCloseable {
if (queryTimeout < 0) {
throw new IllegalArgumentException(s"timeout (${queryTimeout}) value must be greater than 0.")
}
val Parameters: Buffer[DbParameter] = ArrayBuffer[DbParameter]()
private val mirror = ru.runtimeMirror(getClass().getClassLoader())
private var statement: CallableStatement = null
/** @author:qingchuan
*
* @return
*/
def ExecuteScalar(): Any = {
var obj: Any = None
ExecuteResultSet(t => {
if (t.next()) {
if (t.getMetaData().getColumnCount() > 0)
obj = t.getObject(1)
}
})
obj
}
/** @author
* qingchuan
* @version 1.0
*
* @param callBack
*/
def ExecuteResultSet(callBack: ResultSet => Unit): Unit = {
if (callBack == null) throw new IllegalArgumentException("The value of parameter callback is null.")
statement = connection.innerConnection.prepareCall(commandText)
statement.setQueryTimeout(queryTimeout)
addParatemetrs()
using(statement.executeQuery()) { t =>
callBack(t)
if (!t.isClosed())
getOutParameterValue()
}
}
def ExecuteResultSet[T: ru.TypeTag](): ArrayBuffer[T] = {
val classSymbol = mirror.symbolOf[T].asClass
val classMirror = mirror.reflectClass(classSymbol)
val consMethodMirror = classMirror.reflectConstructor(classSymbol.primaryConstructor.asMethod)
val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
val result = new ArrayBuffer[T]()
ExecuteResultSet(t => {
while (t.next()) {
var i = 1
val values: Buffer[Any] = ArrayBuffer()
for (f <- fields) {
values += t.getObject(i)
i += 1
}
result += consMethodMirror.apply(values: _*).asInstanceOf[T]
}
})
result
}
def ExecuteBatch[T: ru.TypeTag: ClassTag](values: List[T]): Int = {
statement = connection.innerConnection.prepareCall(commandText)
var trans: DbTransaction = null
val fields = ru.typeOf[T].decls.filter(t => t.asTerm.isGetter && t.isPublic).map(t => t.asTerm)
for (t <- values) {
var i = 1
val filedMirror = mirror.reflect(t)
for (f <- fields) {
val instance = filedMirror.reflectField(f)
statement.setObject(i, instance.get)
i += 1
}
statement.addBatch()
}
try {
trans = connection.BeginTransaction()
val obj = statement.executeBatch()
trans.Commit()
statement.clearBatch()
obj.sum
} catch {
case e: Exception => {
if (trans != null)
trans.RollBack()
throw e
}
}
}
def ExecuteNoneQuery(): Integer = {
statement = connection.innerConnection.prepareCall(commandText)
statement.setQueryTimeout(queryTimeout)
addParatemetrs()
val obj = statement.executeUpdate()
getOutParameterValue()
obj
}
def CreateParameter(): DbParameter = {
new DbParameter();
}
private def getOutParameterValue(): Unit = {
for (i <- 1 to Parameters.size) {
val parameter: DbParameter = Parameters(i - 1);
if (parameter.parameterDirection == ParameterDirection.Output || parameter.parameterDirection == ParameterDirection.InputOutput) {
parameter.value = statement.getObject(i);
}
}
}
private def addParatemetrs(): Unit = {
statement.clearParameters()
for (i <- 1 to Parameters.size) {
val p = Parameters(i - 1);
if (p.parameterDirection == ParameterDirection.Input || p.parameterDirection == ParameterDirection.InputOutput) {
statement.setObject(i, p.value)
}
if (p.parameterDirection == ParameterDirection.Output || p.parameterDirection == ParameterDirection.InputOutput) {
statement.registerOutParameter(p.parameterName, p.sqlType, p.scale)
}
}
}
def close() {
if (statement != null) {
statement.close()
}
}
}
case class DbParameter(
var parameterName: String = null,
var value: Any = null,
var parameterDirection: Integer = ParameterDirection.Input,
var scale: Integer = 0,
var sqlType: Integer = null
) {}
object ParameterDirection {
val Input = 1
val InputOutput = 2
val Output = 3
}
以下代码是DbTransaction,该类提供了事务的操作如提交、回滚。
package pool
class DbTransaction(private val connection: DbConnection) {
def Commit(): Unit = {
connection.innerConnection.commit()
if (!connection.innerConnection.getAutoCommit()) {
connection.innerConnection.setAutoCommit(true);
}
}
def RollBack(): Unit = {
connection.innerConnection.rollback()
if (!connection.innerConnection.getAutoCommit()) {
connection.innerConnection.setAutoCommit(true)
}
}
def getConnection(): DbConnection = {
connection
}
def getTransactionIsolation(): Int = {
connection.innerConnection.getTransactionIsolation()
}
}
object IsolationLevel {
val TRANSACTION_NONE = 0
val TRANSACTION_READ_UNCOMMITTED = 1;
val TRANSACTION_READ_COMMITTED = 2;
val TRANSACTION_REPEATABLE_READ = 4;
val TRANSACTION_SERIALIZABLE = 8;
}
最后是using的方法。通过柯里化以及Try-catch-finally的方式 自动关闭实现了AutoCloseable接口的资源。
package pool
object Dispose {
def using[T <: AutoCloseable](cls: T)(op: T => Unit): Unit = {
try {
op(cls)
} catch {
case e: Exception => throw e
} finally {
cls.close()
}
}
}
以下是客户端调用,代码模拟了15个线程并发访问数据库,连接池最多3个资源,从而说明连接池是可以复用这些连接的。
import pool.DataSource
import pool.DbCommand
import pool.DbParameter
import pool.DbTransaction
import pool.Dispose.using
import pool.IsolationLevel
import pool.ParameterDirection
import java.sql.Date
import java.sql.ResultSet
import java.sql.Types
import java.time.LocalDate
import java.time.LocalDateTime
import java.time.LocalTime
import javax.xml.crypto.Data
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Buffer
import scala.language.experimental.macros
import scala.reflect.ClassTag
import scala.reflect.runtime.{universe => ru}
import com.nimbusds.oauth2.sdk.util.date.SimpleDate
import java.text.SimpleDateFormat
object App {
def main(args: Array[String]): Unit = {
val pool = new DataSource(
"com.microsoft.sqlserver.jdbc.SQLServerDriver",
"jdbc:sqlserver://localhost:1433;databaseName=HighwaveDW;trustServerCertificate=true",
"账号",
"密码",
minSize = 1,
maxSize = 3,
keepAliveTimeout = 3000
)
val formatter: SimpleDateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
for (i <- 1 to 15) {
val thread: Thread = new Thread(() => {
val date = new Date(System.currentTimeMillis())
using(pool.getConenction()) { con =>
{
using(new DbCommand(con)) { cmd =>
{
cmd.commandText = "{call p_get_out(?,?)}"
val p1 = new DbParameter("@id", i)
val p2 = new DbParameter(parameterName = "@msg", parameterDirection = ParameterDirection.Output, sqlType = Types.VARCHAR, scale = 20)
cmd.Parameters.append(p1)
cmd.Parameters.append(p2)
val result = cmd.ExecuteScalar()
println(s"result=${result},output=${p2.value},parameter=${i}")
}
}
}
}
})
thread.start()
}
}
}
开发环境VsCode,SQL Server数据库。以下是引用的第三方库。
version := "1.0"
libraryDependencies += "com.microsoft.sqlserver" % "mssql-jdbc" % "11.2.0.jre8"
libraryDependencies += "org.scala-lang" % "scala-reflect" % "2.13.8"
以下是执行结果。
来源:https://blog.csdn.net/ncqingchuan1976/article/details/128859029


猜你喜欢
- java引用传递的三种类型我这里使用了mldn视频里的例子,只用于学习交流。第一种结果:调用前:50调用后:1000分析:理解:好理解第二种
- Spring Data 概述Spring Data用于简化数据库访问,支持NoSQL 和 关系数据存储,其主要目标是使数据库的访问变得方便快
- 特性一:委托委托是C#语言 * 有的概念,相当于C/C++中的函数指针,与C/C++中函数指针的不同之处是:委托是面向对象的、类型安全的和保险
- 目录springboot中定时任务的创建springboot通过注解创建定时任务首先引入pom直接上代码来一个栗子@Scheduled注解的
- 前言在我们的项目中,通常会把数据存储到关系型数据库中,比如Oracle,SQL Server,Mysql等,但是关系型数据库对于并发的支持并
- 1. 生命周期感知1.1 生命周期感知组件我们知道,Controller(Activity or Fragment) 都是有生命周期的,但是
- ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布
- 本文实例讲述了Java面向对象程序设计:继承,多态用法。分享给大家供大家参考,具体如下:本文内容:继承多态首发时期:2018-03-23继承
- 画廊视图(Gallery)表示,能够按水平方向显示内容,并且可用手指直接拖动图片移动,一般用来浏览图片,被选中的选项位于中间,并且可以响应事
- 引言什么是Parser CombinatorParser Combinator是函数式语言中的概念,它是一种通过组合小型解析器来构建复杂解析
- 前言前几篇文章着重介绍了后端服务数据库和多线程并行处理优化,并示例了改造前后的伪代码逻辑。当然了,优化是无止境的,前人栽树后人乘凉。作为我们
- 前言WebJar官网:https://www.webjars.org/,对于任何与Servlet 3兼容的容器,WEB-INF/lib目录中
- Jackson 是当前用的比较广泛的,用来序列化和反序列化 json 的 Java 的开源框架。Jackson 社 区相对比较活跃,更新速度
- 适配器(Adapter)模式:适配器模式把一个类的接口变换成客户端所期待的另一种接口,从而使原本因接口不匹配而无法在一起工作的两个类能够在一
- 虽然Android给我们提供了众多组件,但是使用起来都不是很方便,我们开发的APK都有自己的风格,如果使用了系统自带的组件,总是觉得和应用的
- void UpdateContactSign() {&n
- 方法一: IDictionaryEnumerator enumerator = thProduct.GetEn
- Unity中利用材质自发光实现物体闪烁效果,供大家参考,具体内容如下补充:这种方法有一点问题,在测试(Windows平台)的时候发现,要想在
- Android EditText输入手机号空格开发需求是在登录页面的手机EditText中间插入空格,让用户看起来方便点, 130 1234
- 下载和上传附件、发送短信和发送邮件,都算是程序中很常用的功能,之前记录了文件的上传和下载还有发送短信,由于最近比较忙,邮件发送的功能就没有时