Java定时调用.ktr文件的示例代码(解决方案)
作者:ACGkaka_ 发布时间:2021-12-29 13:21:49
1.Maven依赖
<!-- Kettle -->
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-core</artifactId>
<version>7.1.0.0-12</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>kettle-engine</artifactId>
<version>7.1.0.0-12</version>
</dependency>
<dependency>
<groupId>pentaho-kettle</groupId>
<artifactId>metastore</artifactId>
<version>7.1.0.0-12</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<!-- connector -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
注意:kettle的jar包依赖会拉不下来,需要将jar包install到本地,命令:
创建 0_install.bat 文件
:: 本地 install kettle-core.jar包
CALL mvn install:install-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar
:: 本地 install kettle-engine.jar包
CALL mvn install:install-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar
:: 本地 install metastore.jar包
CALL mvn install:install-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar
pause
或者deploy到内网 * 上,命令:
创建 1_deploy.bat 文件
:: * deploy kettle-core.jar包
CALL mvn deploy:deploy-file -Dfile=kettle-core-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-core -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID
:: * deploy kettle-engine.jar包
CALL mvn deploy:deploy-file -Dfile=kettle-engine-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=kettle-engine -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID
:: * deploy metastore.jar包
CALL mvn deploy:deploy-file -Dfile=metastore-7.1.0.0-12.jar -DgroupId=pentaho-kettle -DartifactId=metastore -Dversion=7.1.0.0-12 -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID
pause
(脚本创建在jar包目录下,创建好之后双击运行即可)
【jar包、脚本文件下载地址】
https://share.weiyun.com/eaOSjqP7
2.执行.ktr/.kjb工具类
KettleReadUtils.java
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import java.io.InputStream;
/**
* <p> @Title KettleReadUtils
* <p> @Description Kettle工具包
*
* @author zhj
* @date 2021/4/8 10:50
*/
public class KettleReadUtils {
/**
* 调用 kettle ktr
*
* @param path 文件路径
*/
public static void runKtr(String path) {
try {
KettleEnvironment.init();
EnvUtil.environmentInit();
TransMeta transMeta = new TransMeta(path);
Trans trans = new Trans(transMeta);
trans.execute(null);
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
throw new Exception("Errors during transformation execution!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 以流的方式调用 kettle ktr
*
* @param in 文件流
*/
public static void runKtrByStream(InputStream in) {
try {
KettleEnvironment.init();
TransMeta transMeta = new TransMeta(in, null, true, null, null);
Trans trans = new Trans(transMeta);
trans.execute(null);
trans.waitUntilFinished();
if (trans.getErrors() > 0) {
throw new Exception("Errors during transformation execution!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 调用 kettle job
*
* @param paraNames 多个参数名
* @param paraValues 多个参数值
* @param jobPath 如: String fName= "D:\\kettle\\aaa.kjb";
*/
public static void runJob(String[] paraNames, String[] paraValues, String jobPath) {
try {
KettleEnvironment.init();
JobMeta jobMeta = new JobMeta(jobPath, null);
Job job = new Job(null, jobMeta);
// 向Job 脚本传递参数,脚本中获取参数值:${参数名}
if (paraNames != null && paraValues != null) {
for (int i = 0; i < paraNames.length && i < paraValues.length; i++) {
job.setVariable(paraNames[i], paraValues[i]);
}
}
job.start();
job.waitUntilFinished();
if (job.getErrors() > 0) {
throw new Exception("Errors during job execution!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
3.创建.ktr/.kjb工具类
(此处只是提供java创建途径,可以直接使用Spoon.bat创建好的文件)
import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import java.io.File;
/**
* <p> @Title KettleReadUtils
* <p> @Description Kettle工具包
*
* @author zhj
* @date 2021/4/8 10:50
*/
public class KettleWriteUtils {
/**
* 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
*/
private static final String DATABASE_XML_1 =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>db1</name>" +
"<server>127.0.0.1</server>" +
"<type>MYSQL</type>" +
"<access>Native</access>" +
"<database>test</database>" +
"<port>3306</port>" +
"<username>root</username>" +
"<password>root</password>" +
"</connection>";
private static final String DATABASE_XML_2 =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<connection>" +
"<name>db2</name>" +
"<server>127.0.0.1</server>" +
"<type>MYSQL</type>" +
"<access>Native</access>" +
"<database>test</database>" +
"<port>3306</port>" +
"<username>root</username>" +
"<password>root</password>" +
"</connection>";
/**
* 创建ktr文件
*
* @param args args
*/
public static void main(String[] args) {
try {
KettleEnvironment.init();
KettleWriteUtils kettleWriteUtils = new KettleWriteUtils();
TransMeta transMeta = kettleWriteUtils.generateMyOwnTrans();
String transXml = transMeta.getXML();
String transName = "update_insert_Trans.ktr";
File file = new File(transName);
FileUtils.writeStringToFile(file, transXml, "UTF-8");
} catch (Exception e) {
e.printStackTrace();
return;
}
}
/**
* 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作
* @return 元数据
* @throws KettleXMLException 生成XML异常
*/
private TransMeta generateMyOwnTrans() throws KettleXMLException {
TransMeta transMeta = new TransMeta();
//设置转化的名称
transMeta.setName("insert_update");
//添加转换的数据库连接
DatabaseMeta databaseMeta1 = new DatabaseMeta(DATABASE_XML_1);
transMeta.addDatabase(databaseMeta1);
DatabaseMeta databaseMeta2 = new DatabaseMeta(DATABASE_XML_2);
transMeta.addDatabase(databaseMeta2);
//registry是给每个步骤生成一个标识Id用
PluginRegistry registry = PluginRegistry.getInstance();
//第一个表输入步骤(TableInputMeta)
TableInputMeta tableInput = new TableInputMeta();
String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
//给表输入添加一个DatabaseMeta连接数据库
DatabaseMeta db1 = transMeta.findDatabase("db1");
tableInput.setDatabaseMeta(db1);
String sql = "SELECT USER_ID,USER_NAME FROM t_manager_user";
tableInput.setSQL(sql);
//添加TableInputMeta到转换中
StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId, "table input", tableInput);
//给步骤添加在spoon工具中的显示位置
tableInputMetaStep.setDraw(true);
tableInputMetaStep.setLocation(100, 100);
transMeta.addStep(tableInputMetaStep);
//第二个步骤插入与更新
InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
//添加数据库连接
DatabaseMeta db2 = transMeta.findDatabase("db2");
insertUpdateMeta.setDatabaseMeta(db2);
//设置操作的表
insertUpdateMeta.setTableName("t_stat_user_info");
//设置用来查询的关键字
insertUpdateMeta.setKeyLookup(new String[]{"USER_ID"});
insertUpdateMeta.setKeyStream(new String[]{"USER_ID"});
insertUpdateMeta.setKeyStream2(new String[]{""});
insertUpdateMeta.setKeyCondition(new String[]{"="});
//设置要更新的字段
String[] updatelookup = {"USER_ID","USER_NAME"} ;
String[] updateStream = {"USER_ID","USER_NAME"} ;
Boolean[] updateOrNot = {false,true};
insertUpdateMeta.setUpdateLookup(updatelookup);
insertUpdateMeta.setUpdateStream(updateStream);
insertUpdateMeta.setUpdate(updateOrNot);
//添加步骤到转换中
StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta);
insertUpdateStep.setDraw(true);
insertUpdateStep.setLocation(250, 100);
transMeta.addStep(insertUpdateStep);
//添加hop把两个步骤关联起来
transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
return transMeta;
}
}
4.测试执行.ktr文件
执行用例:
public static void main(String[] args) {
InputStream inputStream = KettleReadUtils.class.getResourceAsStream("/etl/test.ktr");
runKtrByStream(inputStream);
}
.ktr文件位置:
执行结果:
5.Kettle所使用的mysql-connector 5.1.49 和 8 版本不兼容问题
mysql-connector-java 5.1.49 版本中,支持连接驱动,
org.gjt.mm.mysql.Driver
;mysql-connector-java 8.* 版本中,连接驱动,
com.mysql.cj.jdbc.Driver
;如果直接使用 8.* 版本 去连接 MySQL 数据库的话会出现"错误连接数据库"问题:
Driver class ‘org.gjt.mm.mysql.Driver' could not be found, make sure the ‘MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver
解决方案:
1.关闭Kettle;
2.将/data-integration/lib/ 下面的 mysql-connector-java-5.1.49.jar
替换为 mysql-connector-java-8.*.jar
;
3.打开Kettle,修改连接类型为 Generic database ,配置驱动名称为 com.mysql.cj.jdbc.Driver;
4.重新导出为.ktr/.kjb文件;
5.再用java调用即可解决问题。
整理完毕,完结撒花~
来源:https://blog.csdn.net/qq_33204709/article/details/115833635


猜你喜欢
- 在框架开发过程中,通用代码生成是一项必不可少的功能,c#在这后端模板引擎这方面第三方组件较少,我这里选择的是NVelocity,现在升级到了
- 最近要搞一个项目,需要上传相册和拍照的图片,不负所望,终于完成了! 不过需要说明一下,其实网上很多教程拍照的图片,都是缩略图不是
- 在本文中,我们将介绍二进制搜索相对于简单线性搜索的优势,并介绍它在 Java 中的实现。1. 需要有效的搜索假设我们在wine-sellin
- jwt简介冒泡排序:(Bubble Sort)是一种简单的交换排序。之所以叫做冒泡排序,因为我们可以把每个元素当成一个小气泡,根据气泡大小,
- using System;using System.Collections.Generic;using System.Linq;using
- 近日,Eclipse经常挂掉,都是由于JVM崩溃的原因。每次都有以下错误日志:## A fatal error has been detec
- 前言不得不说SpringBoot的开发者是在为大众程序猿谋福利,把大家都惯成了懒汉,xml不配置了,连tomcat也懒的配置了,典型的一键启
- 0x00 前言在一些比较极端情况下,C3P0链的使用还是挺频繁的。0x01 利用方式利用方式在C3P0中有三种利用方式http baseJN
- 1、直接使用getWindow().getDecorView().getRootView()直接使用getWindow().getDecor
- 1.企业实际项目中Git的使用在实际的企业项目开发中,我们一般Java的项目在公司都有自己的局域网代码仓库,仓库上存放着很多的项目。以我工作
- 在重写类UsernamePasswordAuthenticationFilter时抛出了这个异常,字面上理解是authenticationM
- 要想充分发挥ADO.NET的优势,不仅需要全面、深入理解ADO.NET编程模型,及时总结经验、技巧也十分重要。ADO已经有多年的实践经验,A
- 目录1.项目gitthub地址链接: https://github.com/baisul/generateCode.git切换到master
- 本文实例讲述了Android编程实现任务管理器的方法。分享给大家供大家参考,具体如下:任务管理器可以实现的功能有:1.查看当前系统下运行的所
- 定义: SharedPreferences
- Java 异常的栈轨迹(Stack Trace)详解 捕获到异常时,往往需要进行一些处理。比较简单直接的
- 一、定义实体类Person,封装生成的数据package net.dc.test;public class Person { private
- 一、获取android工程里面的各种资源的id; 1.1 string型 比如下面: << string name=”OK”&g
- 话不多说,先上图 &n
- 1、IO流1.流和流的分类什么是IO流?I:Input (输入)O: Ouput(输出)IO流的分类?有多种分类方式:一种方式是按照流的方向