.
目录
下载logstash和mysql驱动包
安装 jdbc 和 elasticsearch 插件
编写配置文件
运行
多表同步
最近项目用到分库分表,涉及数据查询需要联表或分页时发现需要合并表,影响查询性能,所以引入ES,但是订单数据要求实时性高,数据量又比较大,如果同一时刻改了表的状态字段,一条一条写很耗时间,所以引入logstash 来增量同步数据库数据到ES,省去业务方进行数据更新。下面介绍一下logstash配置和使用:
-
下载logstash和mysql驱动包
下载logstash包并解压
[root@srv117 plugins]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zip
[root@srv117 plugins]# unzip logstash-6.2.3.zip && cd logstash-6.2.3
下载mysql驱动并解压
[root@srv117 logstash-6.2.3]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
[root@srv117 logstash-6.2.3]# unzip mysql-connector-java-5.1.46.zip
-
安装 jdbc 和 elasticsearch 插件
安装JDBC插件
[root@srv117 logstash-6.2.3]# bin/logstash-plugin install logstash-input-jdbc
安装elasticsearch插件
[root@srv117 logstash-6.2.3]# bin/logstash-plugin install logstash-output-elasticsearch
-
编写配置文件
logstash同步mysql数据原理:通过配置文件读取指定数据库表的数据,根据条件查询得到数据写入流中,然后通过配置输出流(ES数据源),将数据写入ES中。
增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段。
输入配置:
input {
jdbc {
jdbc_driver_library => "/usr/local/genlot/es/plugins/logstash-6.2.3/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.8.0.117:3306/user?useAffectedRows=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "root"
schedule => "*/5 * * * *"
statement => "select * from user_test where update_time >= :sql_last_value"
lowercase_column_names=>false
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "/usr/local/es/plugins/logstash-6.2.3/config/table_config/updateTime_0000_0000"
#开启分页查询
jdbc_paging_enabled => true
jdbc_page_size => 50000
}
}
输入属性解析:
jdbc_driver_library:驱动路径
jdbc_driver_class :驱动类名称
jdbc_connection_string :数据库连接地址
jdbc_user : 数据库用户名
jdbc_password : 数据库密码
schedule: 定时任务配置,秒,分,时,天
statement : sql查询语句
lowercase_column_names:列名是否转为小写
use_column_value : 使用递增列的值
tracking_column_type : 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
tracking_column : 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path : 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
jdbc_paging_enabled : 是否开启分页查询
jdbc_page_size : 每页查询条数
更多输入属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
输出配置:
output {
elasticsearch {
action => "update"
doc_as_upsert => true
hosts => ["127.0.0.1:19011"]
index => "user_db"
document_type => "user_table"
document_id => "%{userId}"
}
stdout {
codec => json_lines
}
}
输出属性解析:
action : 动作,update标识更新,index标识新增(默认)
doc_as_upsert : update时如果不存在是否新增。
hosts : es集群地址,http请求地址。
index : 导入到 es 中的 index 名
document_type :导入到 es 中的 type名
document_id : 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{userId} 表示引用 mysql 表中 userId字段的值。
更多输出属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html
完整例子:
input {
jdbc {
jdbc_driver_library => "/usr/local/genlot/es/plugins/logstash-6.2.3/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.8.0.117:3306/user?useAffectedRows=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "root"
schedule => "*/5 * * * *"
statement => "select * from user_test where update_time >= :sql_last_value"
lowercase_column_names=>false
use_column_value => true
tracking_column_type => "timestamp"
tracking_column => "update_time"
last_run_metadata_path => "/usr/local/es/plugins/logstash-6.2.3/config/table_config/updateTime_0000_0000"
#开启分页查询
jdbc_paging_enabled => true
jdbc_page_size => 50000
}
}
filter{
mutate {
remove_field =>["@timestamp","@version","update_time"]
}
}
output {
elasticsearch {
action => "update"
doc_as_upsert => true
hosts => ["127.0.0.1:19011"]
index => "user_db"
document_type => "user_table"
document_id => "%{userId}"
}
stdout {
codec => json_lines
}
}
-
运行
将以上配置文件保存到user_es.cfg,并存放于config目录下,执行以下语句:
[root@srv117 logstash-6.2.3]bin/logstash -f config/user_es.cfg
如果成功了会在标准输出输出执行的 sql 语句。
[2018-04-14T18:12:00,278][INFO ][logstash.inputs.jdbc ] (0.001011s) SELECT version()
[2018-04-14T18:12:00,284][INFO ][logstash.inputs.jdbc ] (0.000723s) SELECT * FROM user_test WHERE update_time > '2019-04-14 18:55:00'
多表同步
如果业务比较复杂,涉及多个业务表数据同步,或者进行了分库分表,需要进行多表同步,可以添加多个配置文件,例如user_es1.cfg,user_es2.cfg。然后修改配置文件
config/pipelines.yml。添加以上两个配置。
- pipeline.id: user_es1
path.config: "config/user_es1.cfg"
- pipeline.id: user_es1
path.config: "config/user_es1.cfg"
执行bin/logstash 启动即可。后台启动,新建一个start.sh文件,添加以下内容
#!/bin/bash
echo "START logstash "
nohup sh bin/logstash &exit
执行sh start.sh即可。注意:start.sh文件必须放在logstash-6.2.3根目录下。
其实如果是纯粹的数据搬移工作,那么使用阿里的 datax 其实是很方便的。速度也很快,而且还方便操作 !