首页 > 解决方案 > logstash JDBC 插件多态索引

问题描述

JDBC 插件多态索引

嗨,我们有一个对项目具有多态性的表,我们想找到一种方法来更新一个 logstash 配置中的不同索引。

表结构

下面是一个示例表。列item_type表示类型(例如Pen, Post, Collection),item_id是我们数据库中项目的外键,并且在scorecron 上计算并每隔一段时间更新一次,这会更新我们的updated_at列。

流行度分数

过程

使用 logstash jdbc 插件,我们想查询数据,然后将其推送到 ES。但是,我没有看到动态推送更新到索引的方法(除了每个项目类型的logstash 配置和sql 查询)。在一个完美的世界里,我们想从上表中获取输入(见下面的输入代码)

输入

    input {
        jdbc {
            jdbc_driver_library => "/usr/share/logstash/bin/mysql-connector-java-8.0.15.jar"
            jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
            # useCursorFetch needed cause jdbc_fetch_size not working??
            # https://discuss.elastic.co/t/logstash-jdbc-plugin/84874/2
            # https://stackoverflow.com/a/10772407
            jdbc_connection_string => "jdbc:mysql://${CP_LS_SQL_HOST}:${CP_LS_SQL_PORT}/${CP_LS_SQL_DB}?useCursorFetch=true&autoReconnect=true&failOverReadOnly=false&maxReconnects=10"
            statement => "select * from view_elastic_popularity_all where updated_at > :sql_last_value"
            jdbc_user => "${CP_LS_SQL_USER}"
            jdbc_password => "${CP_LS_SQL_PASSWORD}"
            jdbc_fetch_size => "${CP_LS_FETCH_SIZE}"
            last_run_metadata_path => "/usr/share/logstash/cp/last_run_files/last_run_popularity_live"
            jdbc_page_size => '10000'
            use_column_value => true
            tracking_column => 'updated_at'
            tracking_column_type => 'timestamp'
            schedule => "* * * * *"
        }
    }

然后通过输出插件对 ES 运行更新查询(见下面的输出代码)

输出

    output {
      elasticsearch {
          index => "HOW_DO_WE_DYNAMICALLY_SET_INDEX_BASED_ON_ITEM_TYPE?"
          document_id => "%{id}"
          hosts => ["${CP_LS_ES_HOST}:${CP_LS_ES_PORT}"]
          user => "${CP_LS_ES_USER}"
          password => "${CP_LS_ES_PASSWORD}"
      }
    }

帮助?

我们不能成为第一个遇到这个问题的公司。我们将如何构建输出?

标签: elasticsearchlogstash

解决方案


您可以使用事件消息中的字段动态设置索引的名称,方法与动态设置 document_id 的方式相同。

output {
  elasticsearch {
      index => "%{item_type}"
      document_id => "%{id}"
      hosts => ["${CP_LS_ES_HOST}:${CP_LS_ES_PORT}"]
      user => "${CP_LS_ES_USER}"
      password => "${CP_LS_ES_PASSWORD}"
  }
}

推荐阅读