• 欢迎访问开心洋葱网站,在线教程,推荐使用最新版火狐浏览器和Chrome浏览器访问本网站,欢迎加入开心洋葱 QQ群
  • 为方便开心洋葱网用户,开心洋葱官网已经开启复制功能!
  • 欢迎访问开心洋葱网站,手机也能访问哦~欢迎加入开心洋葱多维思维学习平台 QQ群
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏开心洋葱吧~~~~~~~~~~~~~!
  • 由于近期流量激增,小站的ECS没能经的起亲们的访问,本站依然没有盈利,如果各位看如果觉着文字不错,还请看官给小站打个赏~~~~~~~~~~~~~!

Logstash增量同步Mysql数据到搜索引擎

Mysql supingemail 1839次浏览 0个评论

.

目录

下载logstash和mysql驱动包

安装 jdbc 和 elasticsearch 插件

编写配置文件

运行

多表同步

最近项目用到分库分表,涉及数据查询需要联表或分页时发现需要合并表,影响查询性能,所以引入ES,但是订单数据要求实时性高,数据量又比较大,如果同一时刻改了表的状态字段,一条一条写很耗时间,所以引入logstash 来增量同步数据库数据到ES,省去业务方进行数据更新。下面介绍一下logstash配置和使用:

  1. 下载logstash和mysql驱动包

下载logstash包并解压

[root@srv117 plugins]# wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zip
[root@srv117 plugins]# unzip logstash-6.2.3.zip && cd logstash-6.2.3

下载mysql驱动并解压

[root@srv117 logstash-6.2.3]# wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zip
[root@srv117 logstash-6.2.3]# unzip mysql-connector-java-5.1.46.zip
  1. 安装 jdbc 和 elasticsearch 插件

安装JDBC插件

[root@srv117 logstash-6.2.3]# bin/logstash-plugin install logstash-input-jdbc

安装elasticsearch插件

[root@srv117 logstash-6.2.3]# bin/logstash-plugin install logstash-output-elasticsearch
  •  
  1. 编写配置文件

    logstash同步mysql数据原理:通过配置文件读取指定数据库表的数据,根据条件查询得到数据写入流中,然后通过配置输出流(ES数据源),将数据写入ES中。
    增量获取的方式没有通过 binlog 方式同步,而是用一个递增字段作为条件去查询,每次都记录当前查询的位置,由于递增的特性,只需要查询比当前大的记录即可获取这段时间内的全部增量,一般的递增字段有两种,AUTO_INCREMENT 的主键 id 和 ON UPDATE CURRENT_TIMESTAMP 的 update_time 字段,id 字段只适用于那种只有插入没有更新的表,update_time 更加通用一些,建议在 mysql 表设计的时候都增加一个 update_time 字段。

输入配置:

input {
  jdbc {
    jdbc_driver_library => "/usr/local/genlot/es/plugins/logstash-6.2.3/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.8.0.117:3306/user?useAffectedRows=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "*/5 * * * *"
    statement => "select * from user_test where update_time >= :sql_last_value"
    lowercase_column_names=>false
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "/usr/local/es/plugins/logstash-6.2.3/config/table_config/updateTime_0000_0000"
    #开启分页查询
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
  }
}

输入属性解析:
jdbc_driver_library:驱动路径
jdbc_driver_class :驱动类名称
jdbc_connection_string :数据库连接地址
jdbc_user : 数据库用户名
jdbc_password : 数据库密码
schedule: 定时任务配置,秒,分,时,天
statement : sql查询语句
lowercase_column_names:列名是否转为小写
use_column_value : 使用递增列的值
tracking_column_type : 递增字段的类型,numeric 表示数值类型, timestamp 表示时间戳类型
tracking_column : 递增字段的名称,这里使用 update_time 这一列,这列的类型是 timestamp
last_run_metadata_path : 同步点文件,这个文件记录了上次的同步点,重启时会读取这个文件,这个文件可以手动修改
jdbc_paging_enabled : 是否开启分页查询
jdbc_page_size : 每页查询条数
更多输入属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
输出配置:

output {
	elasticsearch {
	action => "update"
	doc_as_upsert => true
	hosts  => ["127.0.0.1:19011"]
	index => "user_db"
	document_type => "user_table"
	document_id => "%{userId}"
    }
	stdout {
		codec => json_lines
  }
}

输出属性解析:
action : 动作,update标识更新,index标识新增(默认)
doc_as_upsert : update时如果不存在是否新增。
hosts : es集群地址,http请求地址。
index : 导入到 es 中的 index 名
document_type :导入到 es 中的 type名
document_id : 导入到 es 中的文档 id,这个需要设置成主键,否则同一条记录更新后在 es 中会出现两条记录,%{userId} 表示引用 mysql 表中 userId字段的值。
更多输出属性参照:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html

完整例子:

input {
  jdbc {
    jdbc_driver_library => "/usr/local/genlot/es/plugins/logstash-6.2.3/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://10.8.0.117:3306/user?useAffectedRows=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "*/5 * * * *"
    statement => "select * from user_test where update_time >= :sql_last_value"
    lowercase_column_names=>false
    use_column_value => true
    tracking_column_type => "timestamp"
    tracking_column => "update_time"
    last_run_metadata_path => "/usr/local/es/plugins/logstash-6.2.3/config/table_config/updateTime_0000_0000"
    #开启分页查询
    jdbc_paging_enabled => true
    jdbc_page_size => 50000
  }
}

filter{
	mutate {
                remove_field =>["@timestamp","@version","update_time"]
	}
}

output {
	elasticsearch {
	action => "update"
	doc_as_upsert => true
	hosts  => ["127.0.0.1:19011"]
	index => "user_db"
	document_type => "user_table"
	document_id => "%{userId}"
    }
	stdout {
		codec => json_lines
  }
}

  1. 运行

将以上配置文件保存到user_es.cfg,并存放于config目录下,执行以下语句:

[root@srv117 logstash-6.2.3]bin/logstash -f config/user_es.cfg
  •  

如果成功了会在标准输出输出执行的 sql 语句。

[2018-04-14T18:12:00,278][INFO ][logstash.inputs.jdbc     ] (0.001011s) SELECT version()
[2018-04-14T18:12:00,284][INFO ][logstash.inputs.jdbc     ] (0.000723s) SELECT * FROM user_test WHERE update_time > '2019-04-14 18:55:00'

多表同步

如果业务比较复杂,涉及多个业务表数据同步,或者进行了分库分表,需要进行多表同步,可以添加多个配置文件,例如user_es1.cfg,user_es2.cfg。然后修改配置文件
config/pipelines.yml。添加以上两个配置。

- pipeline.id: user_es1
  path.config: "config/user_es1.cfg"
- pipeline.id: user_es1
  path.config: "config/user_es1.cfg"

执行bin/logstash 启动即可。后台启动,新建一个start.sh文件,添加以下内容

#!/bin/bash
echo "START logstash "
nohup sh bin/logstash &exit

执行sh start.sh即可。注意:start.sh文件必须放在logstash-6.2.3根目录下。

其实如果是纯粹的数据搬移工作,那么使用阿里的 datax 其实是很方便的。速度也很快,而且还方便操作 !
 


开心洋葱 , 版权所有丨如未注明 , 均为原创丨未经授权请勿修改 , 转载请注明Logstash增量同步Mysql数据到搜索引擎
喜欢 (0)

您必须 登录 才能发表评论!

加载中……