实例分析python3实现并发访问水平切分表
作者:laozhang 发布时间:2023-12-29 06:43:45
标签:python3,水平切分表
场景说明
假设有一个mysql表被水平切分,分散到多个host中,每个host拥有n个切分表。
如果需要并发去访问这些表,快速得到查询结果, 应该怎么做呢?
这里提供一种方案,利用python3的asyncio异步io库及aiomysql异步库去实现这个需求。
代码演示
import logging
import random
import asynciofrom aiomysql
import create_pool
# 假设mysql表分散在8个host, 每个host有16张子表
TBLES = { "192.168.1.01": "table_000-015",
# 000-015表示该ip下的表明从table_000一直连续到table_015
"192.168.1.02": "table_016-031",
"192.168.1.03": "table_032-047",
"192.168.1.04": "table_048-063",
"192.168.1.05": "table_064-079",
"192.168.1.06": "table_080-095",
"192.168.1.07": "table_096-0111",
"192.168.1.08": "table_112-0127",
}
USER = "xxx"PASSWD = "xxxx"# wrapper函数,用于捕捉异常def query_wrapper(func):
async def wrapper(*args, **kwargs):
try:
await func(*args, **kwargs) except Exception as e:
print(e) return wrapper
# 实际的sql访问处理函数,通过aiomysql实现异步非阻塞请求@
query_wrapperasync def query_do_something(ip, db, table):
async with create_pool(host=ip, db=db, user=USER, password=PASSWD) as pool:
async with pool.get() as conn:
async with conn.cursor() as cur:
sql = ("select xxx from {} where xxxx")
await cur.execute(sql.format(table))
res = await cur.fetchall()
# then do something...# 生成sql访问队列, 队列的每个元素包含要对某个表进行访问的函数及参数def gen_tasks():
tasks = [] for ip, tbls in TBLES.items():
cols = re.split('_|-', tbls)
tblpre = "_".join(cols[:-2])
min_num = int(cols[-2])
max_num = int(cols[-1])
for num in range(min_num, max_num+1):
tasks.append(
(query_do_something, ip, 'your_dbname', '{}_{}'.format(tblpre, num))
)
random.shuffle(tasks)
return tasks# 按批量运行sql访问请求队列def run_tasks(tasks, batch_len):
try:
for idx in range(0, len(tasks), batch_len):
batch_tasks = tasks[idx:idx+batch_len]
logging.info("current batch, start_idx:%s len:%s" % (idx, len(batch_tasks)))
for i in range(0, len(batch_tasks)):
l = batch_tasks[i]
batch_tasks[i] = asyncio.ensure_future(
l[0](*l[1:])
)
loop.run_until_complete(asyncio.gather(*batch_tasks))
except Exception as e:
logging.warn(e)# main方法, 通过asyncio实现函数异步调用def main():
loop = asyncio.get_event_loop()
tasks = gen_tasks()
batch_len = len(TBLES.keys()) * 5 # all up to you
run_tasks(tasks, batch_len)
loop.close()


猜你喜欢
- 一、项目效果学校宿舍今天搬家,累麻了,突然发现展示处理的也很粗糙,就这样吧嘿嘿~~~二、核心流程1、openCV读取视频流、在每一帧图片上画
- 一、实现划词功能说是划词翻译,实际上我们是通过获取用户的剪切板内容,通过一系列的操作得到的。首先呢,我们就先实现如何获取剪切板内容的程序首先
- 前言报错如下:Could not open JDBC Connection for transaction; nested exceptio
- 前言(结尾有彩蛋欧)目前,许多网站采取各种各样的措施来反爬虫,其中一个措施便是使用验证码。随着技术的发展,验证码的花样越来越多。验证码最初是
- 我就废话不多说了,大家还是直接看代码吧~package mainimport ("fmt""reflect&q
- 本文实例讲述了Python基于SMTP协议实现发送邮件功能。分享给大家供大家参考,具体如下:SMTP(Simple Mail Transfe
- ADB是Android SDK中的一个工具, 使用ADB可以直接操作管理Android模拟器或者真实的Andriod设备。ADB主要功能有:
- 在python里面,读取或写入csv文件时,首先要import csv这个库,然后利用这个库提供的方法进行对文件的读写。典型的数据集stoc
- Python空格的转义字符因为平时用到空格都是直接打一个空格,今天突然想到空格的转义字符是什么。查了查,发现空格没有像换行(\n)、制表符(
- 今天在使用PyTorch中Dataset遇到了一个问题。先看代码class psDataset(Dataset): def __
- <input type="button" onclick="opened('ALERT'
- 一、方法2此方法是两个表构建某一相同字段,然后全连接,在做匹配结果筛选,此方法针对数据量不大的时候,逻辑比较简单,但是内存消耗较大1. 导入
- 当用户关闭cookie时,通过asp程序提示访客打开,源代码如下: Dim strCookie, strT
- PHP session用法其实很简单它可以把用户提交的数据以全局变量形式保存在一个session中并且会生成一个唯一的session_id,
- drop PROCEDURE if exists my_procedure; create PROCEDURE my_procedure()
- 前言:数据的排序是比较常用的操作,DataFrame 的排序分为两种,一种是对索引进行排序,另一种是对值进行排序,接下来就分别介绍一下。1.
- JS获取多选框checkbox被选中的个数。var checkbox = document.getElementsByName("
- HTML5本地存储初探(二)完成了数据的本地存储,就要将文件存储也搞定。为了实现文件的本地存储,html5搞了一个叫 manifest 的文
- 本文实例讲述了thinkphp5框架调用其它控制器方法 实现自定义跳转界面功能。分享给大家供大家参考,具体如下:Loader::action
- 在一个项目中,制作呃echart图表的时候,遇到一个需求,需要从后端接口获取数据----售票员的姓名和业绩所以需要在订单表中,获取不同售票员