(starters)springboot-starter整合阿里云datahub方式
作者:Cry丶 发布时间:2023-03-12 06:28:16
DataHub 类似于传统大数据解决方案中 Kafka 的角色,提供了一个数据队列功能。
DataHub 除了供了一个缓冲的队列作用。同时由于 DataHub 提供了各种与其他阿里云
上下游产品的对接功能,所以 DataHub 又扮演了一个数据的分发枢纽工作。
datahub提供了开发者生产和消费的sdk,在平时的开发中往往会写很多重复的代码,我们可以利用springboot为我们提供的自定义starter的方式,模仿springboot官方的starter组件实现方式,来封装一个更高效简单易用的starter组件,实现开箱即用。
本文仅提供核心思路实现供学习使用,应根据自己所在公司开发习惯做定制开发
1. 功能介绍
1.无需关心DataHub底层如何操作,安心编写业务代码即可进行数据的获取和上传,
2.类似RabbitMQ的starter,通过注解方式,Listener和Handler方式进行队列消费
3.支持游标的上次记忆功能
<dependency>
<artifactId>cry-starters-projects</artifactId>
<groupId>cn.com.cry.starters</groupId>
<version>2022-1.0.0</version>
</dependency>
2.快速开始
2.1 启动客户端
配置阿里云DataHub的endpoint以及AK信息
aliyun:
datahub:
# 开启功能
havingValue: true
#是否为私有云
isPrivate: false
accessId: xxx
accessKey: xxx
endpoint: xxx
#连接DataHub客户端超时时间
conn-timeout: 10000
启动SpringBoot,你会发现datahub客户端已经启动完毕
2.2 获取DataHub客户端
DatahubClient datahubClient=DataHubTemplate.getDataHubClient();
2.3 写数据
public int write(@RequestParam("id") Integer shardId) {
List<Student> datas = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Student s = new Student();
s.setAge(i);
s.setName("name-" + i);
s.setAddress("address-" + i);
datas.add(s);
}
int successNumbers = DataHubTemplate.write("my_test", "student", datas, shardId);
return successNumbers;
}
以上示例代码表示往 projectName为my_test, topicName为student, shardId 为N的hub里写数据,并且返回插入成功的条数
2.4 读数据
读数据开发的逻辑类似RabbitMq的starter,使用@DataHubListener和@DataHubHandler处理器注解进行使用
@Component
@DataHubListener(projectName = "my_test")
public class ReadServiceImpl { @DataHubHandler(topicName = "student", shardId = 0, cursorType = CursorTypeWrapper.LATEST)
public void handler(Message message) {
System.out.println("读取到shardId=0的消息");
System.out.println(message.getData());
System.out.println(message.getCreateTsime());
System.out.println(message.getSize());
System.out.println(message.getConfig());
System.out.println(message.getMessageId());
}
}
以上代码说明: 通过LATEST游标的方式,监听 project=my_test ,topicName=student,shardId=0 ,最终通过Message的包装类拿到dataHub实时写入的数据。
这边可以设置多种游标类型,例如根据最新的系统时间、最早录入的序号等
3. 核心代码
首先需要一个DataHubClient增强类,在SpringBoot启动时开启一个线程来监听对应的project-topic-shardingId,根据游标规则来读取当前的cursor进行数据的读取。
public class DataHubClientWrapper implements InitializingBean, DisposableBean { @Autowired
private AliyunAccountProperties properties; @Autowired
private ApplicationContext context; private DatahubClient datahubClient;
public DataHubClientWrapper() { } /**
* 执行销毁方法
*
* @throws Exception
*/
@Override
public void destroy() throws Exception {
WorkerResourceExecutor.shutdown();
} @Override
public void afterPropertiesSet() throws Exception { /**
* 创建DataHubClient
*/
this.datahubClient = DataHubClientFactory.create(properties); /**
* 打印Banner
*/
BannerUtil.printBanner(); /**
* 赋值Template的静态对象dataHubClient
*/
DataHubTemplate.setDataHubClient(datahubClient); /**
* 初始化Worker线程
*/
WorkerResourceExecutor.initWorkerResource(context);
/**
* 启动Worker线程
*/
WorkerResourceExecutor.start();
}
}
写数据,构建了一个类似RedisDataTemplate的模板类,封装了write的逻辑,调用时只需要用DataHubTemplate.write调用
public class DataHubTemplate { private static DatahubClient dataHubClient; private final static Logger logger = LoggerFactory.getLogger(DataHubTemplate.class); /**
* 默认不开启重试机制
*
* @param projectName
* @param topicName
* @param datas
* @param shardId
* @return
*/
public static int write(String projectName, String topicName, List<?> datas, Integer shardId) {
return write(projectName, topicName, datas, shardId, false);
} /**
* 往指定的projectName以及topic和shard下面写数据
*
* @param projectName
* @param topicName
* @param datas
* @param shardId
* @param retry
* @return
*/
private static int write(String projectName, String topicName, List<?> datas, Integer shardId, boolean retry) {
RecordSchema recordSchema = dataHubClient.getTopic(projectName, topicName).getRecordSchema();
List<RecordEntry> recordEntries = new ArrayList<>();
for (Object o : datas) {
RecordEntry entry = new RecordEntry();
Map<String, Object> data = BeanUtil.beanToMap(o);
TupleRecordData tupleRecordData = new TupleRecordData(recordSchema);
for (String key : data.keySet()) {
tupleRecordData.setField(key, data.get(key));
}
entry.setRecordData(tupleRecordData);
entry.setShardId(String.valueOf(shardId));
recordEntries.add(entry);
}
PutRecordsResult result = dataHubClient.putRecords(projectName, topicName, recordEntries);
int failedRecordCount = result.getFailedRecordCount();
if (failedRecordCount > 0 && retry) {
retry(dataHubClient, result.getFailedRecords(), 1, projectName, topicName);
}
return datas.size() - failedRecordCount;
} /**
* @param client
* @param records
* @param retryTimes
* @param project
* @param topic
*/
private static void retry(DatahubClient client, List<RecordEntry> records, int retryTimes, String project, String topic) {
boolean suc = false;
List<RecordEntry> failedRecords = records;
while (retryTimes != 0) {
logger.info("the time to send message has [{}] records failed, is starting retry", records.size());
retryTimes = retryTimes - 1;
PutRecordsResult result = client.putRecords(project, topic, failedRecords);
int failedNum = result.getFailedRecordCount();
if (failedNum > 0) {
failedRecords = result.getFailedRecords();
continue;
}
suc = true;
break;
}
if (!suc) {
logger.error("DataHub send message retry failure");
}
} public static DatahubClient getDataHubClient() {
return dataHubClient;
} public static void setDataHubClient(DatahubClient dataHubClient) {
DataHubTemplate.dataHubClient = dataHubClient;
}
}
读数据,需要在Spring启动时开启一个监听线程DataListenerWorkerThread,执行一个死循环不停轮询DataHub下的对应通道。
public class DataListenerWorkerThread extends Thread {
private final static Logger logger = LoggerFactory.getLogger(DataListenerWorkerThread.class);
private volatile boolean init = false;
private DatahubConfig config;
private String workerKey;
private int recordLimits;
private int sleep;
private RecordSchema recordSchema;
private RecordHandler recordHandler;
private CursorHandler cursorHandler; public DataListenerWorkerThread(String projectName, String topicName, int shardId, CursorTypeWrapper cursorType, int recordLimits, int sleep, int sequenceOffset, String startTime, StringRedisTemplate redisTemplate) {
this.config = new DatahubConfig(projectName, topicName, shardId);
this.workerKey = projectName + "-" + topicName + "-" + shardId;
this.cursorHandler = new CursorHandler(cursorType, sequenceOffset, startTime, redisTemplate, workerKey);
this.recordLimits = recordLimits;
this.sleep = sleep;
this.setName("DataHub-Worker");
this.setDaemon(true);
} @Override
public void run() {
initRecordSchema();
String cursor = cursorHandler.positioningCursor(config);
for (; ; ) {
try {
GetRecordsResult result = DataHubTemplate.getDataHubClient().getRecords(config.getProjectName(), config.getTopicName(), String.valueOf(config.getShardId()), recordSchema, cursor, recordLimits);
if (result.getRecordCount() <= 0) {
// 无数据,sleep后读取
Thread.sleep(sleep);
continue;
}
List<Map<String, Object>> dataMap = recordHandler.convert2List(result.getRecords());
logger.info("receive [{}] records from project:[{}] topic:[{}] shard:[{}]", dataMap.size(), config.getProjectName(), config.getTopicName(), config.getShardId());
// 拿到下一个游标
cursor = cursorHandler.nextCursor(result);
//执行方法
WorkerResourceExecutor.invokeMethod(workerKey, JsonUtils.toJson(dataMap), dataMap.size(), config, cursor);
} catch (InvalidParameterException ex) {
//非法游标或游标已过期,建议重新定位后开始消费
cursor = cursorHandler.resetCursor(config);
logger.error("get Cursor error and reset cursor localtion ,errorMessage:{}", ex.getErrorMessage());
} catch (DatahubClientException e) {
logger.error("DataHubException:{}", e.getErrorMessage());
this.interrupt();
} catch (InterruptedException e) {
logger.info("daemon thread {}-{} interrupted", this.getName(), this.getId());
} catch (Exception e) {
this.interrupt();
logger.error("receive DataHub records cry.exception:{}", e, e);
}
}
} /**
* 终止
*/
public void shutdown() {
if (!interrupted()) {
interrupt();
}
} /**
* 初始化topic字段以及recordSchema
*/
private void initRecordSchema() {
try {
if (!init) {
recordSchema = DataHubTemplate.getDataHubClient().getTopic(config.getProjectName(), config.getTopicName()).getRecordSchema();
List<Field> fields = recordSchema.getFields();
this.recordHandler = new RecordHandler(fields);
init = true;
}
} catch (Exception e) {
logger.error("initRecordSchema error:{}", e, e);
}
}
}
read的时候结合了注解开发,通过定义类注解DataHubListener和方法注解DataHubHandler内置属性,来动态的控制需要在哪些方法中处理监听到的数据的逻辑:
DataHubHandler
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubHandler {
/**
* 话题名称
*
* @return
*/
String topicName(); /**
* shardId
*
* @return
*/
int shardId(); /**
* 最大数据量限制
*
* @return
*/
int recordLimit() default 1000; /**
* 游标类型
*
* @return
*/
CursorTypeWrapper cursorType() default CursorTypeWrapper.LATEST; /**
* 若未监听到数据添加,休眠时间 ms
*
* @return
*/
int sleep() default 10000; /**
* 使用CursorType.SYSTEM_TIME的时候配置 时间偏移量
*
* @return
*/
String startTime() default ""; /**
* 使用使用CursorType.SEQUENCE的时候配置,偏移量,必须是正整数
*
* @return
*/
int sequenceOffset() default 0;
}
DataHubListener
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataHubListener {
String projectName();
}
最后我们需要启动SpringBootStarter的EnableConfigurationProperties 功能,通过配置文件来控制default-bean的开启或者关闭。
启动类:
@Configuration
@EnableConfigurationProperties(value = {AliyunAccountProperties.class})
public class DataHubClientAutoConfiguration {
/**
* 初始化dataHub装饰bean
*
* @return
*/
@Bean
public DataHubClientWrapper dataHubWrapper() {
return new DataHubClientWrapper();
}
}
属性配置类
@ConditionalOnProperty(prefix = "aliyun.datahub",havingValue = "true")
@Data
public class AliyunAccountProperties implements Properties{ /**
* http://xxx.aliyuncs.com
*/
private String endpoint; /**
* account
*/
private String accessId; /**
* password
*/
private String accessKey; /**
* private cloud || public cloud
*/
private boolean isPrivate; /**
* unit: ms
*/
private Integer connTimeout = 10000;
}
最后记得要做成一个starter,在resources下新建一个META-INF文件夹,新建一个spring.factories文件,
org.springframework.boot.autoconfigure.EnableAutoConfiguration= \
cry.starter.datahub.DataHubClientAutoConfiguration
大体逻辑就是这样了,你学会了吗? hhhhhhhhh~
来源:https://crycrycry.blog.csdn.net/article/details/123970600


