Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析
作者:CNBLOG 发布时间:2023-11-05 17:25:41
标签:Java,pulsar,catalog,元数据
简介
通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink
Maven
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
<version>2.7.3</version>
</dependency>
<!-- JAR repositories -->
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
CODE
使用PulsarMetadataReader获取元数据
package com.levi.demo;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Test.
*
* @author levi
* @version 1.0
**/
public class Test {
public static void main(String[] args) {
final ClientConfigurationData configurationData = new ClientConfigurationData();
configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
//Your Pulsar Token
final AuthenticationToken token =
new AuthenticationToken(
"eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
configurationData.setAuthentication(token);
try (final PulsarMetadataReader reader =
new PulsarMetadataReader("http://127.0.0.1:8443",
configurationData,
"",
new HashMap(),
-1,
-1)) {
//获取namespaces
final List<String> namespaces = reader.listNamespaces();
System.out.println("namespaces: " + namespaces.toString());
for (final String namespace : namespaces) {
//获取Topics
final List<String> topics = reader.getTopics(namespace);
System.out.println("topic: " + topics.toString());
for (String topic : topics) {
//获取字段SchemaInfo
final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
final String name = schemaInfo.getName();
System.out.println("SchemaName:" + name); //topicName
final SchemaType type = schemaInfo.getType();
System.out.println("SchemaType:" + type.toString());// "JSON"...
final Map<String, String> properties = schemaInfo.getProperties();
System.out.println(properties);
final String schemaDefinition = schemaInfo.getSchemaDefinition();
System.out.println(schemaDefinition); // Field info.
}
}
} catch (IOException | PulsarAdminException e) {
e.printStackTrace();
}
}
}
来源:https://www.cnblogs.com/levi125/p/14500436.html
0
投稿
猜你喜欢
- (注意:本文基于JDK1.8) 前言包括迭代器中的remove()方法,以及删除单个元素、删除多个元素、删除所有元素、删除不包含的
- 本文实例总结了java判断字符串是否为数字的方法。分享给大家供大家参考,具体如下:方法一:用JAVA自带的函数public static b
- 下载和上传附件、发送短信和发送邮件,都算是程序中很常用的功能,之前记录了文件的上传和下载还有发送短信,由于最近比较忙,邮件发送的功能就没有时
- 需求描述:企业开发过程中,经常需要将一些静态文本数据放到Resources目录下,项目启动时或者程序运行
- 代码如下import java.util.concurrent.Callable;import java.util.concurrent.E
- 注解的介绍@ControllerAdvice@ControllerAdvice注解是Spring3.2中新增的注解,学名是Controlle
- 复合语句Java的复合语句是以整个区块为单位的语句,由{}以及{}内包含的内容组成对于复合语句来说,复合语句创建了一个局部变量的作用域,该作
- 一、堆排序1、什么是堆排序(1)堆排序:堆排序(Heapsort)是指利用堆这种数据结构所设计的一种排序算法。堆积是一个近似完全二叉树的结构
- Gradle修改默认的Build配置文件名Gradle默认使用build.gradle作为默认的配置文件文件名。如果我们在build.gra
- 网上很多资料在描述Java内存模型的时候,都会介绍有一个主存,然后每个工作线程有自己的工作内存。数据在主存中会有一份,在工作内存中也有一份。
- 绝对路径:不可改变的路径本地绝对路径:增加盘符的路径(e:/test/test.html)网络绝对路径:增加协议,IP地址,端口号的路径(h
- (一) collection和collections这两者均位于java.util包下,不同的是:collection是一个集合接口,有Li
- 本文实例讲述了Java实现批量向mysql写入数据的方法。分享给大家供大家参考,具体如下:private static String use
- 关于UIToolbarToolBar工具栏是视图View的属性,可以在工具栏上添加工具栏按钮Bar Button Item(可以是自定义的C
- 1.打开官网稍微学习一下,了解一下spring cloud是个什么东西,大概有哪些组件等https://spring.io/projects
- 最近研究了一下如何在Android上实现CoverFlow效果的控件,其实早在2010年,就有Neil Davies开发并开源出了这个控件,
- 之前我们在使用vue进行 h5 表单录入的过程中,遇到了Android软键盘弹出,覆盖 h5页面 输入框 问题,在此进行回顾并分享给大家:系
- mybatis 传入null值解决前端传入两个值,如果其中一个为null时,很多时候我们都很困惑,明明传入的是null,为啥mybatis
- 用法在java中经常会遇到需要对数据进行类型转换的场景,String类型的数据转为Int类型属于比较常见的场景,主要有两种转换方法:1. 使
- Handler是什么?Handler 是一个可以实现多线程间切换的类,通过 Handler 可以轻松地将一个任务切换到 Handler 所在