首页 > 技术文章 > 将mysql数据同步到ES6.4(全量+增量)

Hero- 2018-09-29 14:54 原文

下载安装包时注意下载到指定文件夹 这里我放在OPT文件夹下
一:安装logstash
进入到opt文件夹打开终端 执行以下命令
wget -c https://artifacts.elastic.co/downloads/logstash/logstash-6.4.0.zip
加上-c支持断点续传
二:解压logstash
unzip logstash-6.4.0.zip
三:进入到logstash bin目录
cd logstash-6.4.0/bin
四:安装logstash-jdbc
./logstash-plugin install logstash-input-jdbc
五:编写配置文件(jdbc.sql和jdbc.conf,建议在bin目录下vim jdbc.conf)
六:首先在bin目录下新建一个mysql目录,里面包含jdbc.conf,jdbc.sql文件,加入mysql的驱动
jdbc.conf内容:

input {
    stdin {
    }
    jdbc {
      # 连接的数据库地址和哪一个数据库,指定编码格式,禁用SSL协议,设定自动重连
      jdbc_connection_string => "jdbc:mysql://数据库地址:端口号/数据库名?characterEncoding=UTF-8&useSSL=false&autoReconnect=true"
      # 你的账户密码
      jdbc_user => "账号"
      jdbc_password => "密码"
      # 连接数据库的驱动包,建议使用绝对地址
      jdbc_driver_library => "mysql/mysql-connector-java-5.1.45-bin.jar"
      # 这是不用动就好
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

   #处理中文乱码问题
      codec => plain { charset => "UTF-8"}
   #使用其它字段追踪,而不是用时间
    use_column_value => true
   #追踪的字段      
    tracking_column => testid    
  #如果你追踪的字段是时间类型
  #tracking_column_type => "timestamp" record_last_run => true #上一个sql_last_value值的存放文件路径, 必须要在文件中指定字段的初始值 last_run_metadata_path => "mysql/station_parameter.txt" jdbc_default_timezone => "Asia/Shanghai" statement_filepath => "mysql/jdbc.sql" #是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录 clean_run => false # 这是控制定时的,重复执行导入任务的时间间隔,第一位是分钟 schedule => "* * * * *" type => "jdbc" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { # 要导入到的Elasticsearch所在的主机 hosts => "192.168.105.180:9200" # 要导入到的Elasticsearch的索引的名称 index => "db_anytest" # 类型名称(类似数据库表名) document_type => "table_anytest" # 主键名称(类似数据库主键) document_id => "%{testid}" } stdout { # JSON格式输出 codec => json_lines } }

 


jdbc.sql里面就直接写sql语句就行了

全量同步sql语句就不要写testid了 增量同步就加上where条件 where testid > :sql_last_value
七:开始导入(需要用root账户启动)
./logstash -f mysql/jdbc.conf

如果报错

logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting

请使用

./logstash -f mysql/jdbc.conf --path.data=/root/

Elasticsearch7.0以后一个索引只能对应一个类型了 所以没有数据库名这个概念了 直接用表名当索引名

当同步多表的时候需要在jdbc上增加type属性 然后output根据type进行分类 但是如果你的表本身有type字段 那就要在sql上做处理 给数据库的表type字段起别名

 

推荐阅读