java发送kafka事务消息的实现方法
作者:逆风飞翔的小叔 发布时间:2022-05-17 01:18:35
标签:java,kafka,事务
前言
事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可
kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
下面结合实际的代码以及效果演示进行说明
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTransaction {
public static void main(String[] args) throws Exception {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
// 3. 创建 kafka 生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();
System.out.println("开始发送消息");
try {
// 4. 调用 send 方法,发送消息
for (int i = 0; i < 5; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
}
//int i = 1 / 0;
// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
System.out.println(e);
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}
运行上面的代码,正常是可以发送到指定的topic下
接下来,我们将上面的代码中的 1/0 放开,再次运行程序,可以看到,程序中抛异常了,但是消息并没有发送到kafka的broker,说明事务的配置生效了
来源:https://blog.csdn.net/congge_study/article/details/123190421
![](https://www.aspxhome.com/images/zang.png)
![](https://www.aspxhome.com/images/jiucuo.png)
猜你喜欢
- 前言在介绍Dubbo之前先了解一下基本概念:Dubbo是一个RPC框架,RPC,即Remote Procedure Call(远程过程调用)
- 这篇文章主要介绍了Java如何基于ProcessBuilder类调用外部程序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的
- 这篇文章介绍了Java+Nginx实现POP、IMAP、SMTP邮箱代理服务,我们本次使用的环境为Centos7下,java程序我们通过ec
- 使用InsertProvider注解报错解决目前项目在使用mybatis,并且是使用注解的方式。在使用InsertProvider注解的时候
- 前言我们会使用git来保存我们项目的配置文件,但是文件中总有一些敏感数据,对于这些敏感数据我们通常需要给它加密,加密通常有两种加密方式,一种
- 我就废话不多说了,大家还是直接看代码吧~<?xml version="1.0" encoding="UT
- 本文实例讲述了java继承中的构造方法。分享给大家供大家参考。具体如下:继承中的构造方法: 1、子类的构造过程中必须调用其基类的构造方法。2
- 本文实例为大家分享了Java通讯录系统的具体代码,供大家参考,具体内容如下import java.util.Scanner;class Pe
- 前言在上一篇中,我们初步了解了Sentinel的基本概念,以及其有关限流方面的基础理论,本篇将通过简单的与框架进行整合,看看Sentinel
- 本文实例为大家分享了JAVA NIO实现简单聊天室功能的具体代码,供大家参考,具体内容如下服务端初始化一个ServerSocketChann
- 开发 Web 应用的思路实现一个简单的 JSP/Servlet。搭建创建 Web 应用工程的环境。创建 Web 应用工程。Web 应用工程的
- 一,FileWritter写入文件FileWritter, 字符流写入字符到文件。默认情况下,它会使用新的内容取代所有现有的内容,然而,当指
- Commons Beanutils是Apache开源组
- 我们在项目中都会遇到项目打包,可以通过assembly对我们的项目进行打包。针对打包构建jar包,本文不再叙述。具体可以参考maven插件a
- 字符串每隔4位加空格今天弄了个银行卡识别功能,回显的时候想要将银行卡号每四位加一个空格,这样核对卡号会方便很多,这里记录一下1.正则表达式实
- 在源码的阅读过程中,可以了解别人实现某个功能的涉及思路,看看他们是怎么想,怎么做的。接下来,我们看看这篇Java源码解析之object的详细
- 前言公司最近在开发中遇到一个问题,在弄帖子的发布与回复问题,然后再iOS端和Android端添加表情的时候都会出错Caused by: ja
- 1、对称二叉树【OJ链接】分为以下几种情况:二叉树为空,是对称二叉树二叉树不为空,其左子树或者右子树为空,不是对称二叉树二叉树不为空,左右子
- 请求SpringBoot接受前台参数的六种方式,首先因为从前台发送的请求没有界面的话只能是从地址栏发送并且只能是Get请求,为了测试其他的请
- 介绍建造者模式(Builder Pattern):将一个复杂对象的构建与它的表示分离,使得同样的构建过程可以创建不同的表示。建造者模式是一种