Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析
作者:CNBLOG 发布时间:2023-11-05 17:25:41
标签:Java,pulsar,catalog,元数据
简介
通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink
Maven
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
<version>2.7.3</version>
</dependency>
<!-- JAR repositories -->
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
CODE
使用PulsarMetadataReader获取元数据
package com.levi.demo;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Test.
*
* @author levi
* @version 1.0
**/
public class Test {
public static void main(String[] args) {
final ClientConfigurationData configurationData = new ClientConfigurationData();
configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
//Your Pulsar Token
final AuthenticationToken token =
new AuthenticationToken(
"eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
configurationData.setAuthentication(token);
try (final PulsarMetadataReader reader =
new PulsarMetadataReader("http://127.0.0.1:8443",
configurationData,
"",
new HashMap(),
-1,
-1)) {
//获取namespaces
final List<String> namespaces = reader.listNamespaces();
System.out.println("namespaces: " + namespaces.toString());
for (final String namespace : namespaces) {
//获取Topics
final List<String> topics = reader.getTopics(namespace);
System.out.println("topic: " + topics.toString());
for (String topic : topics) {
//获取字段SchemaInfo
final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
final String name = schemaInfo.getName();
System.out.println("SchemaName:" + name); //topicName
final SchemaType type = schemaInfo.getType();
System.out.println("SchemaType:" + type.toString());// "JSON"...
final Map<String, String> properties = schemaInfo.getProperties();
System.out.println(properties);
final String schemaDefinition = schemaInfo.getSchemaDefinition();
System.out.println(schemaDefinition); // Field info.
}
}
} catch (IOException | PulsarAdminException e) {
e.printStackTrace();
}
}
}
来源:https://www.cnblogs.com/levi125/p/14500436.html


猜你喜欢
- Android 控制WIFI相关操作WIFI的全称是Wireless Fidelity,又称802.11b标准,是一种高速的无线通信协议,传
- 本文实例为大家分享了C语言实现扫雷游戏的具体代码,供大家参考,具体内容如下前言一、游戏规则介绍扫雷是一个十分经典的游戏,一张棋盘中有很多个不
- 一、创建Config配置中心项目1.添加依赖 <dependency> <groupId>org.sp
- 目录引言API介绍1、Optional(),empty(),of(),ofNullable()2、orElse(),orElseGet()和
- 以前我们说过在一些简单的例子中,比如为一个字段赋值或递增该字段,我们需要对线程进行同步,虽然lock可以满足我们的需要,但是一个竞争锁一定会
- 本篇主要总结下Spring容器在初始化实例前后,提供的一些回调方法和可扩展点。利用这些方法和扩展点,可以实现在Spring初始化实例前后做一
- 前篇回顾:Spring源码解析容器初始化构造方法在上一篇文章中,我们介绍完了AnnotationConfigApplicationConte
- 本文实例为大家分享了java实现文件归档和还原的具体代码,供大家参考,具体内容如下基本思路: 文件归档,换句话就是把多个文件的字节
- 做这个功能是因为开发项目的时候,由于后台接口的一些参数的值的长度有要求,不能超过多少个字符,所以在编辑框中输入的字符是要有限制的
- 前言尺子在客户端开发中有一定的应用场景,比如厘米尺、白板的画线尺、视频剪辑的时间尺。一般可以采用用户控件通过自绘的方式实现,但今天我要讲一个
- 前言C#方法中参数类型有4种参数类型,有时候很难记住它们的不同特征,下图对它们做一个总结大家可能在编码中或多或少的使用过out的ref,但是
- 1. 前言今天开始我们来一步步窥探它是如何工作的。我们又该如何驾驭它。本篇将通过 Spring Boot 2.x 来讲解 Spring Se
- 问题背景: 我要在一个表单里同时一次性提交多名乘客的个人信息到SpringMVC,前端HTML和SpringMVC Controller里该
- 一般很多项目不是在springcloud的环境中使用的,但是需要用到分布式配置中心来管理一些外部或者项目的配置,这个时候我们可以使用spri
- FloatingActionButton项目在github上的主页:https://github.com/futuresimple/andr
- 1、AOP基本总结连接点(JoinPoint):连接点是程序运行的某个阶段点,如方法调用、异常抛出等切入点(Pointcut):切入点是Jo
- 一直以来不是怎么清楚自旋锁,最近有点时间,好好的学习了一下;所谓的自旋锁在我的理解就是多个线程在尝试获取锁的时候,其中一个线程获取锁之后,其
- 我们在代码中经常使用using保障非托管资源的释放 static void Main(string[] args){
- 对大部分系统来说都需要权限管理来决定不同用户可以看到哪些内容,那么如何在Spring MVC中实现权限验证呢?当然我们可以继续使用servl
- 本文实例讲述了C#保存listbox中数据到文本文件的方法。分享给大家供大家参考。具体实现方法如下:private void SaveLst