Calling DataX from Java and returning task execution details
Author: 沉梦杨志  Published: 2023-11-28 21:26:45
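DataX
DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SQL Server, PostgreSQL, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS) and DRDS.
A detailed introduction to DataX
See DataX-Introduction.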
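Background
Our business needed DataX to load data from text files into a database. Until then we ran jobs from Python by invoking the datax.py script. To get better control over DataX tasks, 阿文 asked us to modify DataX so that it is called through Java integration and returns detailed information about each task's execution.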
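Tracing the DataX source
After downloading the source from GitHub, we started modifying it. DataX's entry point is the static method entry in the Engine class of the datax-core module: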
public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");
    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);
    String jobPath = cl.getOptionValue("job");
    // If the user does not specify a jobid explicitly, datax.py passes the default value -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");
    Configuration configuration = ConfigParser.parse(jobPath);
    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }
    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    if (!isStandAloneMode && jobId == -1) {
        // Outside standalone mode, jobId must not be -1
        // (message: "Non-standalone mode must provide a valid jobId in the URL.")
        throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
    }
    configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);
    // Print VM info
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        LOG.info(vmInfo.toString());
    }
    LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");
    LOG.debug(configuration.toJSON());
    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
}
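When -jobid is -1, entry falls back to parseJobIdFromUrl, whose body is not shown above. The sketch below is a standalone reimplementation written only for illustration (the class JobIdResolver and its resolve method are hypothetical, not DataX code): it tries the three URL patterns in order and returns the first captured number, or -1 if none matches. With an ordinary local file path nothing matches, so jobId stays -1, which is why a local run must also pass -mode standalone to get past the check above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper mirroring the jobId fallback in Engine.entry (not DataX source).
public class JobIdResolver {

    static final List<String> PATTERNS = Arrays.asList(
            "/instance/(\\d{1,})/config.xml",
            "/inner/job/(\\d{1,})/config",
            "/inner/job/(\\d{1,})/taskGroup/");

    // Returns the explicit jobid, else the first id captured from the job path, else -1.
    static long resolve(String jobIdString, String jobPath) {
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            return Long.parseLong(jobIdString);
        }
        for (String p : PATTERNS) {
            Matcher m = Pattern.compile(p).matcher(jobPath);
            if (m.find()) {
                return Long.parseLong(m.group(1));
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        System.out.println(resolve("-1", "D:\\datax\\job\\job2.json"));      // -1
        System.out.println(resolve("-1", "http://host/inner/job/42/config")); // 42
        System.out.println(resolve("7", "anything"));                        // 7
    }
}
```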
At the end, entry starts the job with engine.start(configuration). Following that call down, we find that it ultimately invokes the start() method of JobContainer:
@Override
public void start() {
    LOG.info("DataX jobContainer starts job.");
    boolean hasException = false;
    boolean isDryRun = false;
    try {
        this.startTimeStamp = System.currentTimeMillis();
        isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
        if (isDryRun) {
            LOG.info("jobContainer starts to do preCheck ...");
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();
            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();
            LOG.debug("jobContainer starts to do postHandle ...");
            this.postHandle();
            LOG.info("DataX jobId [{}] completed successfully.", this.jobId);
            this.invokeHooks();
        }
    } catch (Throwable e) {
        LOG.error("Exception when job run", e);
        hasException = true;
        if (e instanceof OutOfMemoryError) {
            this.destroy();
            System.gc();
        }
        if (super.getContainerCommunicator() == null) {
            // containerCollector is initialized in scheduler(), so if an exception is thrown
            // before scheduler() runs, it has to be initialized here instead
            AbstractContainerCommunicator tempContainerCollector;
            // standalone
            tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);
            super.setContainerCommunicator(tempContainerCollector);
        }
        Communication communication = super.getContainerCommunicator().collect();
        // State before reporting; it does not need to be set manually
        // communication.setState(State.FAILED);
        communication.setThrowable(e);
        communication.setTimestamp(this.endTimeStamp);
        Communication tempComm = new Communication();
        tempComm.setTimestamp(this.startTransferTimeStamp);
        Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
        super.getContainerCommunicator().report(reportCommunication);
        throw DataXException.asDataXException(
                FrameworkErrorCode.RUNTIME_ERROR, e);
    } finally {
        if (!isDryRun) {
            this.destroy();
            this.endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                // Finally, print the average CPU usage and GC statistics
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.info(vmInfo.totalString());
                }
                LOG.info(PerfTrace.getInstance().summarizeNoException());
                this.logStatistics();
            }
        }
    }
}
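The task information we need is computed in this.logStatistics(), which runs in the finally block only for a non-dry-run job that finished without an exception: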
private void logStatistics() {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return;
    }
    Communication communication = super.getContainerCommunicator().collect();
    communication.setTimestamp(this.endTimeStamp);
    Communication tempComm = new Communication();
    tempComm.setTimestamp(this.startTransferTimeStamp);
    Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
    // Byte rate
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;
    reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
    reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);
    super.getContainerCommunicator().report(reportCommunication);
    LOG.info(String.format(
            "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
                    + "%-26s: %19s\n",
            "任务启动时刻", // job start time
            dateFormat.format(startTimeStamp),
            "任务结束时刻", // job end time
            dateFormat.format(endTimeStamp),
            "任务总计耗时", // total elapsed time
            String.valueOf(totalCosts) + "s",
            "任务平均流量", // average throughput
            StrUtil.stringify(byteSpeedPerSecond) + "/s",
            "记录写入速度", // record write speed
            String.valueOf(recordSpeedPerSecond) + "rec/s",
            "读出记录总数", // total records read
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
            "读写失败总数", // total read/write failures
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));
    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
            StrUtil.stringify(byteSpeedPerSecond) + "|" +
            String.valueOf(recordSpeedPerSecond) + "|" +
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    );
    if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
            || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
        LOG.info(String.format(
                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
                "Transformer成功记录总数", // Transformer: records succeeded
                communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
                "Transformer失败记录总数", // Transformer: records failed
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
                "Transformer过滤记录总数", // Transformer: records filtered out
                communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
        ));
    }
}
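The arithmetic in logStatistics has two details worth noting: durations are truncated to whole seconds by integer division, and a transfer phase shorter than one second is clamped to 1 s so the speed division never divides by zero. A minimal standalone sketch of the same calculations, using made-up timestamps and counters (the class StatsMath is hypothetical):

```java
// Hypothetical standalone re-creation of the duration and speed calculations in logStatistics.
public class StatsMath {

    // Whole seconds between two millisecond timestamps (integer division truncates).
    static long seconds(long startMs, long endMs) {
        return (endMs - startMs) / 1000;
    }

    // Transfer duration in seconds, clamped to at least 1, as logStatistics does.
    static long transferSeconds(long startTransferMs, long endTransferMs) {
        long s = seconds(startTransferMs, endTransferMs);
        return s == 0L ? 1L : s;
    }

    public static void main(String[] args) {
        long start = 1_000L, end = 11_900L;                 // job ran 10.9 s
        long startTransfer = 2_000L, endTransfer = 10_000L; // transfer ran 8 s
        long readBytes = 8_000_000L, readRecords = 40_000L; // made-up counters

        long totalCosts = seconds(start, end);                            // 10
        long transferCosts = transferSeconds(startTransfer, endTransfer); // 8
        System.out.println("totalCosts=" + totalCosts + "s");
        System.out.println("byteSpeed=" + readBytes / transferCosts + "B/s");       // 1000000
        System.out.println("recordSpeed=" + readRecords / transferCosts + "rec/s"); // 5000
        System.out.println("clamped=" + transferSeconds(0L, 999L));                 // 1
    }
}
```

Because of the truncation, a 10.9 s job is reported as 10 s, and speeds for very short transfers are computed against 1 s rather than the real duration.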
Making the changes
Add a new result class, DataxResult (getters and setters omitted):
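public class DataxResult {
    // job start time (ms)
    private long startTimeStamp;
    // job end time (ms)
    private long endTimeStamp;
    // total elapsed time (s)
    private long totalCosts;
    // average throughput (bytes/s)
    private long byteSpeedPerSecond;
    // record write speed (records/s)
    private long recordSpeedPerSecond;
    // total records read
    private long totalReadRecords;
    // total read/write failures
    private long totalErrorRecords;
    // Transformer: records succeeded
    private long transformerSucceedRecords;
    // Transformer: records failed
    private long transformerFailedRecords;
    // Transformer: records filtered out
    private long transformerFilterRecords;
    // bytes read successfully
    private long readSucceedBytes;
    // transfer end time (ms)
    private long endTransferTimeStamp;
    // transfer start time (ms)
    private long startTransferTimeStamp;
    // total transfer time (s)
    private long transferCosts;

    // getters and setters omitted
}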
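The post omits the accessors and the getResultMsg method that the rewritten logStatistics calls. One possible shape, assuming getResultMsg simply assigns its 14 arguments in the same order as the fields are declared and returns this (a hypothetical sketch; only a few getters are shown):

```java
// Hypothetical sketch: getResultMsg fills every field in declaration order and returns this.
public class DataxResult {
    private long startTimeStamp, endTimeStamp, totalCosts;
    private long byteSpeedPerSecond, recordSpeedPerSecond;
    private long totalReadRecords, totalErrorRecords;
    private long transformerSucceedRecords, transformerFailedRecords, transformerFilterRecords;
    private long readSucceedBytes, endTransferTimeStamp, startTransferTimeStamp, transferCosts;

    public DataxResult getResultMsg(long startTimeStamp, long endTimeStamp, long totalCosts,
                                    long byteSpeedPerSecond, long recordSpeedPerSecond,
                                    long totalReadRecords, long totalErrorRecords,
                                    long transformerSucceedRecords, long transformerFailedRecords,
                                    long transformerFilterRecords, long readSucceedBytes,
                                    long endTransferTimeStamp, long startTransferTimeStamp,
                                    long transferCosts) {
        this.startTimeStamp = startTimeStamp;
        this.endTimeStamp = endTimeStamp;
        this.totalCosts = totalCosts;
        this.byteSpeedPerSecond = byteSpeedPerSecond;
        this.recordSpeedPerSecond = recordSpeedPerSecond;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
        this.transformerSucceedRecords = transformerSucceedRecords;
        this.transformerFailedRecords = transformerFailedRecords;
        this.transformerFilterRecords = transformerFilterRecords;
        this.readSucceedBytes = readSucceedBytes;
        this.endTransferTimeStamp = endTransferTimeStamp;
        this.startTransferTimeStamp = startTransferTimeStamp;
        this.transferCosts = transferCosts;
        return this;
    }

    public long getTotalCosts() { return totalCosts; }
    public long getTotalReadRecords() { return totalReadRecords; }
    public long getTotalErrorRecords() { return totalErrorRecords; }
    public long getReadSucceedBytes() { return readSucceedBytes; }
    public long getTransferCosts() { return transferCosts; }

    @Override
    public String toString() {
        return "DataxResult{totalCosts=" + totalCosts + "s, totalReadRecords=" + totalReadRecords
                + ", totalErrorRecords=" + totalErrorRecords
                + ", byteSpeedPerSecond=" + byteSpeedPerSecond + "}";
    }
}
```

With 14 positional long parameters the compiler cannot catch two arguments being swapped, so a builder or separate setters would be a safer design if the class grows.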
Rewrite the logStatistics method so that it fills in and returns this entity:
private DataxResult logStatistics(DataxResult resultMsg) {
    long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
    long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
    if (0L == transferCosts) {
        transferCosts = 1L;
    }
    if (super.getContainerCommunicator() == null) {
        return resultMsg;
    }
    Communication communication = super.getContainerCommunicator().collect();
    long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
            / transferCosts;
    long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
            / transferCosts;
    // Arguments follow the field order of DataxResult. The 6th and 7th are the
    // total read / total error counts, the same values the original method logs.
    return resultMsg.getResultMsg(startTimeStamp,
            endTimeStamp,
            totalCosts,
            byteSpeedPerSecond,
            recordSpeedPerSecond,
            CommunicationTool.getTotalReadRecords(communication),
            CommunicationTool.getTotalErrorRecords(communication),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
            communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
            communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
            this.endTransferTimeStamp,
            this.startTransferTimeStamp,
            transferCosts
    );
}
We also need to rewrite JobContainer's start() method so that it returns the result:
@Override
public DataxResult start(DataxResult dataxResult) {
    ...
    // Must run after endTimeStamp is set in the finally block, as in the original start()
    DataxResult result = logStatistics(dataxResult);
    ...
    return result;
}
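Where the logStatistics(dataxResult) call sits inside the elided body matters: in the original start(), endTimeStamp is only assigned in the finally block, so calling logStatistics earlier in the try block would compute totalCosts from the field's default value. The self-contained sketch below shows the control flow assuming the call stays where the original one was (inside finally, only when no exception occurred); MiniContainer, runStages and Stats are hypothetical stand-ins, not DataX types.

```java
// Hypothetical miniature of JobContainer.start(): stats are filled in only after
// endTimeStamp is set in the finally block, and only if no stage threw.
public class MiniContainer {

    static class Stats {
        long totalCosts = -1; // -1 means "not computed"
    }

    private long startTimeStamp;
    private long endTimeStamp;
    private final boolean failStages;

    MiniContainer(boolean failStages) {
        this.failStages = failStages;
    }

    // Stands in for preHandle/init/prepare/split/schedule/post/postHandle.
    private void runStages() {
        if (failStages) {
            throw new IllegalStateException("stage failed");
        }
    }

    private Stats logStatistics(Stats stats) {
        stats.totalCosts = (endTimeStamp - startTimeStamp) / 1000;
        return stats;
    }

    public Stats start(Stats stats) {
        boolean hasException = false;
        Stats result = stats; // declared here so it is in scope at the final return
        try {
            startTimeStamp = System.currentTimeMillis();
            runStages();
        } catch (RuntimeException e) {
            hasException = true;
            // DataX catches Throwable and rethrows it wrapped in a DataXException
            throw new RuntimeException("job failed", e);
        } finally {
            endTimeStamp = System.currentTimeMillis();
            if (!hasException) {
                result = logStatistics(stats);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Stats ok = new MiniContainer(false).start(new Stats());
        System.out.println("totalCosts=" + ok.totalCosts); // 0 when the run takes under a second
        try {
            new MiniContainer(true).start(new Stats());
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getCause().getMessage()); // failed: stage failed
        }
    }
}
```

Note that on failure the caller gets an exception rather than a DataxResult, so a Java caller still needs the try/catch around the call.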
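Then add mock methods to the Engine class for testing: mockstart below replaces start, and the test further down calls a static mockentry that replaces entry: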
public DataxResult mockstart(Configuration allConf) {
    ...
    DataxResult dataxResult = new DataxResult();
    return container.start(dataxResult);
}
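The post does not show mockentry itself. Judging from how the test calls it (Engine.mockentry(String[]) returning a DataxResult), it is presumably a copy of entry whose return type changes from void to DataxResult and whose last line becomes return engine.mockstart(configuration); instead of engine.start(configuration);. The self-contained sketch below models only that call chain; MiniEngine, its naive "-name value" parsing (used here instead of commons-cli) and the Result class are hypothetical stand-ins, not DataX code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the entry -> start chain after it is changed to return a result.
public class MiniEngine {

    static class Result {
        final String jobPath;
        final String mode;

        Result(String jobPath, String mode) {
            this.jobPath = jobPath;
            this.mode = mode;
        }
    }

    // Stand-in for Engine.mockstart(Configuration): in DataX this would create the
    // JobContainer and return container.start(new DataxResult()).
    Result mockstart(Map<String, String> conf) {
        return new Result(conf.get("job"), conf.get("mode"));
    }

    // Stand-in for Engine.mockentry(String[]): parse "-name value" pairs, then
    // return the result instead of discarding it as the void entry() does.
    static Result mockentry(String[] args) {
        Map<String, String> conf = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            conf.put(args[i].substring(1), args[i + 1]); // strip the leading '-'
        }
        return new MiniEngine().mockstart(conf);
    }

    public static void main(String[] args) {
        Result r = mockentry(new String[]{"-job", "job2.json", "-mode", "standalone", "-jobid", "-1"});
        System.out.println(r.jobPath + " " + r.mode); // job2.json standalone
    }
}
```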
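Testing
In com.alibaba.datax.core.util.container.CoreConstant, change DATAX_HOME to your local DataX path.
That DATAX_HOME path contains the following directories: [directory listing screenshot not available]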
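Editing the constant works. An alternative, assuming your DataX version initialises CoreConstant.DATAX_HOME from the datax.home system property (check your source before relying on this), is to pass -Ddatax.home=... or call System.setProperty before CoreConstant is first loaded. The catch is Java's static initialisation: a static field is computed once, when its class is initialised, so setting the property afterwards has no effect. The self-contained sketch below demonstrates that ordering; the Constants class is a hypothetical stand-in for CoreConstant.

```java
// Demonstrates that a static field read from a system property is fixed at class initialisation.
public class DataxHomeDemo {

    // Hypothetical stand-in for CoreConstant; initialised lazily on first access.
    static class Constants {
        static final String DATAX_HOME = System.getProperty("datax.home");
        static final String JOB_DIR = DATAX_HOME + "/job";
    }

    public static void main(String[] args) {
        // Must run before anything touches Constants (CoreConstant in DataX).
        System.setProperty("datax.home", "/opt/datax");
        System.out.println(Constants.DATAX_HOME); // /opt/datax
        System.out.println(Constants.JOB_DIR);    // /opt/datax/job

        // Changing the property now does not affect the already-initialised constants.
        System.setProperty("datax.home", "/somewhere/else");
        System.out.println(Constants.DATAX_HOME); // /opt/datax
    }
}
```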
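import com.alibaba.datax.core.Engine;
import com.alibaba.datax.core.util.container.CoreConstant;
// also import DataxResult from the package where you created it

public class Test {
    public static void main(String[] args) {
        // Same arguments datax.py passes: job file, runtime mode, job id (-1 = not specified)
        String[] dataxArgs = {"-job", CoreConstant.DATAX_HOME + "\\job\\job2.json", "-mode", "standalone", "-jobid", "-1"};
        try {
            DataxResult dataxResult = Engine.mockentry(dataxArgs);
            System.out.println("records read: " + dataxResult.getTotalReadRecords());
            System.out.println("read/write failures: " + dataxResult.getTotalErrorRecords());
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}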
Execution result: [screenshot not available]
All done!
Source: https://blog.csdn.net/ffyangzhch/article/details/88784007

