Java Kafka实现延迟队列的示例代码
作者:整点bug 发布时间:2022-01-11 16:32:11
kafka作为一个使用广泛的消息队列,很多人都不会陌生,但当你在网上搜索“kafka 延迟队列”,出现的都是一些讲解时间轮或者只是提供了一些思路,并没有一份真实可用的代码实现,今天我们就来打破这个现象,提供一份可运行的代码,抛砖引玉,吸引更多的大神来分享。
基于kafka如何实现延迟队列
想要解决一个问题,我们需要先分解问题。kafka作为一个高性能的消息队列,只要消费能力足够,发出的消息都是会立刻收到的,因此我们需要想一个办法,让消息延迟发送出去。
网上已经有大神给出了如下方案:
在发送延迟消息时不直接发送到目标topic,而是发送到一个用于处理延迟消息的topic,例如
delay-minutes-1
写一段代码拉取
delay-minutes-1
中的消息,将满足条件的消息发送到真正的目标主题里。
就像画一匹马一样简单。
方案是好的,但是我们还需要更多细节。
完善细节
问题出在哪里?
问题出在延迟消息发出去之后,代码程序就会立刻收到延迟消息,要如何处理才能让延迟消息等待一段时间才发送到真正的topic里面。
可能有同学会觉得很简单嘛,在代码程序收到消息之后判断条件不满足,就调用sleep
方法,过了一段时间我再进行下一个循环拉取消息。
真的可行吗?
一切好像都很美好,但这是不可行的。
这是因为在轮询kafka拉取消息的时候,它会返回由max.poll.records
配置指定的一批消息,但是当程序代码不能在max.poll.interval.ms
配置的期望时间内处理这些消息的话,kafka就会认为这个消费者已经挂了,会进行rebalance
,同时你这个消费者就无法再拉取到任何消息了。
举个例子:当你需要一个24小时的延迟消息队列,在代码里面写下了Thread.sleep(1000*60*60*24);
,为了不发生rebalance
,你把max.poll.interval.ms
也改成了1000*60*60*24
,这个时候你或许会感觉到一丝丝的怪异,我是谁?我在哪?我为什么要写出来这样的代码?
其实我们可以更优雅的处理这个问题。
KafkaConsumer 提供了暂停和恢复的API函数,调用消费者的暂停方法后就无法再拉取到新的消息,同时长时间不消费kafka也不会认为这个消费者已经挂掉了。另外为了能够更加优雅,我们会启动一个定时器来替换sleep
。,完整流程如下图,当消费者发现消息不满足条件时,我们就暂停消费者,并把偏移量seek到上一次消费的位置以便等待下一个周期再次消费这条消息。
Java代码实现
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
@SpringBootTest
public class DelayQueueTest {
private KafkaConsumer<String, String> consumer;
private KafkaProducer<String, String> producer;
private volatile Boolean exit = false;
private final Object lock = new Object();
private final String servers = "";
@BeforeEach
void initConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
}
@BeforeEach
void initProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
}
@Test
void testDelayQueue() throws JsonProcessingException, InterruptedException {
String topic = "delay-minutes-1";
List<String> topics = Collections.singletonList(topic);
consumer.subscribe(topics);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (lock) {
consumer.resume(consumer.paused());
lock.notify();
}
}
}, 0, 1000);
do {
synchronized (lock) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));
if (consumerRecords.isEmpty()) {
lock.wait();
continue;
}
boolean timed = false;
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
long timestamp = consumerRecord.timestamp();
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
if (timestamp + 60 * 1000 < System.currentTimeMillis()) {
String value = consumerRecord.value();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(value);
JsonNode jsonNodeTopic = jsonNode.get("topic");
String appTopic = null, appKey = null, appValue = null;
if (jsonNodeTopic != null) {
appTopic = jsonNodeTopic.asText();
}
if (appTopic == null) {
continue;
}
JsonNode jsonNodeKey = jsonNode.get("key");
if (jsonNodeKey != null) {
appKey = jsonNode.asText();
}
JsonNode jsonNodeValue = jsonNode.get("value");
if (jsonNodeValue != null) {
appValue = jsonNodeValue.asText();
}
// send to application topic
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
try {
producer.send(producerRecord).get();
// success. commit message
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
metadataHashMap.put(topicPartition, offsetAndMetadata);
consumer.commitSync(metadataHashMap);
} catch (ExecutionException e) {
consumer.pause(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
timed = true;
break;
}
} else {
consumer.pause(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
timed = true;
break;
}
}
if (timed) {
lock.wait();
}
}
} while (!exit);
}
}
这段程序是基于SpringBoot 2.4.4
版本和 kafka-client 2.7.0
版本编写的一个单元测试,需要修改私有变量servers
为kafka broker的地址。
在启动程序后,向Topic delay-minutes-
1 发送如以下格式的json字符串数据
{
"topic": "target",
"key": "key1",
"value": "value1"
}
同时启动一个消费者监听topic target
,在一分钟后,将会收到一条 key="key1", value="value1"的数据。
源代码地址
还需要做什么
创建多个topic用于处理不同时间的延迟消息,例如delay-minutes-1
delay-minutes-5
delay-minutes-10
delay-minutes-15
以提供指数级别的延迟时间,这样比一个topic要好很多,毕竟在顺序拉取消息的时候,有一条消息不满足条件,后面的将全部进行排队。
来源:https://zhuanlan.zhihu.com/p/365802989


