Golang操作Kafka的实现示例
作者:YUHAOHAO 发布时间:2024-05-22 10:18:48
标签:Golang,操作,Kafka
一.使用库说明
Golang中连接kafka可以使用第三方库:github.com/Shopify/sarama
二.Kafka Producer发送消息
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 发送完数据需要leader和follower都确认
config.Producer.Partitioner = sarama.NewRandomPartitioner //写到随机分区中,我们默认设置32个分区
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 构造一个消息
msg := &sarama.ProducerMessage{}
msg.Topic = "task"
msg.Value = sarama.StringEncoder("producer kafka messages...")
// 连接kafka
client, err := sarama.NewSyncProducer([]string{"192.20.216.8:9092"}, config)
if err != nil {
fmt.Println("Producer closed, err:", err)
return
}
defer client.Close()
// 发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send msg failed, err:", err)
return
}
fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
三.Kafka Consumer消费消息
package main
import (
"fmt"
"github.com/Shopify/sarama"
"sync"
)
func main() {
var wg sync.WaitGroup
consumer, err := sarama.NewConsumer([]string{"192.20.216.8:9092"}, nil)
if err != nil {
fmt.Println("Failed to start consumer: %s", err)
return
}
partitionList, err := consumer.Partitions("task-status-data") // 通过topic获取到所有的分区
if err != nil {
fmt.Println("Failed to get the list of partition: ", err)
return
}
fmt.Println(partitionList)
for partition := range partitionList{ // 遍历所有的分区
pc, err := consumer.ConsumePartition("task", int32(partition), sarama.OffsetNewest) // 针对每个分区创建一个分区消费者
if err != nil {
fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)
}
wg.Add(1)
go func(sarama.PartitionConsumer) { // 为每个分区开一个go协程取值
for msg := range pc.Messages() { // 阻塞直到有值发送过来,然后再继续等待
fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
defer pc.AsyncClose()
wg.Done()
}(pc)
}
wg.Wait()
consumer.Close()
}
来源:https://www.cnblogs.com/yuhaohao/p/15439034.html


猜你喜欢
- 引言最近在学习python,先说一下我学Python得原因,一个是因为它足够好用,完成同样的功能,代码量会比其他语言少很多,有大量的丰富的库
- 说明本例子利用TensorFlow搭建一个全连接神经网络,实现对MNIST手写数字的识别。先上代码from tensorflow.examp
- asin()方法返回x的反正弦,以弧度表示。语法以下是asin()方法语法:asin(x)注意:此函数是无法直接访问的,所以我们
- Go素数筛选分析1. 素数筛选介绍学习Go语言的过程中,遇到素数筛选的问题。这是一个经典的并发编程问题,是某大佬的代码,短短几行代码就实现了
- 数据采集我们上一篇介绍了,如何采集王者最低战力,本文就来给大家介绍如何采集王者皮肤,买不起皮肤,当个桌面壁纸挺好的。下面,我和大家介绍如何获
- 本文实例为大家分享了python实现转圈打印矩阵的具体代码,供大家参考,具体内容如下#! conding:utf-8__author__ =
- 代码如下:<% FunctIon DownloadFIle(StrFIle) StrFIlename=StrFIle Response
- 前言最近使用PyTorch感觉妙不可言,有种当初使用Keras的快感,而且速度还不慢。各种设计直接简洁,方便研究,比tensorflow的臃
- Enum 是个类所以基本的类操作都可以用也就是我们可以添加自己的方法class Mood(Enum): FUNKY
- 学校有一、二、三。。。。至十班。假设每个班上有30名学生。张、李、刘、苏等现有这样的表 student ,字段 class 及name 。其
- 前言前面几个章节我们学习了对于普通文件的操作,比如说文件的创建、复制粘贴、裁剪粘贴、文件名的重命名、删除等等。另外还学习了一些基本练习,如何
- 然后给脚本文件运行权限,方法(1)chmod +x ./*.py方法(2)chmod 755 ./*.py (777也无所谓啦)这个命令不去
- python是支持多线程的,主要是通过thread和threading这两个模块来实现的。thread模块是比较底层的模块,threadin
- pLSA(probabilistic Latent Semantic Analysis),概率潜在语义分析模型,是1999年Hoffman提
- 一、“无”的哲学佛家讲究“因果报应”,有果必有应。此段看似与主题没有血缘关系,实际讲的是“因”。我个人比较喜欢老子的道家思想,并喜欢以其思想
- 本篇文章博主将带大家一起学习MySQL中常用的数据查询语言。DQL(Data Query Language 数据查询语言)SELECT 语法
- 昨晚在往MySQL数据库中插入一组数据时,出错了!数据库无情了给我报了个错误:ERROR 1365(22012):Division by 0
- 有时候想同时(同级)展示多个视图,而不是嵌套展示,例如创建一个布局,有 sidebar(侧导航) 和 main(主内容) 两个视图,这个时候
- 5. 其他杂项 5.1 生成图像PHP可以操作处理图像。如果你已经安装了GD库,你甚至可以利用PHP生成图像。
- 1.找到python的安装路径: 如果忘记可以在Pycharm运行如下代码:import