Java实现多线程大批量同步数据(分页)
作者:宋发元 发布时间:2021-12-18 17:41:18
标签:Java,同步数据,多线程
背景
最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据。
最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率。
一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!
最后,改造了一番,2小时,就成功同步了300w+数据。
以下是主要逻辑。
线程的个数请根据你自己的服务器性能酌情设置。
思路
先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。
代码实现
package com.github.admin.controller.loans;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
/**
* 多线程同步历史数据
* @author songfayuan
* @date 2019-09-26 15:38
*/
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {
private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController"); //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
@Value("${spring.profiles.active}")
private String profile;
@Autowired
private DuyanCallRecordDetailService duyanCallRecordDetailService;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private CaseCallRemarkRecordService caseCallRemarkRecordService;
/**
* 多线程同步通话记录历史数据
* @param params
* @return
* @throws Exception
*/
@GetMapping("/syncHistoryData")
public Response syncHistoryData(Map<String, Object> params) throws Exception {
executor.execute(new Runnable() {
@Override
public void run() {
try {
logicHandler(params);
} catch (Exception e) {
log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
}
}
});
return Response.success("请求成功");
}
/**
* 处理数据逻辑
* @param params
* @throws Exception
*/
private void logicHandler(Map<String, Object> params) throws Exception {
/******返回结果:多线程处理完的最终数据******/
List<DuyanCallRecordDetail> result = new ArrayList<>();
/******查询数据库总的数据条数******/
int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
.eq("is_delete", 0)
.eq("platform_type", 1));
DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");
// int count = 2620266;
/******限制每次查询的条数******/
int num = 1000;
/******计算需要查询的次数******/
int times = count / num;
if (count % num != 0) {
times = times + 1;
}
/******每个线程开始查询的行数******/
int offset = 0;
/******添加任务******/
List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
for (int i = 0; i < times; i++) {
Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
tasks.add(qfe);
offset = offset + num;
}
/******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/
List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
if (CollectionUtils.isNotEmpty(callableList)) {
// executor.execute(new Runnable() {
// @Override
// public void run() {
// log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
// log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
// }
// });
try {
List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
/******处理线程返回结果******/
if (futures != null && futures.size() > 0) {
for (Future<List<DuyanCallRecordDetail>> future : futures) {
List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
executor.execute(new Runnable() {
@Override
public void run() {
/******异步存储******/
log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
saveMongoDB(duyanCallRecordDetailList);
log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
}
});
}
//result.addAll(future.get());
}
}
} catch (Exception e) {
log.warn("任务拆分执行异常,errMsg = {}", e);
DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
}
}
}
}
/**
* 数据存储MongoDB
* @param duyanCallRecordDetailList
*/
private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
/******重复数据不同步MongoDB******/
org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
continue;
}
/******关联填写的记录******/
CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
.eq("is_delete", 0)
.eq("call_uuid", duyanCallRecordDetail.getCallUuid()));
CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
//补充
caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
if (caseCallRemarkRecord != null) {
//补充
caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());
}
log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
}
}
@Override
public void destroy() throws Exception {
executor.shutdown();
}
}
class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
/******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/
private DuyanCallRecordDetailService myService;
/******查询条件 根据条件来定义该类的属性******/
private Map<String, Object> params;
/******分页index******/
private int offset;
/******数量******/
private int num;
public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
this.myService = myService;
this.params = params;
this.offset = offset;
this.num = num;
}
@Override
public List<DuyanCallRecordDetail> call() throws Exception {
/******通过service查询得到对应结果******/
List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
.eq("is_delete", 0)
.eq("platform_type", 1)
.last("limit "+offset+", "+num));
return duyanCallRecordDetailList;
}
}
ListUtils工具
package com.github.common.util;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
/**
* 描述:List工具类
* @author songfayuan
* 2018年7月22日下午2:23:22
*/
@Slf4j
public class ListUtils {
/**
* 描述:list集合深拷贝
* @param src
* @return
* @throws IOException
* @throws ClassNotFoundException
* @author songfayuan
* 2018年7月22日下午2:35:23
*/
public static <T> List<T> deepCopy(List<T> src) {
try {
ByteArrayOutputStream byteout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteout);
out.writeObject(src);
ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
ObjectInputStream in = new ObjectInputStream(bytein);
@SuppressWarnings("unchecked")
List<T> dest = (List<T>) in.readObject();
return dest;
} catch (ClassNotFoundException e) {
e.printStackTrace();
return null;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**
* 描述:对象深拷贝
* @param src
* @return
* @throws IOException
* @throws ClassNotFoundException
* @author songfayuan
* 2018年12月14日
*/
public static <T> T objDeepCopy(T src) {
try {
ByteArrayOutputStream byteout = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteout);
out.writeObject(src);
ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
ObjectInputStream in = new ObjectInputStream(bytein);
@SuppressWarnings("unchecked")
T dest = (T) in.readObject();
return dest;
} catch (ClassNotFoundException e) {
log.error("errMsg = {}", e);
return null;
} catch (IOException e) {
log.error("errMsg = {}", e);
return null;
}
}
/**
* 将一个list均分成n个list,主要通过偏移量来实现的
* @author songfayuan
* 2018年12月14日
*/
public static <T> List<List<T>> averageAssign(List<T> source, int n) {
List<List<T>> result = new ArrayList<List<T>>();
int remaider = source.size() % n; //(先计算出余数)
int number = source.size() / n; //然后是商
int offset = 0;//偏移量
for (int i = 0; i < n; i++) {
List<T> value = null;
if (remaider > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remaider--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
/**
* List按指定长度分割
* @param list the list to return consecutive sublists of (需要分隔的list)
* @param size the desired size of each sublist (the last may be smaller) (分隔的长度)
* @author songfayuan
* @date 2019-07-07 21:37
*/
public static <T> List<List<T>> partition(List<T> list, int size){
return Lists.partition(list, size); // 使用guava
}
/**
* 测试
* @param args
*/
public static void main(String[] args) {
List<Integer> bigList = new ArrayList<>();
for (int i = 0; i < 101; i++){
bigList.add(i);
}
log.info("bigList长度为:{}", bigList.size());
log.info("bigList为:{}", bigList);
List<List<Integer>> smallists = partition(bigList, 20);
log.info("smallists长度为:{}", smallists.size());
for (List<Integer> smallist : smallists) {
log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
}
}
}
来源:https://blog.csdn.net/u011019141/article/details/101521565
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- 简介本文用示例介绍java的Duration的用法。Duration和Period说明Duration类通过秒和纳秒相结合来描述一个时间量,
- 前言SpringCloud 是微服务中的翘楚,最佳的落地方案。Spring Cloud Config 是一个解决分布式系统的配置管理方案,它
- 在一个完整的项目中,如果每一个控制器的方法都返回不同的结果,那么对项目的维护和扩展都会很麻烦;并且现在主流的开发模式时前后端分离的模式,如果
- 目录一、二叉树的顺序存储1.堆的存储方式2.下标关系二、堆(heap)1.概念2.大/小 根堆2.1小根堆2.2大根堆3.建堆操作3.1向下
- java 弹幕小游戏的最初版本,供大家参考,具体内容如下最近在学习javaSE,根据b站视频老师的讲解,也参考了他的代码,做了一个弹幕小游戏
- kk-anti-reptile 是适用于基于 spring-boot 开发的分布式系统的反爬虫组件。系统要求基于 spring-boot 开
- 1. @Conditional 注解@Conditional注解是Spring-context模块提供了一个注解,该注解的作用是可以根据一定
- MainActivity.java package com.zhang.showPhoto;import android.app.Actio
- import java.io.BufferedReader;import java.io.File;import java.io.FileI
- 本文实例讲述了C#快速排序算法。分享给大家供大家参考。具体实现方法如下:public static int[] QuickSort(int[
- /** * 快速计算二进制数中1的个数(Fast Bit Counting) * 该算法的思想如下: * 每次将该数与该数减一后的数值
- CyclicBarrier是什么CyclicBarrier是Java并发包中提供的一种同步工具类,它可以让多个线程在某个屏障处等待,直到所有
- 本文实例为大家分享了C#图像处理的具体代码,供大家参考,具体内容如下(1)在Form1窗体中的PictureBox1控件中显示通过OpenF
- 本文实例讲述了C#遍历系统进程的方法。分享给大家供大家参考。具体实现方法如下:建立一个listBox将进程名称遍历进去this.listBo
- 内存模型Flink可以使用堆内和堆外内存,内存模型如图所示:flink使用内存划分为堆内内存和堆外内存。按照用途可以划分为task所用内存,
- 一、前言前面我们讲了Java的入门知识,相信许多小伙伴对Java基础有一个大概的认识了,这也为我们后续的学习打下了基础,所以我们可以继续学习
- 如何快速构建一个Spring Boot的项目工具 ideaJDK版本 1.8Spring Boot 版本 1.5.9环境搭建实现:最基础前端
- 一、项目简述功能: 系统分为三个角色。最高权限管理员,学生,教师,包括 学生管理,教师管理,课程管理,选课,退课,成绩查 询。,教学课程,查
- 导入maven项目各个注解均报错所遇问题导入maven项目各个注解均报错了思考1:这个项目使用了springboot;spring是个”大容
- 一、默认静态资源路径类路径下:staticpublicresources这几个目录为默认静态资源访问的目录二、增加静态资源路径前缀动态资源和