实用工具之DataXWeb数据同步

标签: 无 分类: 未分类 创建时间:2022-03-17 08:13:22 更新时间:2025-01-17 10:39:23

1.前言

DataX实用json来描述数据同步任务,比较麻烦,所以有人开发了基于DataX的Web可视化界面,就是DataX-Web。这里有一个问题,就是DataX-Web本身没有starrocks执行器,但是它是调用的独立安装的DataX,所以如果你自己安装了starrocks的writer,那么就可以在构建执行器的时候,修改生成的json代码,这样就实现了starrocks的读取写入。

2.安装

安装其实挺简单的,先安装datax源码,也是下载解压就好了,然后指定环境变量,或者是修改安装目录下的 modules/datax-executor/bin/env.properties 文件,找到 PYTHON_PATH 指向 datax.py 脚本的位置,详情请查看问题说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
## 编译安装
## 直接从Git上面获得源代码,在项目的根目录下执行如下命令
mvn clean install
## 执行成功后将会在工程的build目录下生成安装包
build/datax-web-{VERSION}.tar.gz

## 为了顺利启动,最好是这是环境变量,
vi /etc/profile
## 设置DATAX_HOME目录
export DATAX_HOME=/usr/local/datax/bin/

## 在选定的安装目录,解压安装包
tar -zxvf datax-web-{VERSION}.tar.gz
## 交互式安装
./bin/install.sh
## 一键启动所有服务
./bin/start-all.sh
## 一键取消所有服务
./bin/stop-all.sh

3.使用

打开 ip:9527端口,进行任务创建。http://localhost:9527/index.html, 默认管理员用户名:admin 密码:123456

(1) 项目管理->创建项目,指定相应的项目名称

(2) 数据源管理->创建数据源,实用jdbc连接相应的数据库

(3) 任务管理->DataX任务模版,创建一个DataX任务模版

(4) 任务管理->任务构建,构建任务

(4) 任务管理->任务管理,执行任务

参考文章:
1. WeiYe-Jing /datax-web 开源可视化界面,功能非常的全面,还有配置增量脚本的功能
2.Datax可视化界面环境搭建Datax-web
3.大数据之CentOS7安装部署DataX+Datax-Web环境

4.增量更新配置

其实我有点蒙,我有一张表叫 realtime ,其中的一个字段叫 RecordTime,是从2012年开始的 yyyy-MM-dd HH:mm:ss 字段格式,我想着用这个字段进行增量配置。主要配置,辅助参数:时间自增;增量开始时间:选择一个时间;增量时间字段:-DlastTime=’%s’ -DcurrentTime=’%s’;增量时间格式:时间戳。

修改生成的json文件,将table去掉,加上querySql语句,这里之所以使用了 1970-01-01 08:00:00,是使用的北京时间的unix时间戳的开始时间,DATEADD第一个参数 S 指的是秒数。

