软件编程
位置:首页>> 软件编程>> java编程>> Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析

Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析

作者:CNBLOG  发布时间:2023-11-05 17:25:41 

标签:Java,pulsar,catalog,元数据

简介

通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink

Maven


<dependency>
  <groupId>io.streamnative.connectors</groupId>
  <artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
  <version>2.7.3</version>
</dependency>

<!-- JAR repositories -->
  <repositories>
       <repository>
           <id>central</id>
           <layout>default</layout>
           <url>https://repo1.maven.org/maven2</url>
       </repository>
       <repository>
           <id>bintray-streamnative-maven</id>
           <name>bintray</name>
           <url>https://dl.bintray.com/streamnative/maven</url>
       </repository>
   </repositories>

CODE

使用PulsarMetadataReader获取元数据


package com.levi.demo;

import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Test.
*
* @author levi
* @version 1.0
**/
public class Test {

public static void main(String[] args)  {
       final ClientConfigurationData configurationData = new ClientConfigurationData();
       configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
       //Your Pulsar Token
       final AuthenticationToken token =
               new AuthenticationToken(
                       "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
       configurationData.setAuthentication(token);

try (final PulsarMetadataReader reader =
                    new PulsarMetadataReader("http://127.0.0.1:8443",
                            configurationData,
                            "",
                            new HashMap(),
                            -1,
                            -1)) {
           //获取namespaces
           final List<String> namespaces = reader.listNamespaces();
           System.out.println("namespaces: " + namespaces.toString());

for (final String namespace : namespaces) {
               //获取Topics
               final List<String> topics = reader.getTopics(namespace);
               System.out.println("topic: " + topics.toString());

for (String topic : topics) {
                   //获取字段SchemaInfo
                   final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
                   final String name = schemaInfo.getName();
                   System.out.println("SchemaName:" + name); //topicName
                   final SchemaType type = schemaInfo.getType();
                   System.out.println("SchemaType:" + type.toString());// "JSON"...
                   final Map<String, String> properties = schemaInfo.getProperties();
                   System.out.println(properties);
                   final String schemaDefinition = schemaInfo.getSchemaDefinition();
                   System.out.println(schemaDefinition); // Field info.
               }
           }

} catch (IOException | PulsarAdminException e) {
           e.printStackTrace();
       }

}

}

来源:https://www.cnblogs.com/levi125/p/14500436.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com