kafka监控获取指定topic的消息总量示例
作者:扎克begod 发布时间:2023-09-04 01:44:48
标签:kafka,topic,消息总量
我就废话不多说了,直接 上代码吧!
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import java.util.*;
import java.util.Map.Entry;
public class KafkaOffsetTools {
public final static String KAFKA_TOPIC_NAME_ADAPTER = "sample";
public final static String KAFKA_TOPIC_NAME_EXCEPTION = "exception";
public final static String KAFKA_TOPIC_NAME_AUDIT = "audit";
private static final String rawTopicTotal = "rawTopicTotalRecordCounter";
private static final String avroTopicTotal = "avroTopicTotalRecordCounter";
private static final String exceptionTopicTotal = "exceptionTopicTotalRecordCounter";
public KafkaOffsetTools() {
}
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
partition);
Map, PartitionOffsetRequestInfo> requestInfo = new HashMap, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
System.err.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
private TreeMap, PartitionMetadata> findLeader(List a_seedBrokers, String a_topic) {
TreeMap, PartitionMetadata> map = new TreeMap, PartitionMetadata>();
loop:
for (String seed : a_seedBrokers) {
SimpleConsumer consumer = null;
try {
String[] hostAndPort;
hostAndPort = seed.split(":");
consumer = new SimpleConsumer(hostAndPort[0], Integer.valueOf(hostAndPort[1]), 100000, 64 * 1024,
"leaderLookup" + new Date().getTime());
List topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
map.put(part.partitionId(), part);
}
}
} catch (Exception e) {
System.out.println("Error communicating with Broker [" + seed
+ "] to find Leader for [" + a_topic + ", ] Reason: " + e);
} finally {
if (consumer != null)
consumer.close();
}
}
return map;
}
public static void main(String[] args) {
String kafkaBrokerList = System.getenv("metadata.broker.list");
if(kafkaBrokerList == null || kafkaBrokerList.length() == 0){
System.err.println("No config kafka metadata.broker.list,it is null .");
//for test
kafkaBrokerList = "localhost:9092,localhost:9093";
System.err.println("Use this broker list for test,metadata.broker.list="+kafkaBrokerList);
}
//init topic,logSize = 0
Map,Integer> topics = new HashMap,Integer>();
topics.put(KAFKA_TOPIC_NAME_ADAPTER,0);
topics.put(KAFKA_TOPIC_NAME_EXCEPTION,0);
topics.put(KAFKA_TOPIC_NAME_AUDIT,0);
//init kafka broker list
String[] kafkaHosts;
kafkaHosts = kafkaBrokerList.split(",");
if (kafkaHosts == null || kafkaHosts.length == 0) {
System.err.println("No config kafka metadata.broker.list,it is null .");
System.exit(1);
}
List seeds = new ArrayList();
for (int i = 0; i < kafkaHosts.length; i++) {
seeds.add(kafkaHosts[i]);
}
KafkaOffsetTools kot = new KafkaOffsetTools();
for(String topicName : topics.keySet()){
TreeMap, PartitionMetadata> metadatas = kot.findLeader(seeds, topicName);
int logSize = 0;
for (Entry, PartitionMetadata> entry : metadatas.entrySet()) {
int partition = entry.getKey();
String leadBroker = entry.getValue().leader().host();
String clientName = "Client_" + topicName + "_" + partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, entry.getValue().leader().port(), 100000,
64 * 1024, clientName);
long readOffset = getLastOffset(consumer, topicName, partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
logSize += readOffset;
if (consumer != null) consumer.close();
}
topics.put(topicName,logSize);
}
System.out.println(topics.toString());
System.out.println(rawTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_ADAPTER)+" "+System.currentTimeMillis());
System.out.println(avroTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_AUDIT)+" "+System.currentTimeMillis());
System.out.println(exceptionTopicTotal+"="+topics.get(KAFKA_TOPIC_NAME_EXCEPTION)+" "+System.currentTimeMillis());
}
}
来源:https://blog.csdn.net/aa5305123/article/details/88255435


猜你喜欢
- 在一些网站上,特别是小说网站经常我们会看到这个功能,就是自动滚动屏幕的功能,方便了大家阅读文章,增强了用户体验。下面的javascript代
- 本文实例展示了Python统计列表中的重复项出现的次数的方法,是一个很实用的功能,适合Python初学者学习借鉴。具体方法如下:对一个列表,
- 1.最大值max(3,4) ##运行结果为42.最小值min(3,4) ##运行结果为33.求和sum(range
- 需求当需要同时ping/telnet多个ip时,可以通过引入ping包/telnet包实现,也可以通过go调用cmd命令实现,不过后者调用效
- 首先是对一元函数求积分,使用Scipy下的integrate函数:from scipy import integratedef g(x):
- Python曾经对我说:"时日不多,赶紧用Python"。于是看到了一个基于python的微信开源库:itchat,玩了
- 作为互联网产品设计师,在和前端开发人员沟通时你是否常常会听到这样的声音: —— “大姐,给点专业精神好不好,这个表格是自适应的,你
- 如下图,我们在做图片logo列表的时候通常是用li标签来实现。html:<ul class="logolist&q
- 这两个方法都可以用来在固定的时间段后去执行一段javascirpt代码,不过两者各有各的应用场景。实际上,setTimeout和setInt
- 1.数据是什么?在 Python 以及其他所有面向对象编程语言中,类都是对数据的构成(状态)以及数据 能做什么(行为)的描述。由于类的使用者
- 背景对接多个外部接口,需要保存请求参数以及返回参数,方便消息的补偿,因为多个外部接口,多个接口字段都不统一,整体使用一个大字段(longte
- SQL Server数据库查询速度慢的原因有很多,常见的有以下几种:1、没有索引或者没有用到索引(这是查询慢最常见的问题,是程序设计的缺陷)
- 在 pandas 中提供了利用映射关系来实现某些操作的函数,具体如下:replace() 函数:替换元素;map() 函数:新建一列;ren
- 本文实例讲述了Python运算符重载用法。分享给大家供大家参考。具体分析如下:python中,我们在定义类的时候,可以通过实现一些函数来实现
- 阿里云服务器的带宽为2M,网站每日的备份包都3G多了,离线下载太费时间了,打算每日将备份包自动上传到自己的百度云盘里。 1、先安装
- Mcrypt扩展库可以实现加密解密功能,就是既能将明文加密,也可以密文还原。1.PHP加密扩展库Mcrypt安装在标准的PHP安装过程中并没
- 变量当把一个值赋给一个名字时,它就会存储在内存中,我们把这块内存称为变量(variable)。在大多数语言中,都把这种行为称为"给
- Python中的type()函数和isinstance()函数是两个常用的类型判断函数,它们可以用来判断变量的类型,接下来让我们一起来看一下
- 一、mariadb与mysql简介1、mariadb简介MariaDB由MySQL的创始人Michael Widenius(英语:Micha
- 一般情况下TextArea区输入的文字数量是没有限制的,但是我们可以通过javascript限制表单的文字字数。如下javascript代码