Java分布式学习之Kafka消息队列
作者:kaico2018 发布时间:2022-04-10 22:22:47
介绍
Apache Kafka 是分布式发布-订阅消息系统,在 kafka官网上对 kafka 的定义:一个分布式发布-订阅消息传递系统。 它最初由LinkedIn公司开发,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
注意:Kafka并没有遵循JMS规范(),它只提供了发布和订阅通讯方式。
kafka中文官网:http://kafka.apachecn.org/quickstart.html
Kafka核心相关名称
Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群
Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发
massage: Kafka中最基本的传递对象。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。Kafka里面实现分区,一个broker就是表示一个区域。
Segment:partition物理上由多个segment组成,每个Segment存着message信息
Producer : 生产者,生产message发送到topic
Consumer : 消费者,订阅topic并消费message, consumer作为一个线程来消费
Consumer Group:消费者组,一个Consumer Group包含多个consumer
Offset:偏移量,理解为消息 partition 中消息的索引位置
主题和队列的区别:
队列是一个数据结构,遵循先进先出原则
kafka集群安装
参考官方文档:https://kafka.apachecn.org/quickstart.html
每台服务器上安装jdk1.8环境
安装Zookeeper集群环境
安装kafka集群环境
运行环境测试
安装jdk环境和zookeeper这里不详述了。
kafka为什么依赖于zookeeper:kafka会将mq信息存放到zookeeper上,为了使整个集群能够方便扩展,采用zookeeper的事件通知相互感知。
kafka集群安装步骤:
1、下载kafka的压缩包,下载地址:https://kafka.apachecn.org/downloads.html
2、解压安装包
tar -zxvf kafka_2.11-1.0.0.tgz
3、修改kafka的配置文件 config/server.properties
配置文件修改内容:
zookeeper连接地址:
zookeeper.connect=192.168.1.19:2181
监听的ip,修改为本机的ip
listeners=PLAINTEXT://192.168.1.19:9092
kafka的brokerid,每台broker的id都不一样
broker.id=0
4、依次启动kafka
./kafka-server-start.sh -daemon config/server.properties
kafka使用
kafka文件存储
topic是逻辑上的概念,而partition是物理上的概念,每个partition对应于一个log文件,该log文件中存储的就是Producer生成的数据。Producer生成的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment,每个segment包括:“.index”文件、“.log”文件和.timeindex等文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。
例如:执行命令新建一个主题,分三个区存放放在三个broker中:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic kaico
一个partition分为多个segment
.log 日志文件
.index 偏移量索引文件
.timeindex 时间戳索引文件
其他文件(partition.metadata,leader-epoch-checkpoint)
Springboot整合kafka
maven依赖
<dependencies>
<!-- springBoot集成kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- SpringBoot整合Web组件 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
yml配置
# kafka
spring:
kafka:
# kafka服务器地址(可以多个)
# bootstrap-servers: 192.168.212.164:9092,192.168.212.167:9092,192.168.212.168:9092
bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
consumer:
# 指定一个默认的组名
group-id: kafkaGroup1
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 缓存容量
buffer-memory: 524288
# 服务器地址
bootstrap-servers: www.kaicostudy.com:9092,www.kaicostudy.com:9093,www.kaicostudy.com:9094
生产者
@RestController
public class KafkaController {
/**
* 注入kafkaTemplate
*/
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送消息的方法
*
* @param key
* 推送数据的key
* @param data
* 推送数据的data
*/
private void send(String key, String data) {
// topic 名称 key data 消息数据
kafkaTemplate.send("kaico", key, data);
}
// test 主题 1 my_test 3
@RequestMapping("/kafka")
public String testKafka() {
int iMax = 6;
for (int i = 1; i < iMax; i++) {
send("key" + i, "data" + i);
}
return "success";
}
}
消费者
@Component
public class TopicKaicoConsumer {
/**
* 消费者使用日志打印消息
*/
@KafkaListener(topics = "kaico") //监听的主题
public void receive(ConsumerRecord<?, ?> consumer) {
System.out.println("topic名称:" + consumer.topic() + ",key:" +
consumer.key() + "," +
"分区位置:" + consumer.partition()
+ ", 下标" + consumer.offset());
//输出key对应的value的值
System.out.println(consumer.value());
}
}
来源:https://blog.csdn.net/weixin_44044929/article/details/125067523


猜你喜欢
- 为什么需要UI自动化测试?我有一个观点,对于重复的工作,那么程序都是可以代替的,我想这是作为一个程序员的一个基本素养(能偷懒的绝不干活)。U
- 前言在分布式场景下为了保证数据最终一致性。在单进程的系统中,存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步(
- 本文实例为大家分享了Android自定义view实现TextView方形输入框的具体代码,供大家参考,具体内容如下先奉上最终效果图实现思路分
- 反射提供了封装程序集、模块和类型的对象(Type 类型)。可以使用反射动态创建类型的实例,将类型绑定到现有对象,或从现有对象获取类型并调用其
- C#备忘录设计模式大家好,老胡又和大家见面了。首先承认今天的博客有点标题党了,人生是没有存档,也没有后悔药的。有存档和后悔药的,那是游戏,不
- 最近在做数据挖掘的课程设计,需要将数据分析的结果很直观的展现给用户,这就要用到数据统计图,要实现这个功能就需要几个第三方包了:1. 
- 刚刚接触Java不久,写法上和一些术语还不是很熟悉,如有不对的地方,希望指正。本次学生成绩系统要求能实现成绩录入, * ,绩点计算,退出系
- 代码案例一:private void button1_Click(object sender, EventArgs e) &n
- 本文的控制台项目是根据SuperSocket官方Telnet示例代码进行调试的,官方示例代码:Telnet示例。开始我的第一个Telnet控
- static修饰符是java里面非常常用的一个东西,用法也非常多。然而,在kotlin里竟然没有这个东西!那该如何替代呢?本文就总结了下ja
- 由于一个项目的需要,我研究了一下android的网络通信方式,大体和java平台的很相似! android平台也提供了很多的AP
- 理解hashCode() 的作用是获取哈希码,也称为散列码;它实际上是返回一个int整数。这个哈希码的作用是确定该对象在哈希表中的索引位置。
- 1.什么是线程安全性当多个线程访问某个类时,不管运行时环境采用何种调用方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或
- 本文实例为大家分享了Android实现秒表功能的具体代码,供大家参考,具体内容如下设计完成一个秒表,具备启停功能,正确使用工作线程完成界面刷
- 本文实例讲述了spring AOP的Around增强实现方法。分享给大家供大家参考,具体如下:一 配置<?xml version=&q
- 基本要点1、Lombok作用:在我们的实体类中,我们再也不需要声明get、set、有参无参等方法,统统可以通过Lombok注解来实现同样的功
- 本文介绍了JAVA中实现原生的 socket 通信机制原理,分享给大家,具体如下:当前环境jdk == 1.8知识点socket 的连接处理
- SpringBoot集成Redis 1.添加redis依赖<dependency> <groupId
- 本文实例讲述了Android解析Intent Filter的方法。分享给大家供大家参考。具体分析如下:匿名性质的运行时绑定使得理解Andro
- Redisson是架设在redis基础上的一个Java驻内存数据网格(In-Memory Data Grid)。充分的利用了Redis键值数