1
2
3
4
5
6
"connection": [
{
"querySql": [
"select * from realtime where RecordTime >= DATEADD(S,convert(int,${lastTime}),'1970-01-01 08:00:00') and RecordTime < DATEADD(S,convert(int,${currentTime}),'1970-01-01 08:00:00') "
]
}

还有就是要使用 convert 函数,将 ${lastTime} 参数进行转换,否在生成的时间戳就变成了字符串,提示:参数数据类型 varchar 对于 dateadd 函数的参数 2 无效

1
select * from realtime where RecordTime >= DATEADD(S,'1646903822','1970-01-01 08:00:00') and RecordTime < DATEADD(S,'1647512519','1970-01-01 08:00:00') 

5.解决增量更新的过程

[尝试]
(1) 仓库里面的说明,需要配置querSql,在增量字段里面配置了:-DlastTime=’%s’ -DcurrentTime=’%s’,但是这样配置之后,日志里面出现了:您的配置凌乱了. 因为datax不能同时既配置table又配置querySql.请检查您的配置并作出修改.

1
2
3
4
5
6
7
8
9
10
11
12
13
"connection": [
{
"table": [
"realtime"
],
"jdbcUrl": [
"jdbc:sqlserver://xxx:xxx;DatabaseName=phems"
],
"querySql": [
"select * from realtime where RecordTime >= FROM_UNIXTIME(${lastTime}) and RecordTime < FROM_UNIXTIME(${currentTime})"
]
}
]

(2) 后来我把table这个字段删掉,修改了connection,只留下了querySql,

1
2
3
4
"querySql": [
"select * from realtime where RecordTime >= FROM_UNIXTIME(${lastTime}) and RecordTime < FROM_UNIXTIME(${currentTime})"
]

出现了错误:您的配置有误. 由于您读取数据库表采用了querySql的方式, 所以您不需要再配置 column. 如果您不想看到这条提醒,请移除您源头表中配置中的 column.

(3) 然后我回退到table,删除了querySql,我把增量字段改成了 RecordTime

结果出现了错误: 找不到或无法加载主类 RecordTime

(4) 我再次尝试将增量字段设置为:-DlastTime=’%s’ -DcurrentTime=’%s’,配置了querySql,将 “column” 配置为 [ “*” ],结果出现了:执行的SQL为: select * from realtime where RecordTime >= FROM_UNIXTIME(‘2010-01-01%00:00:00’) and RecordTime < FROM_UNIXTIME(‘2022-01-10%18:13:45’) 具体错误信息为:com.microsoft.sqlserver.jdbc.SQLServerException: ‘FROM_UNIXTIME’ 不是可以识别的 内置函数名称。

这个可能是因为我的数据库是SQL Server的原因。

(5) 删掉了FROM_UNIXTIME函数,还是出现了问题:执行的SQL为: select * from realtime where RecordTime >= ‘2010-01-01%00:00:00’ and RecordTime < ‘2022-01-10%18:21:28’ 具体错误信息为:com.microsoft.sqlserver.jdbc.SQLServerException: 从字符串转换日期和/或时间时,转换失败,也就是说在生成的增量SQL语句时,将日期格式中间加了一个 ‘%’, 这就无法执行SQL 语句了。

[尝试]
有人说把增量语句改为:-DlastTime=’yyyy-MM-dd ‘ -DcurrentTime=’yyyy-MM-dd’,就可以了,我尝试之后,还是不行。

参考文章:
1.DataX Web数据增量同步配置说明
2.使用时间自增格式的 yyyy-MM-dd HH:mm:ss 这个写了源码的错误地方,cmdArr.add(doc.replaceAll(SPLIT_SPACE, TRANSFORM_SPLIT_SPACE)); 如果实在不行,只能自己编译了。
3.% problem in incremental update

(6) 阅读源码 src/main/java/com/wugui/datax/executor/service/command/BuildCommand.java。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (IncrementTypeEnum.TIME.getCode() == incrementType) {
if (doc.length() > 0) doc.append(SPLIT_SPACE);
String replaceParamType = tgParam.getReplaceParamType();

if (StringUtils.isBlank(replaceParamType) || replaceParamType.equals("Timestamp")) {
long startTime = tgParam.getStartTime().getTime() / 1000;
long endTime = tgParam.getTriggerTime().getTime() / 1000;
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startTime, endTime));
} else {
SimpleDateFormat sdf = new SimpleDateFormat(replaceParamType);
String endTime = sdf.format(tgParam.getTriggerTime()).replaceAll(SPLIT_SPACE, PERCENT);
String startTime = sdf.format(tgParam.getStartTime()).replaceAll(SPLIT_SPACE, PERCENT);
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startTime, endTime));
}
//buildPartitionCM(doc, partitionStr);
doc.append(TRANSFORM_QUOTES);

} else if (IncrementTypeEnum.ID.getCode() == incrementType) {
long startId = tgParam.getStartId();
long endId = tgParam.getEndId();
if (doc.length() > 0) doc.append(SPLIT_SPACE);
doc.append(PARAMS_CM).append(TRANSFORM_QUOTES).append(String.format(replaceParam, startId, endId));
doc.append(TRANSFORM_QUOTES);
}

