Go语言kafka生产消费消息实例搬砖
作者:爱吃红薯粉 发布时间:2024-06-07 16:06:58
标签:Go,kafka,生产消费
kafka go库
kafka go客户端官方目前没有提供,但在github有2个非常流行的库
星星较多,网上案例也多 https://github.com/Shopify/sarama
confluent官网提供的库 https://github.com/confluentinc/confluent-kafka-go
这里使用sarama,因为星星多,案例多,方便快速上手
注意
如果kafka版本在2.2以下,需要在go.mod里面将sarama版本改为github.com/Shopify/sarama v1.24.1
这是因为sarama只提供最新2个版本+2个月的兼容保证,所以使用低版本kafka是需要避坑
使用非集群集群生产者时,需要自行去创建topic,如果使用集群生产者,集群会自动创建
例子
package main
import (
"fmt"
"github.com/Shopify/sarama"
cluster "github.com/bsm/sarama-cluster"
"time"
)
var (
Consumer *cluster.Consumer
producer sarama.SyncProducer
brokers = []string{"ip1:9092","ip2:9092","ip3:9092"}
topic = "testGo"
groupId = "testGo_test1"
)
func initProducer() {
var err error
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForLocal
config.Producer.Retry.Max = 3
config.Producer.Return.Successes = true
brokers := brokers
producer, err = sarama.NewSyncProducer(brokers,config)
if err != nil {
fmt.Printf("生产者初始化失败 -> %v \n", err)
panic(err)
}
fmt.Println("生产者初始化成功。")
}
func initConsumer() {
var err error
config := cluster.NewConfig()
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Offsets.Initial = sarama.OffsetNewest
Consumer, err = cluster.NewConsumer(brokers,groupId,[]string{topic},config)
if err != nil {
fmt.Printf("消费者初始化失败 -> %v \n", err)
panic(err.Error())
}
if Consumer == nil {
panic(fmt.Sprintf("消费者为空. kafka info -> {brokers:%v, topic: %v, group: %v}", brokers, topic, groupId))
}
fmt.Printf("消费者初始化成功, consumer -> %v, topic -> %v, ", Consumer, topic)
}
func main() {
initProducer()
initConsumer()
//生产消息
for i := 1;i < 100; i ++ {
pid, offset, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder(i),
Value: sarama.ByteEncoder("this is test message."),
})
if err != nil {
fmt.Println("发送消息失败, err:", err)
return
}
fmt.Printf("offset: %v\n", offset)
}
time.Sleep(2 * time.Second)
//消费消息
for {
select {
case msg, ok := <-Consumer.Messages():
if ok {
fmt.Printf("kafka msg: %s \n", msg.Value)
}
}
}
}
运行结果如下
该demo流程如下
引入单机sarama库和集群sarama库
定义连接变量
使用单sarama库实例化一个生产者
使用集群sarama库实例化一个消费者
循环100次发送100条消息
使用sarama自带的生产消息构造器设置消息内容
使用for让进程一直监听来自kafka的消息
来源:https://juejin.cn/post/7033565443084255263


猜你喜欢
- 通过优化CSS代码,减小对系统资源的占用。自己整理出几个能减少系统资源占用的CSS写法,要优化网站的页面加载速度,这些注意点不能忽视!一、尽
- 安装 Golang在 http://golang.org/dl/ 可以下载到 Golang。安装文档:http://golang.org/d
- 线程间通信方法 1. 通信方法线程间使用全局变量进行通信 2. 共享
- 问题描述我是debain 系的linux系统没遇到这个问题,在centos系统遇到的Collecting dlib Downlo
- Master Master数据库保存有放在SQLSERVER实体上的所有数据库,它还是将引擎固定起来的粘合剂。由于如果不使用主数据库,SQL
- MySQL好像从5.0.2版本就开始支持触发器的功能了,本次博客就来介绍一下触发器,首先还是谈下概念性的东西吧:什么是触发器触发器是与表有关
- 学习python字符串方法的使用,对书中列举的每种方法都做一个试用,将结果记录,方便以后查询。(1) s.capitalize() ;功能:
- 接触php那么久,但是安装环境却很生疏,遇到了很多问题,借着百度,整理了些下面的方法问题一:mysql服务没有安装解决办法:在cmd操作下找
- 本文实例为大家分享了python sort、sort_index的具体代码,供大家参考,具体内容如下对Series进行排序#生成序列objo
- 导语哈喽!大家好,我是栗子,感谢大家的支持!新的一天,新气象,程序猿们的日常开始敲敲敲,改改改——今
- 在网上查了一下,在网上收集了Java与JavaScript中使用的两个例子,试验过,分享下。1、Java:package org.bearf
- 自定义指令directives及常用钩子函数说明除了核心功能默认内置的指令 (v-model 和 v-show),Vue 也允许注册自定义指
- 简单来说python的内存管理机制有三种1)引用计数2)垃圾回收3)内存池接下来我们来详细讲解这三种管理机制1,引用计数:引用计
- 功能描述目标完成多账号微信小程序每天自动签到输出签到成功则向微信群发送签到成功的信息否则提示用户签到失败,需手动签到包管理requestsi
- 迭代器什么是迭代迭代是重复反馈过程的活动,其目的通常是为了逼近所需目标或结果。每一次对过程的重复称为一次“迭代&
- 本文实例讲述了php打包压缩文件之ZipArchive方法用法。分享给大家供大家参考,具体如下:前面说到了php打包压缩文件之PclZip方
- 这学期在学python,感觉想写一个东西来巩固自己的基础,因为大二的时候我看过python,所以还是一共花了几个小时写了一个基于mysql的
- 实际开发过程中,我们经常会被各种宽度,高度计算搞晕。尤其是使用了rem的计算方式,自适应布局难倒一大片程序员。为了解决这类问题,我觉得可以利
- 有两种方式:一种是图片放在static中,一种是图片放在media中第一种:即:STATIC_URL = '/static/'
- 本文实例讲述了python实现根据窗口标题调用窗口的方法。分享给大家供大家参考。具体分析如下:当你知道一个windows窗口的标题后,可以用