猜你喜欢
- 什么是代理代理就是给目标对象一个代理对象,并由代理对象控制目标的引用。为什么要使用代理模式1、通过引入代理对象的方式,可以间接的访问目标对象
- 使用 DateFormat 格式化日期、时间DateFormat 也是一个抽象类,它也提供了如下几个类方法用于获取 DateFormat 对
- 本文实例讲述了Android判断网络类型的方法。分享给大家供大家参考,具体如下:判断网络类型是wifi,还是3G,还是2G网络,对不同的网络
- 本文实例为大家分享了JSON处理工具类的具体代码,供大家参考,具体内容如下import java.io.IOException; impor
- 其实这个比较简单,子线程怎么通知主线程,就是让子线程做完了自己的事儿就去干主线程的转回去干主线程的事儿。那么怎么让子线程去做主线程的事儿呢,
- 本文实例讲述了C#使用委托实现的快速排序算法。分享给大家供大家参考。具体如下:class QuickSort { private dele
- IDEA 新手使用手册1 简介IDEA的全称是IntelliJ IDEA,这是一个java编程语言开发的集成环境。IDEA的每一个方面都是为
- 思路分析:要逆序遍历某个列表,首先要获得一个ListIterator对象,利用for()循环,以ListIterator类的hasNext(
- 本文实例为大家分享了Android自定义弹框Dialog效果的具体代码,供大家参考,具体内容如下1.dialog_delete.xml<
- 定时/计划功能主要使用的就是Timer对象,它在内部还是使用多线程的方式进行处理,所以它和线程技术还是有非常大的关联。Timer类主要作用就
- 工厂方法模式(Factory Method):定义一个用于创建对象的接口,让子类决定实例化哪一个类。工厂方法使一个类的实例化延迟到其子类。工
- 苹果上的UI基本上都是这个效果,然而Android机上的顶部状态栏总是和app的主题颜色不搭。还好如今的api19以上的版本,我们也能做出这
- 首先备注一下JAR(Java Archive,Java 归档文件)是与平台无关的文件格式,它允许将许多文件组合成一个压缩文件。为 J2EE
- 为了让我提供的通用 Mapper 的 boot-starter 同时兼容 Spring Boot 1.x 和 2.x,增加了这么一个工具类。
- 编程是一门艺术,大批量的改动显然是非常丑陋的做法,用心的琢磨写的代码让它变的更美观。在现实生活中,存在很多“部分-整体&
- 本文实例为大家分享了Android实现图片加载进度提示的具体代码,供大家参考,具体内容如下先上图:实现原理:第一个控件的实现原理是重写Ima
- 一、文件上传的原理分析1、文件上传的必要前提a、表单的method必须是postb、表单的enctype属性必须是multipart/for
- spring @Autowired注解无法注入问题简述在使用spring框架的过程中,常会遇到这种两情况:1、在扫描的包以外使用需要使用ma
- NoHttp是专门做Android网络请求与下载的框架,NoHttp基本使用方法如下本文demo源码下载地址: http://xiazai.
- 快速回顾1.Lambda表达式: (参数) -> {主体}Lambda表达式打开了函数式编程爱好者继续使用Java的大门。Lambda