java发送kafka事务消息的实现方法
作者:逆风飞翔的小叔 发布时间:2022-05-17 01:18:35
标签:java,kafka,事务
前言
事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可
kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
下面结合实际的代码以及效果演示进行说明
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTransaction {
public static void main(String[] args) throws Exception {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
System.out.println("开始发送消息");
try {
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
}
//int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
System.out.println(e);
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}
运行上面的代码,正常是可以发送到指定的topic下
接下来,我们将上面的代码中的 1/0 放开,再次运行程序,可以看到,程序中抛异常了,但是消息并没有发送到kafka的broker,说明事务的配置生效了
来源:https://blog.csdn.net/congge_study/article/details/123190421


猜你喜欢
- 在C#当中,利用WebClient这个核心类,可以轻易的打造一个下载器。但是这里想要强调的是,我们用的是异步操作。所谓异步,是相对于同步的概
- 啥都不说先上效果图,这个是我项目里的效果:下面的是我抽取出来的 demo 适配啥的我基本上都做好了没做其他的ok 下面 说一下思路把首先 说
- 本文实例为大家分享了Android九宫格图片展示的具体代码,供大家参考,具体内容如下实现思路:首先说下布局,整个是一个横向的线性
- 1、IO流1.流和流的分类什么是IO流?I:Input (输入)O: Ouput(输出)IO流的分类?有多种分类方式:一种方式是按照流的方向
- 本文实例讲述了Java实现的计算最大下标距离算法。分享给大家供大家参考,具体如下:题目描述给定一个整形数组,找出最大下标距离j−i, 当且A
- 1、什么是反射?在java开发中有一个非常重要的概念就是java反射机制,也是java的重要特征之一。反射的概念是由Smith在1982年首
- 本文实例为大家分享了Android App自动更新通知栏下载的具体代码,供大家参考,具体内容如下版本更新说明这里有调用UpdateServi
- 线程概念进程:启动一个应用程序就叫一个进程。 接着又启动一个应用程序,这叫两个进程。每个进程都有一个独立的内存空间;进程也是程序的一次执行过
- 每一趟从待排序的数据元素中选出最小(或最大)的一个元素,顺序放在已排好序的数列的最后,直到全部待排序的数据元素排完。代码public cla
- 嵌套查询使用Fluent Mybatis, 不用手写一行xml文件或者Mapper文件,在dao类中即可使用java api构造中比较复杂的
- 两种方法,第一种是静态开启方法把application 或者 activity的主题设置为Theme.Holo即可<?xml vers
- 项目里面用到了语音唤醒功能,前面一直在用讯飞的语音识别,本来打算也是直接用讯飞的语音唤醒,但是讯飞的语音唤醒要收费,试用版只有35天有效期。
- 笔者最近需要上位机与下位机进行数据交互,在广泛参考大佬的资料后,较为完善地使用Textbox控件进行数据输入的功能。程序段主要功能:实现输入
- 〇、正则表达式的基本语法符号若只简单匹配固定字符串,则无需任何修饰符,例如:需要匹配字符串 77,则可直接写:new Regex(
- 目录卡顿原理卡顿监控ANR原理卡顿原理主线程有耗时操作会导致卡顿,卡顿超过阀值,触发ANR。 应用进程启动时候,Zygote会反射调用Act
- 系统: lc android4.4 在做前后摄像头录像的时候,发现会出现花屏的时候,但不是必现,可能会在某一次重启之后会出现,而且出现之后会
- 一种方法是可以在窗体的属性面板将窗体的 ControlBox属性设置为false,或者在窗体的构造函数中这样写:public F
- 没有借助任何第三方库,完全基于JAVA Socket实现一个最小化的HTTP文件下载客户端。完整的演示如何通过Socket实现下载文件的HT
- XML的特点是功能全面,但标签繁琐,格式复杂。在Web上使用XML现在越来越少,取而代之的是JSON这种数据结构。JSON是JavaScri
- 背景我们经常在网上下载一些视频教程,然而这些视频命名规则各不相同,即使对于相同类型的文件名来说,当文件数量很大且文件名全部是中文时,文件排序