猜你喜欢
- 1、如果只是一个简单的springboot demo,用以下配置就行新建config类```import org.springf
- 网格布局标签是GridLayout。这个布局是android4.0新增的布局。这个布局只有4.0之后的版本才能使用。不过新增了一些东东①跟L
- 建库建表DROP DATABASE IF EXISTS mp;CREATE DATABASE mp DEFAULT CHARACTER SE
- 那些GC的默认值其实GC或者说JVM的参数非常非常的多,有控制内存使用的:有控制JIT的:有控制分代比例的,也有控制GC并发的:当然,大部分
- 1.添加引用Windows服务(.NET Framework)2.输入项目名称,选择安装位置,,选择安装框架版本;创建。3.找到MyServ
- 有时候我们需要实现这样的场景,类似进入开发者模式,即多次点击后执行操作。首先我们先看一个方法:System提供的一个静态方法arraycop
- 动态内存管理为什么存在动态内存分配我们到现在为止掌握的是什么样的内存开辟方式呢//创建一个变量int val = 20; &n
- 可以用如下方法: 修改AudioYusuStreamOut.cpp,添加方法: void AudioYusuStreamOut::swS2M
- 比如有海量的文本文件,如订单,页面点击事件的记录,量特别大,很难搞定。那么我们该怎样解决海量数据的计算?1、获取总行数2、计算每个文件中存多
- 文章主要涉及到以下几个问题:怎么实现Java的序列化为什么实现了java.io.Serializable接口才能被序列化transient的
- 前言本文主要给大家介绍的是java虚拟机的故障处理工具,文中提到这些工具包括:名称主要作用jpsJVM process Status Too
- 最近在做一个项目,需要用到非对称加密,但是出现一个很诡异的情况,本地开发环境是Windows环境,测试环境是Linux环境,出现一个问题,
- 什么是自旋锁说道自旋锁就要从多线程下的锁机制说起,由于在多处理器系统环境中有些资源因为其有限性,有时需要互斥访问(mutual exclus
- 1、装箱和拆箱是一个抽象的概念 2、装箱是将值类型转换为引用类型 ;拆箱是将引用类型转换为值类型利用装箱和拆箱功能,可通过允许值类型的任何值
- 本文实例讲述了Android编程中Tween动画和Frame动画实现方法。分享给大家供大家参考,具体如下:Animation主要有两种动画模
- 本文介绍如何在springboot中使用默认的spring cache,声明式缓存Spring 定义 CacheManager 和 Cach
- 简介Exchanger是一个用于线程间数据交换的工具类,它提供一个公共点,在这个公共点,两个线程可以交换彼此的数据。当一个线程调用excha
- 一、关键字分类C语言一共多少个关键字呢?一般的书上,都是32个(包括本书),但是这个都是C90(C89)的标准。其实C99后又新增了5个关键
- 简单工厂模式的概念就是建立一个工厂类,对实现了同一接口的一些类进行实例的创建。简单工厂模式的实质是由一个工厂类根据传入的参数,动态决定应该创
- @Autowired注解在抽象类中失效最近在工作中遇到这个问题,在抽象类中使用Autowired这个注解,注入mybatis的dao时,总是