(WIP) yugong设计思路

22 Nov 2019

很多开发工作无非将从某个数据源的数据变换写入到另外一个数据源. 最简单的例子:

insert into dst.stat_table
select a, b, @func(c) as d, count(1) as cnt
from src.log_table
group by a, b, d

如果同数据库, 似乎就是直接执行一个SQL的事情. 但是往往会涉及跨实例的数据的读写, 即便单个数据库, 主从部署角度考虑, 为了避免大的聚合查询影响主库写入, 也需要选择从从库读. 这就涉及到了一些开发工作.

SQL是一个非常成功的声明式语言, 把要做什么事情写得清清楚楚, 没有一句废话. 所以我们直接用SQL配置化的方式来实现我们的数据同步逻辑呢?

以上就是一开始做yugong工具的想法.

数据管道

Unix Pipe

mysql -h $src_host $src_db -B -N -e “select * from src_table” clickhouse-client –host $dst_host –database $dst_db –query=”insert into dst_table format TabSeparated”

基于文本的序列化, 如拼SQL 缺点性能, 尽量使用各数据库原生的协议

批量的问题

SQL解析

其实一开始特别简单, 不用高大上的语法解释器, 跟据实际需求将SQL字符串拆分成数据源的查询语句和目标库的写入语句, 执行拿到数据执行写入即可.

为了支持一些简单的逻辑, 如 UT模式 拿游标等, 可以类似SQL存储过程的方式, 引入变量.

select last_timestamp from bookeeper where job_name = '...'
select a, b, count(1) from log where ut > @last_timestamp group by a, b;

对于数据量较大, 需要拆分成较多子任务执行的, 可以通过返回多行的值来解决

select today()-i as dt from system.numbers limit 10
--- 由于上述返回的是一个时间列表, 一下命令针对每个dt分别一次执行
select ... from src where dt = @dt

DSN化配置

数据库相关配置统一使用类似url的方式来定义数据配置, 从而方便解析.

mysql://user:pass@host:port/db?option_key=option_value… elasticsearch://host:port/index

统一批处理/流处理逻辑

数据来源不一定是固定的数据库表, 而可能是增量的数据, 基于表的模式我们可以通过UT方式来做小批量同步, 但是缺点: 额外ut索引负担; 不够实时.

因此我们很多时候还是需要从实时的时间日志来源, 如kafka中实时统计汇总的. 这是我们可以通过SQL来抽象从数据库以及从kafka取数的差异.

insert into stat (dt, aid, country, cnt) values (date(ts), aid, country, 1) on duplicate key update cnt = values(cnt)+1 select ts, aid from kafka://kafka:9092/#json

开发中注意几个点:

  1. kafka数据源的数据解析问题, 简单的用JSON自然问题不大, 如果是proto等协议的化, 用动态语言来做更适合.
  2. 程序中的汇总逻辑, 比较复杂, 先简单支持如max/min/sum等聚合逻辑即可.
HOME