时间戳的方式,我想起来了,可以传递时间戳的方式,但是在时间字符串上进行下功夫,将 “2022-01-02 13:00:00” 转换为时间戳,进行比较。因为Datax-Web,会最终将querySql中的变量,进行替换。

1
2
3
4
5
6
7
8
9
10
11
--- 增量开始时间:2022-03-10 17:17:02,增量时间字段:-DlastTime='%s' -DcurrentTime='%s'增量时间格式:时间戳
--- json中的 querySql
select * from realtime where RecordTime >= (${lastTime}) and RecordTime < (${currentTime})
--- 最后转换的结果就是
select * from realtime where RecordTime >= ('1646903822') and RecordTime < ('1647509572')

--- 方法一:如果RecordTime格式为:yyyy-MM-dd HH:mm:ss,那么可以将该时间字段,使用sql语句转换为时间戳再进行比较
SELECT * from realtime r WHERE DATEDIFF(S,'1970-01-01 08:00:00', RecordTime) > 1646903822

--- 方法二:将生成的时间戳,使用sql语句,对时间戳进行时间格式化,转换为 yyyy-MM-dd HH:mm:ss 在使用 >= 进行比较,这个方法效率要高很多
select * from realtime where RecordTime >= DATEADD(S,${lastTime},'1970-01-01 08:00:00') and RecordTime < DATEADD(S,${currentTime},'1970-01-01 08:00:00')
参考文章:
1.SqlServer时间与字符串相互转换
2.How can I convert bigint (UNIX timestamp) to datetime in SQL Server?
3.SQL Server中时间戳转换为日期格式 CONVERT() 函数是把日期转换为新数据类型的通用函数。
4.sqlserver 时间戳–日期 转换 DATEADD :时间戳转换成普通时间;DATEDIFF 普通时间转换成时间戳
5.SQL SERVER 日期格式化、日期和字符串转换 1.convert(datetime,’YYYY-MM-DD HH24:MI:SS’);2.cast(‘YYYY-MM-DD HH24:MI:SS’ as datetime)
6.不一样的 SQL Server 日期格式化 这里主要是 FORMAT 函数

/usr/bin/python: can’t find ‘main‘ module in ‘’

安装datax-web,增加执行器之后,执行过程中,出现了错误:

1
2
3
4
5
6
7
8
9
## 编辑配置文件
vi /usr/local/datax-web-2.1.2/modules/datax-executor/bin/env.properties
## 修改,脚本执行器
PYTHON_PATH=/usr/local/datax/bin/datax.py

## 或者修改
vi /etc/profile
## 设置DATAX_HOME目录
export DATAX_HOME=/usr/local/datax/bin/

当 IDENTITY_INSERT 设置为 OFF 时,不能为表中的标识列插入显式值

出现这个错误,其实就是因为主键自增id不能被明显的设置,比如,如果你将id设置为自增,在进行同步的时候,就不能插入这个自增的字段。

【尝试方案】
(1)最简单的方式,就是不插入 id,这种自动增长的列。
(1)设置 IDENTITY_INSERT
当我尝试设置这个值之后,发现总是无效的。

1
SET IDENTITY_INSERT tablename ON

注意
1.每一次连接会话中的任一时刻,只能对一个表设置 IDENTITY_INSERT ON,且设置只对当前会话有效;
2.在对标识列执行插入操作进,一定要列出此标识列(当然,同时也就需要列出相关的其他列了)。

(2)尝试使用 preSql语句
在执行插入之前,先执行 set IDENTITY_INSERT语句。这个参数在 parameter 下面。

1
2
3
"preSql": [
" set IDENTITY_INSERT dbo.realtime on"
],

