SpringBoot整合dataworks的实现过程
作者:IT_DLin 发布时间:2023-11-29 12:13:09
注意事项
阿里云的dataworks提供了OpenApi, 需要是企业版或旗舰版才能够调用,也就是付费项目。
这里测试主要是调用拉取dataworks上拉取的脚本,并存储到本地。
脚本包含两部分
1、开发的odps脚本(通过OpenApi获取)2、建表语句脚本(通过dataworks信息去连接maxCompute获取建立语句)
阿里云Dataworks的openApi分页查询限制,一次最多查询100条。我们拉取脚本需要分多页查询
该项目使用到了MaxCompute的SDK/JDBC方式连接,SpringBoot操作MaxCompute SDK/JDBC连接
整合实现
实现主要是编写工具类,如果需要则可以配置成SpringBean,注入容器即可使用
依赖引入
<properties>
<java.version>1.8</java.version>
<!--maxCompute-sdk-版本号-->
<max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
<!--maxCompute-jdbc-版本号-->
<max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
<!--dataworks版本号-->
<dataworks-sdk.version>3.4.2</dataworks-sdk.version>
<aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--max compute sdk-->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>${max-compute-sdk.version}</version>
</dependency>
<!--max compute jdbc-->
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-jdbc</artifactId>
<version>${max-compute-jdbc.version}</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
<!--dataworks需要引入aliyun-sdk和dataworks本身-->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-core</artifactId>
<version>${aliyun-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-dataworks-public</artifactId>
<version>${dataworks-sdk.version}</version>
</dependency>
</dependencies>
请求参数类编写
/**
* @Description
* @Author itdl
* @Date 2022/08/09 15:12
*/
@Data
public class DataWorksOpenApiConnParam {
/**
* 区域 eg. cn-shanghai
*/
private String region;
/**
* 访问keyId
*/
private String aliyunAccessId;
/**
* 密钥
*/
private String aliyunAccessKey;
/**
* 访问端点 就是API的URL前缀
*/
private String endPoint;
/**
* 数据库类型 如odps
*/
private String datasourceType;
/**
* 所属项目
*/
private String project;
/**
* 项目环境 dev prod
*/
private String projectEnv;
}
工具类编写
基础类准备,拉取脚本之后的回调函数
为什么需要回调函数,因为拉取的是所有脚本,如果合并每次分页结果的话,会导致内存溢出,而使用回调函数只是每次循环增加处理函数
/**
* @Description
* @Author itdl
* @Date 2022/08/09 15:12
*/
@Data
public class DataWorksOpenApiConnParam {
/**
* 区域 eg. cn-shanghai
*/
private String region;
/**
* 访问keyId
*/
private String aliyunAccessId;
/**
* 密钥
*/
private String aliyunAccessKey;
/**
* 访问端点 就是API的URL前缀
*/
private String endPoint;
/**
* 数据库类型 如odps
*/
private String datasourceType;
/**
* 所属项目
*/
private String project;
/**
* 项目环境 dev prod
*/
private String projectEnv;
}
初始化操作
主要是实例化dataworks openApi接口的客户端信息,maxCompute连接的工具类初始化(包括JDBC,SDK方式)
private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
/**默认的odps接口地址 在Odps中也可以看到该变量*/
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
/**
* dataworks连接参数
*
*/
private final DataWorksOpenApiConnParam connParam;
/**
* 可以使用dataworks去连接maxCompute 如果连接的引擎是maxCompute的话
*/
private final MaxComputeJdbcUtil maxComputeJdbcUtil;
private final MaxComputeSdkUtil maxComputeSdkUtil;
private final boolean odpsSdk;
/**
* 客户端
*/
private final IAcsClient client;
public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
this.connParam = connParam;
this.client = buildClient();
this.odpsSdk = odpsSdk;
if (odpsSdk){
this.maxComputeJdbcUtil = null;
this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
}else {
this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
this.maxComputeSdkUtil = null;
}
}
private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();
// 设置账号密码
param.setAliyunAccessId(connParam.getAliyunAccessId());
param.setAliyunAccessKey(connParam.getAliyunAccessKey());
// 设置endpoint
param.setMaxComputeEndpoint(defaultEndpoint);
// 目前只处理odps的引擎
final String datasourceType = connParam.getDatasourceType();
if (!"odps".equals(datasourceType)){
throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
}
// 获取项目环境,根据项目环境连接不同的maxCompute
final String projectEnv = connParam.getProjectEnv();
if ("dev".equals(projectEnv)){
// 开发环境dataworks + _dev就是maxCompute的项目名
param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
}else {
// 生产环境dataworks的项目名和maxCompute一致
param.setProjectName(connParam.getProject());
}
return new MaxComputeSdkUtil(param);
}
private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();
// 设置账号密码
param.setAliyunAccessId(connParam.getAliyunAccessId());
param.setAliyunAccessKey(connParam.getAliyunAccessKey());
// 设置endpoint
param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));
// 目前只处理odps的引擎
final String datasourceType = connParam.getDatasourceType();
if (!"odps".equals(datasourceType)){
throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
}
// 获取项目环境,根据项目环境连接不同的maxCompute
final String projectEnv = connParam.getProjectEnv();
if ("dev".equals(projectEnv)){
// 开发环境dataworks + _dev就是maxCompute的项目名
param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
}else {
// 生产环境dataworks的项目名和maxCompute一致
param.setProjectName(connParam.getProject());
}
return new MaxComputeJdbcUtil(param);
}
调用OpenApi拉取所有脚本
/**
* 根据文件夹路径分页查询该路径下的文件(脚本)
* @param pageSize 每页查询多少数据
* @param folderPath 文件所在目录
* @param userType 文件所属功能模块 可不传
* @param fileTypes 设置文件代码类型 逗号分割 可不传
*/
public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
pageSize = setPageSize(pageSize);
// 创建请求
final ListFilesRequest request = new ListFilesRequest();
// 设置分页参数
request.setPageNumber(1);
request.setPageSize(pageSize);
// 设置上级文件夹
request.setFileFolderPath(folderPath);
// 设置区域和项目名称
request.setSysRegionId(connParam.getRegion());
request.setProjectIdentifier(connParam.getProject());
// 设置文件所属功能模块
if (!ObjectUtils.isEmpty(userType)){
request.setUseType(userType);
}
// 设置文件代码类型
if (!ObjectUtils.isEmpty(fileTypes)){
request.setFileTypes(fileTypes);
}
// 发起请求
ListFilesResponse res = client.getAcsResponse(request);
// 获取分页总数
final Integer totalCount = res.getData().getTotalCount();
// 返回结果
final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
// 计算能分几页
long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
// 只有1页 直接返回
if (pages <= 1){
callBack.handle(resultList);
return;
}
// 第一页执行回调
callBack.handle(resultList);
// 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取
for (int i = 2; i <= pages; i++) {
//第1页
request.setPageNumber(i);
//每页大小
request.setPageSize(pageSize);
// 发起请求
res = client.getAcsResponse(request);
final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
if (!ObjectUtils.isEmpty(tableEntityList)){
// 执行回调函数
callBack.handle(tableEntityList);
}
}
}
内部连接MaxCompute拉取所有DDL脚本内容
DataWorks工具类代码,通过回调函数处理
/**
* 获取所有的DDL脚本
* @param callBack 回调处理函数
*/
public void listAllDdl(CallBack.DdlCallBack callBack){
if (odpsSdk){
final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
for (TableMetaInfo tableInfo : tableInfos) {
final String tableName = tableInfo.getTableName();
final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
callBack.handle(tableName, sqlCreateDesc);
}
}
}
MaxCompute工具类代码,根据表名获取建表语句, 以SDK为例, JDBC直接执行show create table即可拿到建表语句
/**
* 根据表名获取建表语句
* @param tableName 表名
* @return
*/
public String getSqlCreateDesc(String tableName) {
final Table table = odps.tables().get(tableName);
// 建表语句
StringBuilder mssqlDDL = new StringBuilder();
// 获取表结构
TableSchema tableSchema = table.getSchema();
// 获取表名表注释
String tableComment = table.getComment();
//获取列名列注释
List<Column> columns = tableSchema.getColumns();
/*组装成mssql的DDL*/
// 表名
mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
mssqlDDL.append(tableName).append("\n");
mssqlDDL.append(" (\n");
//列字段
int index = 1;
for (Column column : columns) {
mssqlDDL.append(" ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
if (!ObjectUtils.isEmpty(column.getComment())) {
mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'");
}
if (index == columns.size()) {
mssqlDDL.append("\n");
} else {
mssqlDDL.append(",\n");
}
index++;
}
mssqlDDL.append(" )\n");
//获取分区
List<Column> partitionColumns = tableSchema.getPartitionColumns();
int partitionIndex = 1;
if (!ObjectUtils.isEmpty(partitionColumns)) {
mssqlDDL.append("PARTITIONED BY (");
}
for (Column partitionColumn : partitionColumns) {
final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
mssqlDDL.append(format);
if (partitionIndex == partitionColumns.size()) {
mssqlDDL.append("\n");
} else {
mssqlDDL.append(",\n");
}
partitionIndex++;
}
if (!ObjectUtils.isEmpty(partitionColumns)) {
mssqlDDL.append(")\n");
}
// mssqlDDL.append("STORED AS ALIORC \n");
// mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');");
mssqlDDL.append(";");
return mssqlDDL.toString();
}
测试代码
public static void main(String[] args) throws ClientException {
final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
connParam.setAliyunAccessId("您的阿里云账号accessId");
connParam.setAliyunAccessKey("您的阿里云账号accessKey");
// dataworks所在区域
connParam.setRegion("cn-chengdu");
// dataworks所属项目
connParam.setProject("dataworks所属项目");
// dataworks所属项目环境 如果不分环境的话设置为生产即可
connParam.setProjectEnv("dev");
// 数据引擎类型 odps
connParam.setDatasourceType("odps");
// ddataworks接口地址
connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);
// 拉取所有ODPS脚本
dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
// 处理文件
for (ListFilesResponse.Data.File file : files) {
final String fileName = file.getFileName();
System.out.println(fileName);
}
});
// 拉取所有表的建表语句
dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
System.out.println("=======================================");
System.out.println("表名:" + tableName + "内容如下:\n");
System.out.println(tableDdlContent);
System.out.println("=======================================");
});
}
测试结果
test_001脚本
test_002脚本
test_003脚本
test_004脚本
test_005脚本
=======================================
表名:test_abc_info内容如下:CREATE TABLE IF NOT EXISTS test_abc_info
(
test_abc1 STRING COMMENT '字段1',
test_abc2 STRING COMMENT '字段2',
test_abc3 STRING COMMENT '字段3',
test_abc4 STRING COMMENT '字段4',
test_abc5 STRING COMMENT '字段5',
test_abc6 STRING COMMENT '字段6',
test_abc7 STRING COMMENT '字段7',
test_abc8 STRING COMMENT '字段8'
)
PARTITIONED BY (p_date STRING COMMENT '数据日期'
)
;
=======================================
Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'
项目地址
https://github.com/HedongLin123/dataworks_odps_demo
来源:https://blog.csdn.net/qq_35267557/article/details/126255857


