spring与disruptor集成的简单示例
作者:Muroidea 发布时间:2021-12-16 11:01:41
标签:spring,disruptor
disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用 A 向应用 B 传递数据 . 数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力. 使用mq 太重量级,所以选择了disruptor. 也可以使用Reactor
BaseQueueHelper.java
/**
* lmax.disruptor 高效队列处理模板. 支持初始队列,即在init()前进行发布。
*
* 调用init()时才真正启动线程开始处理 系统退出自动清理资源.
*
* @author xielongwang
* @create 2018-01-18 下午3:49
* @email xielong.wang@nvr-china.com
* @description
*/
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
/**
* 记录所有的队列,系统退出时统一清理资源
*/
private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
/**
* Disruptor 对象
*/
private Disruptor<E> disruptor;
/**
* RingBuffer
*/
private RingBuffer<E> ringBuffer;
/**
* initQueue
*/
private List<D> initQueue = new ArrayList<D>();
/**
* 队列大小
*
* @return 队列长度,必须是2的幂
*/
protected abstract int getQueueSize();
/**
* 事件工厂
*
* @return EventFactory
*/
protected abstract EventFactory<E> eventFactory();
/**
* 事件消费者
*
* @return WorkHandler[]
*/
protected abstract WorkHandler[] getHandler();
/**
* 初始化
*/
public void init() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
disruptor.setDefaultExceptionHandler(new MyHandlerException());
disruptor.handleEventsWithWorkerPool(getHandler());
ringBuffer = disruptor.start();
//初始化数据发布
for (D data : initQueue) {
ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
@Override
public void translateTo(E event, long sequence, D data) {
event.setValue(data);
}
}, data);
}
//加入资源清理钩子
synchronized (queueHelperList) {
if (queueHelperList.isEmpty()) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (BaseQueueHelper baseQueueHelper : queueHelperList) {
baseQueueHelper.shutdown();
}
}
});
}
queueHelperList.add(this);
}
}
/**
* 如果要改变线程执行优先级,override此策略. YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU,
* 慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景.
*
* @return WaitStrategy
*/
protected abstract WaitStrategy getStrategy();
/**
* 插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理.
*/
public synchronized void publishEvent(D data) {
if (ringBuffer == null) {
initQueue.add(data);
return;
}
ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
@Override
public void translateTo(E event, long sequence, D data) {
event.setValue(data);
}
}, data);
}
/**
* 关闭队列
*/
public void shutdown() {
disruptor.shutdown();
}
}
EventFactory.java
/**
* @author xielongwang
* @create 2018-01-18 下午6:24
* @email xielong.wang@nvr-china.com
* @description
*/
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
@Override
public SeriesDataEvent newInstance() {
return new SeriesDataEvent();
}
}
MyHandlerException.java
public class MyHandlerException implements ExceptionHandler {
private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);
/*
* (non-Javadoc) 运行过程中发生时的异常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
* , long, java.lang.Object)
*/
@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
ex.printStackTrace();
logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
}
/*
* (non-Javadoc) 启动时的异常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
* Throwable)
*/
@Override
public void handleOnStartException(Throwable ex) {
logger.error("start disruptor error ==[{}]!", ex.getMessage());
}
/*
* (non-Javadoc) 关闭时的异常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
* .Throwable)
*/
@Override
public void handleOnShutdownException(Throwable ex) {
logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
}
}
SeriesData.java (代表应用A发送给应用B的消息)
public class SeriesData {
private String deviceInfoStr;
public SeriesData() {
}
public SeriesData(String deviceInfoStr) {
this.deviceInfoStr = deviceInfoStr;
}
public String getDeviceInfoStr() {
return deviceInfoStr;
}
public void setDeviceInfoStr(String deviceInfoStr) {
this.deviceInfoStr = deviceInfoStr;
}
@Override
public String toString() {
return "SeriesData{" +
"deviceInfoStr='" + deviceInfoStr + '\'' +
'}';
}
}
SeriesDataEvent.java
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}
SeriesDataEventHandler.java
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
@Autowired
private DeviceInfoService deviceInfoService;
@Override
public void onEvent(SeriesDataEvent event) {
if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
logger.warn("receiver series data is empty!");
}
//业务处理
deviceInfoService.processData(event.getValue().getDeviceInfoStr());
}
}
SeriesDataEventQueueHelper.java
@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
private static final int QUEUE_SIZE = 1024;
@Autowired
private List<SeriesDataEventHandler> seriesDataEventHandler;
@Override
protected int getQueueSize() {
return QUEUE_SIZE;
}
@Override
protected com.lmax.disruptor.EventFactory eventFactory() {
return new EventFactory();
}
@Override
protected WorkHandler[] getHandler() {
int size = seriesDataEventHandler.size();
SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
return paramEventHandlers;
}
@Override
protected WaitStrategy getStrategy() {
return new BlockingWaitStrategy();
//return new YieldingWaitStrategy();
}
@Override
public void afterPropertiesSet() throws Exception {
this.init();
}
}
ValueWrapper.java
public abstract class ValueWrapper<T> {
private T value;
public ValueWrapper() {}
public ValueWrapper(T value) {
this.value = value;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
}
DisruptorConfig.java
@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多实例几个消费者
public class DisruptorConfig {
/**
* smsParamEventHandler1
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler1() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler2
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler2() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler3
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler3() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler4
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler4() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler5
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler5() {
return new SeriesDataEventHandler();
}
}
测试
//注入SeriesDataEventQueueHelper消息生产者
@Autowired
private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
@RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
long startTime1 = System.currentTimeMillis();
if (StringUtils.isEmpty(deviceData)) {
logger.info("receiver data is empty !");
return new DataResponseVo<String>(400, "failed");
}
seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
long startTime2 = System.currentTimeMillis();
logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
return new DataResponseVo<String>(200, "success");
}
应用A通过/data 接口把数据发送到应用B ,然后通过seriesDataEventQueueHelper 把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A. 可接受消息丢失, 可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控
来源:http://blog.csdn.net/u014087707/article/details/79340463


猜你喜欢
- 一、this关键字1.this的类型:哪个对象调用就是哪个对象的引用类型二、用法总结1.this.data; //访问属性2.this.fu
- 本文实例讲述了Android编程实现添加低电流提醒功能的方法。分享给大家供大家参考,具体如下:特殊需求,检测电流是否正常。监听如下广播:In
- 本文实例讲述了java简单列出文件夹下所有文件的方法。分享给大家供大家参考,具体如下:import Java.io.*;public cla
- Android Intent调用 Uri的方法总结//调用浏览器Uri uri = Uri.parse(""); Int
- 简单几步,实现SpringMVC+servlet3.0文件上传功能:第一步:配置web.xml文件中的servlet,添加multipart
- 现在越来越多的软件都开始使用沉浸式状态栏了,下面总结一下沉浸式状态栏的两种使用方法注意!沉浸式状态栏只支持安卓4.4及以上的版本状态栏:4.
- 前言报表输出是Java应用开发中经常涉及的内容,而一般的报表往往缺乏通用性,不方便用户进行个性化编辑。Java程序由于其跨平台特性,不能直接
- Android 重写ViewGroup 分析onMeasure()和onLayout()方法在继承ViewGroup类时,需要重写两个方法,
- 要求:如下图,使用线程操作 1、实时显示当前时间 2、输入加数和被加数,自动出现结果 分析:两个问题解决的方式一致,使用子线程进
- 今天一个读者问我关于Android通过调用Webservice实现天气预报这篇文章的源码下载后出现的错误Could not find cla
- 这里我们给定一个集合strings一、写法1–循环for (int i = 0, len = strings.size(); i <
- 这一篇,给大家介绍一下ImageView控件的使用,ImageView主要是用来显示图片,可以对图片进行放大、缩小、旋转的功能。androi
- byte:java中最小的数据类型。1字节/8位。-128(2^7)~127(2^7-1),默认值0。short:短整型,2字节/16位,取
- 最近项目需要在浏览器中通过URL预览图片。但发现浏览器始终默认下载,而不是预览。研究了一下,发现了问题:// 设置response的Head
- 最近有一款2048的游戏非常火,本文将来介绍一下使用OGEngine游戏引擎开发游戏2048。OGEngine引擎是开源的,我们很容易找到,
- 1.功能介绍Spring框架提供了线程池和定时任务执行的抽象接口:TaskExecutor和TaskScheduler来支持异步执行任务和定
- 由于需要访问MongoDB,但是本地开发环境不能直接连接MongoDB,需要通过SecureCRT使用127.0.0.2本地IP代理。但是程
- 一问道StringBuffer与StringBuilder的区别,张口就来StringBuffer是线程安全的,因为它相关方法都加了sync
- 写在前面并发编程一直都存在,只不过过去的很长时间里,比较难以实现,随着互联网的发展,人口红利的释放,更加友好的支持并发编程已经成了主流编程语
- 配置准备在build.gradle文件中添加如下依赖: compile "org.elasticsearc