被kafka-client和springkafka版本坑到自闭及解决
作者:追月亮的猴子 发布时间:2023-08-23 15:07:36
被kafka-client和springkafka版本坑
上周刚刚欢天喜地的在linux上部了kafka,这周打算用spring-boot框架写个简单demo跑一下,结果悲剧就此展开。
首先建立maven工程:pom中添加spring boot kafka依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafkaproducer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafkaproducer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
配置文件如下:
server.port=8089
spring.kafka.bootstrap-servers=ip:port
spring.kafka.producer.retries= 0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.linger.ms=1
然后新建一个Producer类
package com.example.kafkaproducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
@Autowired
KafkaTemplate kafkaTemplate;
public void produce(){
kafkaTemplate.send("test","hello word");
System.out.println("发送消息");
}
}
在test类中调用
package com.example.kafkaproducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaproducerApplicationTests {
@Autowired KafkaProducer kafkaProducer;
@Test
public void contextLoads() {
kafkaProducer.produce();
}
}
然后控制台就会打印一个莫名奇妙的错误,没有打印任何堆栈信息,大概意思只是表达了连接不上。
Exception thrown when sending a message with key='null' and payload='' to topic
telnet ip+port 是可以通的
随后发现,xshell上启动的kafka-server在报这样一个错,更详细的没有留存。
ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18
百度了一下,很可能是Linux上的kafka版本和pom中引入的spring-kafka依赖不匹配造成的,于是查看对应关系。
查看kafka,发现装的是一个0.8.2.1 版本的kafka,该版本的kafka是2015年3月发布的版本,可以说是十分古老,真是不知道为什么当初要选这么老的版本。
换了几次spring-kafka的pom之后,依然在报这个问题,于是我选择换更新的kafka的包。
换了2.2.0版本kafka的包,问题得到解决。
其中consumer的创建命令和老版本的不太一样,且consumer和producer需使用相同的端口号,而不是像之前producer配置为broker的端口,consumer配置为zookeeper的端口号。
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic test
且config文件夹下server.properties文件中的一些配置和之前不太一样,需要注意的是,以下两行配置原来是被注解了的,需要在这里取消掉注解,并配置自己的ip。
listeners = PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://your.host.name:9092
springboot、spring-kafka、kafka-client三者兼容性关系
spring官方描述的spring-kafka的版本和kafka-clients的版本对应关系:
官方地址:https://spring.io/projects/spring-kafka
中间列:“Spring Integration for Apache Kafka Version 可忽略不看:
也就是说spring-kafka与spring-client是存在在一对多关系的,那是不是他所有的spring-client都可以选呢?
接着往下看(摘自官网):
他说啥 ?
springboot 1.5 你应该用的是spring-kafka 1.3.x.
springboot2.0你应该使用的是spring-kafka2.0.x.
如果用的是spring boot2.1.x,那么你必须使用spring-kafka的版本是2.2.x。否则就会出现noClass等等各种异常。
spring-kafka的版本是2.1默认使用的spring-client是1.1.x,当你要使用另外两个时,你就要使用如下的版本配置.
如果你用的是2.2.x的spring-kafka,只看第一张图,你会以为2.1.x的kafka-clients也可以用。但是spring说了,此时默认用的kafka-clients是2.0.x,如果你想用2.1.x,必须看文档附录,下图的大概意思,必须换掉下图所示的所有依赖版本。
也就是说并不是一对多 他默认的还是只有一个kafka-client来给你的,你要选其他的可以的,你添加一些额外配置
例如:
Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本为 2.2.0.RELEASE,kafka-clients 的默认版本为2.0.0,所以 kafka 的版本选用为 kafka_2.11-2.1.0 (前面的2.11代表的是Scala的版本后面为kafka的版本号)
来源:https://blog.csdn.net/qq_27707957/article/details/90553235
猜你喜欢
- JdbcTypeInterceptor运行时自动添加 jdbcType 属性 * 签名@Intercepts({
- 本文实例为大家分享了Java金额大小写转换的具体代码,供大家参考,具体内容如下/** * @ClassName: NumberConver
- 在基于UI元素的自动化测试中, 无论是桌面的UI自动化测试,还是Web的UI自动化测试. 首先我们需要查找和识别UI
- 前言上一篇我们认识了Kotlin编程语言,也搭建好开发环境。本篇就进入Kotlin的基础语法介绍,与其他编程语言一样,Kotlin也有自己的
- 目录1、Jetbrains官网下载IntelliJ IDEA2、安装 IntelliJ IDEA3、注册4、参考链接1、Jetbrains官
- 背景最近好几个项目在运行过程中客户都提出文件上传大小的限制能否设置的大一些,用户经常需要上传好几个G的资料文件,如图纸,视频等,并且需要在上
- 前言目前Flutter三大主流状态管理框架分别是provider、flutter_bloc、getx,三大状态管理框架各有优劣,本篇文章将介
- 协议做如下规定:规定数据协议:序列号 长度 状态字 数据长度 数据1 &n
- spring的自动装配功能的定义:无须在Spring配置文件中描述javaBean之间的依赖关系(如配置<property>、&
- 背景产品想对多次快速点击做一下优化,想要的效果就是双击不会打开多次但是从开发角度来说,我可以用kotlin的拓展方法来调整这个,但是之前的历
- EhCache 是一个纯Java的进程内缓存框架,具有快速、精干等特点,是Hibernate中默认的CacheProvider。ehcach
- 本文实例为大家分享了OpenCV实现直线检测并消除的具体代码,供大家参考,具体内容如下很简单,代码如下#include<iostrea
- 1、mybatis-plus @DS实现动态切换数据源原理首先mybatis-plus使用com.baomidou.dynamic.data
- 一、synchronized 有不足新事物的出现要不是替代老事物,要么就是对老事物的补充JUC 的 locks 就是对 synchroniz
- 一 :问题背景问题:当查询接口较复杂时候,数据的获取都需要[远程调用],必然需要花费更多的时间。 假如查询文章详情页面,需要如下标注的时间才
- 概述:Flutter中常用的滑动布局 ScrollView 有 SingleChildScrollView、NestedScrollView
- 实现原理: 长连接的维持,是要客户端程序,定时向服务端程序,发送一个
- 在实际应用中,我们往往有需要比较两个自定义对象大小的地方。而这些自定义对象的比较,就不像简单的整型数据那么简单,它们往往包含有许多的属性,我
- 本文主要介绍的是通过使用java的相关类可以实现对文件或文件夹的压缩。zlib是一种数据压缩程序库,它的设计目标是处理单纯的数据(而不管数据
- C++11 引入一个全新的线程库,包含启动和管理线程的工具,提供了同步(互斥、锁和原子变量)的方法,我将试图为你介绍这个全新的线