猜你喜欢
- 前言由于业务需要,后端需要返回一个树型结构给前端,包含父子节点的数据已经在数据库中存储好,现在需要做的是如何以树型结构的形式返给给前端。数据
- Java%(取模运算)Java的取模运算1.实现算法public static double ramainder(double divide
- 本文实例为大家分享了java启动线程的方法,供大家参考,具体内容如下1.继承Threadpublic class java_thread e
- 今天深度学习一下《Java并发编程的艺术》的第1章并发编程的挑战,深入理解Java多线程,看看多线程中的坑。注意,哈肯的程序员读书笔记并不是
- 现在很多第三方Launcher((如360Launcher,GoLauncher)带有iphone主题,相信玩Android的人大都知道。本
- 本文实例讲述了Android编程实现AIDL(跨进程通信)的方法。分享给大家供大家参考,具体如下:一. 概述:跨进程通信(AIDL),主要实
- 本文实例分析了C# SQlite操作方法。分享给大家供大家参考,具体如下:最近项目需求用C#保存一些数据,如此先总结一下。需要下载Sqlit
- ${} 和 #{} 都是 MyBatis 中用来替换参数的,它们都可以将用户传递过来的参数,替换到 MyBatis 最终生成的 SQL 中,
- JAVA中Integer类下的常用方法有哪些?1.进制转换 n进制转10进制 字符串结果Integer.parseInt(String s,
- IP地址与整数之间的转换1、IP地址转换为整数原理:IP地址每段可以看成是8位无符号整数即0-255,把每段拆分成一个二进制形式组合起来,然
- * 可以说是Android开发中最常用的东西之一。我们通过 * 可以监听对象的各种变化事件,并进行一些需要的处理,相当有用,而且使用起来也
- 我正在开发一个软键盘,做得很好,但是我不知道如何自定义一个长按键的弹出窗口.我的键盘视图:<?xml version="1.
- 1、SpringSecurity 本质是一个过滤器链SpringSecurity 采用的是责任链的设计模式,它有一条很长的过滤器链。现在对这
- 前言代码生成器,也叫逆向工程,是根据数据库里的表结构,自动生成对应的实体类、映射文件和接口。看到很多小伙伴在为数据库生成实体类发愁,现分享给
- 本文实例为大家分享了java实现字符串反转的具体代码,供大家参考,具体内容如下1.需求:定义一个方法,实现字符串反转。键盘录入一个字符串,调
- 快速排序是应用最广泛的排序算法,流行的原因是它实现简单,适用于各种不同情况的输入数据且在一般情况下比其他排序都快得多。快速排序是原地排序(只
- @FeignClient()注解的使用由于SpringCloud采用分布式微服务架构,难免在各个子模块下存在模块方法互相调用的情况。比如se
- 目录基本查询延迟查询属性类型筛选复合from子句多级排序分组联合查询-join合并-zip()分区(分页)并行linq取消长时间运行的并行l
- 这里主要介绍的是优先队列的二叉堆Java实现,代码如下:package practice;import edu.princeton.cs.a
- 项目最终的文件结构1 添加maven依赖 <dependency> <groupI