java Disruptor构建高性能内存队列使用详解
作者:码猿技术专栏 发布时间:2022-02-03 00:41:14
Java中有哪些队列
ArrayBlockingQueue
使用ReentrantLockLinkedBlockingQueue
使用ReentrantLockConcurrentLinkedQueue
使用CAS
等等
我们清楚使用锁的性能比较低,尽量使用无锁设计。接下来就我们来认识下
Disruptor简单使用
github地址:github.com/LMAX-Exchan…
先简单介绍下:
Disruptor它是一个开源的并发框架,并获得2011 Duke’s程序框架创新奖【Oracle】,能够在无锁的情况下实现网络的Queue并发操作。英国外汇交易公司LMAX开发的一个高性能队列,号称单线程能支撑每秒600万订单~
日志框架Log4j2 异步模式采用了Disruptor来处理
局限呢,他就是个内存队列,也就是说无法支撑分布式场景。
简单使用
数据传输对象
@Data
public class EventData {
private Long value;
}
消费者
public class EventConsumer implements WorkHandler<EventData> {
/**
* 消费回调
* @param eventData
* @throws Exception
*/
@Override
public void onEvent(EventData eventData) throws Exception {
Thread.sleep(5000);
System.out.println(Thread.currentThread() + ", eventData:" + eventData.getValue());
}
}
生产者
public class EventProducer {
private final RingBuffer<EventData> ringBuffer;
public EventProducer(RingBuffer<EventData> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void sendData(Long v){
// cas展位
long next = ringBuffer.next();
try {
EventData eventData = ringBuffer.get(next);
eventData.setValue(v);
} finally {
// 通知等待的消费者
System.out.println("EventProducer send success, sequence:"+next);
ringBuffer.publish(next);
}
}
}
测试类
public class DisruptorTest {
public static void main(String[] args) {
// 2的n次方
int bufferSize = 8;
Disruptor<EventData> disruptor = new Disruptor<EventData>(
() -> new EventData(), // 事件工厂
bufferSize, // 环形数组大小
Executors.defaultThreadFactory(), // 线程池工厂
ProducerType.MULTI, // 支持多事件发布者
new BlockingWaitStrategy()); // 等待策略
// 设置消费者
disruptor.handleEventsWithWorkerPool(
new EventConsumer(),
new EventConsumer(),
new EventConsumer(),
new EventConsumer());
disruptor.start();
RingBuffer<EventData> ringBuffer = disruptor.getRingBuffer();
EventProducer eventProducer = new EventProducer(ringBuffer);
long i = 0;
for(;;){
i++;
eventProducer.sendData(i);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
核心组件
基于上面简单例子来看确实很简单,Disruptor帮我们封装好了生产消费模型的实现,接下来我们来看下他是基于哪些核心组件来支撑起一个高性能无锁队列呢?
RingBuffer: 环形数组,底层使用数组entries,在初始化时填充数组,避免不断新建对象带来的开销。后续只会对entries做更新操作
Sequencer: 核心管家
定义生产同步的实现:
SingleProducerSequencer
单生产、MultiProducerSequencer
多生产当前写的进度Sequence cursor
所有消费者进度的数组
Sequence[] gatingSequences
MultiProducerSequencer
可用区availableBuffer
【利用空间换取查询效率】
Sequence: 本身就是一个序号器用来标识处理进度,也可以当做是一个atomicInteger
; 还有另外一个特点,为了解决伪共享问题而引入的:缓存行填充。这个在后面介绍。
workProcessor: 处理Event的循环,在循环中获取Disruptor的事件,然后把事件分配给各个handler
EventHandler: 负责业务逻辑的handler,自己实现。
WaitStrategy: 消费者 如何等待 事件的策略,定义了如下策略
leepingWaitStrategy
:自旋 + yield + sleep
BlockingWaitStrategy
:加锁,适合CPU资源紧张(不需要切换线程),系统吞吐量无要求的
YieldingWaitStrategy
:自旋 + yield + 自旋
BusySpinWaitStrategy
:自旋,减少线程之前切换
PhasedBackoffWaitStrategy
:自旋 + yield + 自定义策略
带着问题来解析代码?
1、多生产者如何保证消息生产不会相互覆盖。【如何达到互斥效果】
每个线程获取不同的一段数组空间,然后通过CAS判断这段空间是否已经分配出去。
接下来我们看下多生产类MultiProducerSequencer
中next方法【获取生产序号】
// 消费者上一次消费的最小序号 // 后续第二点会讲到
private final Sequence gatingSequenceCache = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 当前进度的序号
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
// 所有消费者的序号 //后续第二点会讲到
protected volatile Sequence[] gatingSequences = new Sequence[0];
public long next(int n)
{
if (n < 1)
{
throw new IllegalArgumentException("n must be > 0");
}
long current;
long next;
do
{
// 当前进度的序号,Sequence的value具有可见性,保证多线程间线程之间能感知到可申请的最新值
current = cursor.get();
// 要申请的序号空间:最大序列号
next = current + n;
long wrapPoint = next - bufferSize;
// 消费者最小序列号
long cachedGatingSequence = gatingSequenceCache.get();
// 大于一圈 || 最小消费序列号>当前进度
if (wrapPoint > cachedGatingSequence || cachedGatingSequence > current)
{
long gatingSequence = Util.getMinimumSequence(gatingSequences, current);
// 说明大于1圈,并没有多余空间可以申请
if (wrapPoint > gatingSequence)
{
LockSupport.parkNanos(1); // TODO, should we spin based on the wait strategy?
continue;
}
// 更新最小值到Sequence的value中
gatingSequenceCache.set(gatingSequence);
}
// CAS成功后更新当前Sequence的value
else if (cursor.compareAndSet(current, next))
{
break;
}
}
while (true);
return next;
}
2、生产者向序号器申请写的序号,如序号正在被消费,Sequencer是如何知道哪些序号是可以被写入的呢?【未消费则被覆盖如何处理】
从gatingSequences中取得最小的序号,生产者最多能写到这个序号的后一位。通俗来讲就是申请的序号不能大于最小消费者序号一圈【申请到最大序列号-buffersize 要小于/等于 最小消费的序列号】的时候, 才能申请到当前写的序号
public final EventHandlerGroup<T> handleEventsWithWorkerPool(final WorkHandler<T>... workHandlers)
{
return createWorkerPool(new Sequence[0], workHandlers);
}
EventHandlerGroup<T> createWorkerPool(
final Sequence[] barrierSequences, final WorkHandler<? super T>[] workHandlers)
{
final SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(barrierSequences);
final WorkerPool<T> workerPool = new WorkerPool<>(ringBuffer, sequenceBarrier, exceptionHandler, workHandlers);
consumerRepository.add(workerPool, sequenceBarrier);
final Sequence[] workerSequences = workerPool.getWorkerSequences();
updateGatingSequencesForNextInChain(barrierSequences, workerSequences);
return new EventHandlerGroup<>(this, consumerRepository, workerSequences);
}
private void updateGatingSequencesForNextInChain(final Sequence[] barrierSequences, final Sequence[] processorSequences)
{
if (processorSequences.length > 0)
{
// 消费者启动后就会将所有消费者存放入AbstractSequencer中gatingSequences
ringBuffer.addGatingSequences(processorSequences);
for (final Sequence barrierSequence : barrierSequences)
{
ringBuffer.removeGatingSequence(barrierSequence);
}
consumerRepository.unMarkEventProcessorsAsEndOfChain(barrierSequences);
}
}
3、在多生产者情况下,生产者是申请到一段可写入的序号,然后再写入这些序号中,那么消费者是如何感知哪些序号是可以被消费的呢?【借问提1图说明】
这个前提是多生产者情况下,第一点我们说过每个线程获取不同的一段数组空间,那么现在单单通过序号已经不够用了,MultiProducerSequencer
使用了int 数组 【availableBuffer
】来标识当前序号是否可用。当生产者成功生产事件后会将availableBuffer
中当前序列号置为1标识可以读取。
如此消费者可以读取的的最大序号就是我们availableBuffer
中第一个不可用序号-1。
初始化availableBuffer
流程
public MultiProducerSequencer(int bufferSize, final WaitStrategy waitStrategy)
{
super(bufferSize, waitStrategy);
// 初始化可用数组
availableBuffer = new int[bufferSize];
indexMask = bufferSize - 1;
indexShift = Util.log2(bufferSize);
initialiseAvailableBuffer();
}
// 初始化默认availableBuffer为-1
private void initialiseAvailableBuffer()
{
for (int i = availableBuffer.length - 1; i != 0; i--)
{
setAvailableBufferValue(i, -1);
}
setAvailableBufferValue(0, -1);
}
// 生产者成功生产事件将可用区数组置为1
public void publish(final long sequence)
{
setAvailable(sequence);
waitStrategy.signalAllWhenBlocking();
}
private void setAvailableBufferValue(int index, int flag)
{
long bufferAddress = (index * SCALE) + BASE;
UNSAFE.putOrderedInt(availableBuffer, bufferAddress, flag);
}
消费者消费流程
WorkProcessor类中消费run方法
public void run()
{
boolean processedSequence = true;
long cachedAvailableSequence = Long.MIN_VALUE;
long nextSequence = sequence.get();
T event = null;
while (true)
{
try
{
// 先通过cas获取消费事件的占有权
if (processedSequence)
{
processedSequence = false;
do
{
nextSequence = workSequence.get() + 1L;
sequence.set(nextSequence - 1L);
}
while (!workSequence.compareAndSet(nextSequence - 1L, nextSequence));
}
// 数据就绪,可以消费
if (cachedAvailableSequence >= nextSequence)
{
event = ringBuffer.get(nextSequence);
// 触发回调函数
workHandler.onEvent(event);
processedSequence = true;
}
else
{
// 获取可以被读取的下标
cachedAvailableSequence = sequenceBarrier.waitFor(nextSequence);
}
}
// ....省略
}
notifyShutdown();
running.set(false);
}
public long waitFor(final long sequence)
throws AlertException, InterruptedException, TimeoutException
{
checkAlert();
// 这个值获取的current write 下标,可以认为全局消费下标。此处与每一段的write1和write2下标区分开
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
if (availableSequence < sequence)
{
return availableSequence;
}
// 通过availableBuffer筛选出第一个不可用序号 -1
return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
// 从current read下标开始, 循环至 current write,如果碰到availableBuffer 为-1 直接返回
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}
return availableSequence;
}
解决伪共享问题
什么是伪共享问题呢?
为了提高CPU的速度,Cpu有高速缓存Cache,该缓存最小单位为缓存行CacheLine,他是从主内存复制的Cache的最小单位,通常是64字节。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。如果你访问一个long数组,当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。
伪共享问题是指,当多个线程共享某份数据时,线程1可能拉到线程2的数据在其cache line中,此时线程1修改数据,线程2取其数据时就要重新从内存中拉取,两个线程互相影响,导致数据虽然在cache line中,每次却要去内存中拉取。
Disruptor是如何解决的呢?
在value前后统一都加入7个Long类型进行填充,线程拉取时,不论如何都会占满整个缓存
回顾总结:Disuptor为何能称之为高性能的无锁队列框架呢?
缓存行填充,避免缓存频繁失效。【java8中也引入
@sun.misc.Contended
注解来避免伪共享】无锁竞争:通过CAS 【二阶段提交】
环形数组:数据都是覆盖,避免GC
底层更多的使用位运算来提升效率
来源:https://juejin.cn/post/7174966129431937080


猜你喜欢
- 在初始化自己位置的时候请求定位权限:Constants.ACCESS_FINE_LOCATION_COMMANDS_REQUEST_CODE
- 所谓的浮动工具栏,效果图如下:也就是说,可以将工具栏拖出其原先的停靠位置,而且可以将拖出来的工具栏再拖放回去。实现的基本思路如下1、拖动出来
- 前言在真实的项目开发中,使用SpringBoot可以说非常普遍了,而在框架整合中,与数据库的交互无外乎使用jpa,mybatis,mybat
- 一、idea打包项目第一步点击右边maven第二步点击compile,编译代码,编译成功后(双击运行)第三步点击package,打包代码二、
- 前言前段时间一直使用到word文档转pdf或者pdf转word,寻思着用Java应该是可以实现的,于是花了点时间写了个文件转换工具源码wel
- 前言一直很好奇Android Root的原理,恰好最近碰到了一个跟Android默认带Root权限的问题,这里顺便记录一下Android系统
- 关于滑动效果,在Android中用得比较多,本示例实现的滑动效果是使用ViewFlipper来实现的,当然也可以使用其它的View来实现。接
- 题目很简单, 就是IMessage对象怎么变成Byte[]答案1:msg.ToByteArray()这肯定不符合我们的要求答案2:using
- @Bean修饰的方法参数的注入方法参数默认注入方式为Autowired,即先根据类型匹配,若有多个在根据名称进行匹配。1:复杂类型可以通过@
- Android 微信摇一摇功能实现,最近学习传感器,就想实现摇一摇的功能,上网查了些资料,就整理下。如有错误,还请指正。开发环境Androi
- 接口等幂性通俗的来说就是同一时间内,发起多次请求只有一次请求成功;其目的时防止多次提交,数据重复入库,表单验证网络延迟重复提交等问题。比如:
- 通过shift+shift可以调出搜索窗口或者ctrl+n但是,如果想搜索jdk中的类,只是在搜索栏中是无法搜出来的需要勾选 红框内的选项没
- 概要:点赞头像效果使用的地方很多,实现的方式也很多,下面通过使用RecyclerView实现一下1、创建布局文件 一个recyclervie
- 本文实例讲述了Android viewpager中动态添加view并实现伪无限循环的方法。分享给大家供大家参考,具体如下:viewpager
- 1.什么是串口?在不会使用串口通讯之前,暂且可以把它理解为“一个可通讯的口”;使用篇不深入探讨理论及
- 本文主要是分析Spring bean的循环依赖,以及Spring的解决方式。 通过这种解决方式,我们可以应用在我们实际开发项目中。1. 什么
- 1.内部类概念及分类将一个类定义在另一个类的内部或者接口内部或者方法体内部,这个类就被称为内部类,我们不妨将内部类所在的类称为外围类,除了定
- Unity 有点击屏幕进行移动操作,通过Input.GetMouseButtonDown(0)。如果点击到了一些UI上面会触发点击屏幕事件。
- Gradle和Maven都是当前热门的自动化构建工具。使用Gradle去构建项目,由于没有办法像Maven一样配置Setting文件来修改本
- 只要了解过多线程,我们就知道线程开始的顺序跟执行的顺序是不一样的。如果只是创建三个线程然后执行,最后的执行顺序是不可预期的。这是因为在创建完