【解决方案】
我升级了 datax 版本,2023年09月之后,这个问题就解决了,然后 尝试使用 session 配置,最后终于可以同步数据了。

1
2
3
4
5
6
"writer": {
"name": "sqlserverwriter",
"parameter": {
"session": ["SET IDENTITY_INSERT tableName ON"]
}
}
参考文章:
【1】.当 IDENTITY_INSERT 设置为 OFF 时,不能为表中的标识列插入显式值
【2】.SET IDENTITY_INSERT 学习心得
【3】.SQL server修改主键为自动增长 SQLServer不能通过Navicat等界面工具修改主键的自动增长,修改操作只能通过sql命令来实现。
自增列不能直接修改,必须将原有ID列删除,然后重新添加一列具有identity属性的ID字段。
【4】.SET IDENTITY_INSERT (Transact-SQL) 任何时候,一个会话中只有一个表的 IDENTITY_INSERT 属性可以设置为 ON。 如果某个表已将此属性设置为 ON,则对另一个表发出 SET IDENTITY_INSERT ON 语句时,SQL Server 将返回一个错误信息,指出 SET IDENTITY_INSERT 已设置为 ON,并报告已将其属性设置为 ON 的表。
【5】.DataX PostgresqlWriter 这里是 preSql 的用法。
【6】.当IDENTITY_INSERT设置为 OFF 时,不能为表中的标识列插入显式值
【7】.【SQLServer自增列问题】sqlserverwriter 这里提到了一个 session 的配置,是修改源码改掉的,后来官方也出来修复措施。
【8】.datax(27):不太常见配置项querySql、preSql、postSql、splitPk[通俗易懂] 1.querySql;2.preSql;3.postSql;4.splitPk,

“exception”:”将 nvarchar 转换为数据类型 numeric 时出现算术溢出错误。

这个问题比较奇怪了,明明Reader和Writer的数据库的表结构都是一样的,最后在执行的时候,还是出现了这个问题,两个数据库都是SqlServer

小额赞助
本人提供免费与付费咨询服务,感谢您的支持!赞助请发邮件通知,方便公布您的善意!
**光 3.01 元
Sun 3.00 元
bibichuan 3.00 元
微信公众号
广告位
诚心邀请广大金主爸爸洽谈合作
每日一省
isNaN 和 Number.isNaN 函数的区别?

1.函数 isNaN 接收参数后,会尝试将这个参数转换为数值,任何不能被转换为数值的的值都会返回 true,因此非数字值传入也会返回 true ,会影响 NaN 的判断。

2.函数 Number.isNaN 会首先判断传入参数是否为数字,如果是数字再继续判断是否为 NaN ,不会进行数据类型的转换,这种方法对于 NaN 的判断更为准确。

每日二省
为什么0.1+0.2 ! == 0.3,如何让其相等?

一个直接的解决方法就是设置一个误差范围,通常称为“机器精度”。对JavaScript来说,这个值通常为2-52,在ES6中,提供了Number.EPSILON属性,而它的值就是2-52,只要判断0.1+0.2-0.3是否小于Number.EPSILON,如果小于,就可以判断为0.1+0.2 ===0.3。

每日三省
== 操作符的强制类型转换规则?

1.首先会判断两者类型是否**相同,**相同的话就比较两者的大小。

2.类型不相同的话,就会进行类型转换。

3.会先判断是否在对比 null 和 undefined,是的话就会返回 true。

4.判断两者类型是否为 string 和 number,是的话就会将字符串转换为 number。

5.判断其中一方是否为 boolean,是的话就会把 boolean 转为 number 再进行判断。

6.判断其中一方是否为 object 且另一方为 string、number 或者 symbol,是的话就会把 object 转为原始类型再进行判断。

每日英语
Happiness is time precipitation, smile is the lonely sad.
幸福是年华的沉淀,微笑是寂寞的悲伤。