Ruby ETL 漫谈
activewarehouse-etl
ActiveWarehouse ETL 应该是最早的Ruby ETL工具,并且十分强大,支持各种应用场景,但缺点是年久失修,没有再维护。ActiveWarehosue ETl目标是解决全部ETL相关的需求,是一个重量型工具,一般新的ETL工具的功能很少是他没有的,但其内部结构十分复杂,并且依赖ActiveRecord。另外一个缺点是,其支持的Source和Destination太过老旧,比如,对Postgresql不友好。
kiba
Kiba 是 ActiveWarehouse ETL 的维护者新开发的一个Gem,与 ActiveWarehouse ETL恰恰相反,Kiba 是一个轻量级的ETL工具。目前非常活跃,文档丰富,并且确实很轻,几百行代码勾勒出ETL的基本结构。但缺点呢,也很明显,它空无一物,什么都没提供,就是说,如果你的项目想用 Kiba 来解决实际ELT问题还差很远,需要自己去实现很多功能,如果不是很熟悉,要踩的坑还很多。另外,Kiba和Sidekiq一样,提供Kiba Pro,收费版,据说功能很多,支持以下特性:
- multi-threading (and later, multi-machines) - commonly requested
- built-in sources/transforms/destinations for common tasks lookups
- upserts / bulk load modules
- connectors optimized for parallelism (HTTP pagination extraction)
- connectors for the cloud (RedShift, ...)
- built-in helpers for common operations (debugging, limiting, caching...)
- premium support
ETL
ETL 是 Square 公司开源的一个轻量级的 ETL gem,与 ActiveWarehouse ETL 不同的是,Square ETL直接操作SQL,不依赖ActiveRecord。但可能只是解决Square公司的特定问题,目前只支持的是MySQL Source和Destination,并且Source和Destination必须在同一个Server上。另一个致命的缺点是不支持增量更新:
embulk
Embulk 的特点是其丰富的插件和并行处理能力。但其依赖JRuby和Java让Ruby开发者上手很难,并且其体系相当复杂,如果不是数据量巨大就不用考虑了。
kiba-plus
Kiba Plus是一个基于Kiba的增强,包含常用的Source和Destination实现,例如MySQL、Postgresql、CSV等。它具有以下优点:
- 支持Postgresql:其实支持Postgresql也就意味着很容易支持基于Pg的其他数据库,比如Greenplum、CitusData、Redshift等。题外话,MySQL和Pg在OLTP领域可以说不相上下,但在OLAP领域MySQL已经被Pg甩出一条街了,支持Pg意义重大。
- Fast Load && Insert:由于不依赖ActiveRecord和Sequel之类ORM工具,直接使用mysql2和pg gem操作SQL,能够利用更多数据库自身特性,比如streaming和mysql load infile or pg copy来更快的读取和插入。
- Incremental Insert:增量更新是ETL中不可缺少的部分,如果没有增量更新,即使再快的插入速度和再强的并行处理能力,随着数据量增长,也不可能每次把所以数据都重新插入一遍。
一个从MySQL导入到Pg的例子:
require 'kiba/plus'
SOURCE_URL = 'mysql://root@localhost/shopperplus'
DEST_URL = 'postgresql://hooopo@localhost:5432/crm2_dev'
source Kiba::Plus::Source::Mysql, { :connect_url => SOURCE_URL,
:query => %Q{SELECT id, email, 'hooopo' AS first_name, 'Wang' AS last_name FROM customers}
}
destination Kiba::Plus::Destination::PgBulk2, { :connect_url => DEST_URL,
:table_name => "customers",
:truncate => true,
:columns => [:id, :email, :first_name, :last_name],
:incremental => false
}
post_process do
result = PG.connect(DEST_URL).query("SELECT COUNT(*) AS num FROM customers")
puts "Insert total: #{result.first['num']}"
end
执行:
bundle exec kiba customer_mysql_to_pg.etl
输出:
# I, [2016-05-16T01:53:36.832565 #87909] INFO -- : TRUNCATE TABLE customers;
# I, [2016-05-16T01:53:36.841770 #87909] INFO -- : COPY customers (id, email, first_name, last_name) FROM STDIN WITH DELIMITER ',' NULL '\N' CSV
# Insert total: 428972