Not all records are imported when using Logstash to import data from MySQL to Elasticsearch via JDBC?

Problem description

I am using a MySQL cluster running on the NDB engine, and I am trying to import data from a view.

Here is the definition of the view:

CREATE 
    ALGORITHM = UNDEFINED 
    DEFINER = `s2ic`@`%` 
    SQL SECURITY DEFINER
VIEW `s2ic`.`courses_view` AS
    SELECT 
        `c`.`course_id` AS `course_id`,
        `c`.`edupartner_id` AS `edupartner_id`,
        `c`.`name` AS `name`,
        `c`.`start_date` AS `start_date`,
        `c`.`duration` AS `duration`,
        `c`.`created_at` AS `created_at`,
        `c`.`prerequisite` AS `prerequisite`,
        `c`.`qualification` AS `qualification`,
        `c`.`description` AS `description`,
        `c`.`type` AS `type`,
        `c`.`is_active` AS `is_active`,
        `c`.`location` AS `location`,
        `c`.`validity_date` AS `validity_date`,
        `c`.`level` AS `level`,
        `c`.`course_fee` AS `course_fee`,
        `c`.`status` AS `status`,
        `c`.`domain_tag` AS `domain_tag`,
        `c`.`course_highlights` AS `course_highlights`,
        `c`.`updated_at` AS `updated_at`,
        `c`.`is_removed` AS `is_removed`,
        (SELECT 
                `s2ic`.`users`.`name`
            FROM
                `s2ic`.`users`
            WHERE
                (`s2ic`.`users`.`user_id` = (SELECT 
                        `s2ic`.`edupartner_user`.`user_id`
                    FROM
                        `s2ic`.`edupartner_user`
                    WHERE
                        (`s2ic`.`edupartner_user`.`edupartner_id` = `c`.`edupartner_id`)
                    LIMIT 1))) AS `child_name`,
        IFNULL((SELECT 
                        COUNT(`s2ic`.`student_course`.`student_course_id`)
                    FROM
                        `s2ic`.`student_course`
                    WHERE
                        ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                            AND (`s2ic`.`student_course`.`status` <> 'saved')
                            AND (`s2ic`.`student_course`.`status` <> 'not_applied'))
                    LIMIT 1),
                0) AS `no_of_applicants`,
        IFNULL((SELECT 
                        COUNT(`s2ic`.`student_course`.`student_course_id`)
                    FROM
                        `s2ic`.`student_course`
                    WHERE
                        ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                            AND (`s2ic`.`student_course`.`status` = 'selected')
                            AND (`s2ic`.`student_course`.`rating` <> 0))
                    LIMIT 1),
                0) AS `no_of_ratings`,
        (SELECT 
                `s2ic`.`edupartner`.`parent_name`
            FROM
                `s2ic`.`edupartner`
            WHERE
                (`s2ic`.`edupartner`.`edupartner_id` = `c`.`edupartner_id`)) AS `parent_name`,
        IFNULL((SELECT 
                        CAST(AVG(`s2ic`.`student_course`.`rating`) AS DECIMAL (2 , 1 ))
                    FROM
                        `s2ic`.`student_course`
                    WHERE
                        ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                            AND (`s2ic`.`student_course`.`rating` <> 0))),
                0) AS `total_rating`
    FROM
        `s2ic`.`courses` `c`
    WHERE
        1

These are the data types of the table:

# Field, Type, Null, Key, Default, Extra
course_fee, int(20), YES, , , 
course_highlights, longtext, YES, , , 
course_id, int(20), NO, PRI, , auto_increment
created_at, datetime, NO, , CURRENT_TIMESTAMP, 
created_by, int(20), YES, , , 
description, longtext, YES, , , 
domain_tag, text, YES, , , 
duration, text, YES, , , 
edupartner_id, int(20), NO, MUL, , 
is_active, tinyint(1), YES, , 0, 
is_removed, tinyint(1), YES, , 0, 
level, enum('beginner','intermediate','advanced'), NO, , , 
location, text, YES, , , 
name, text, YES, , , 
prerequisite, text, YES, , , 
qualification, text, YES, , , 
start_date, date, YES, , , 
status, enum('active','draft','expired','inactive'), NO, , , 
type, enum('classroom','online'), NO, , , 
updated_at, timestamp, NO, , CURRENT_TIMESTAMP, on update CURRENT_TIMESTAMP
updated_by, int(20), YES, , , 
validity_date, date, YES, , , 

Here is my Logstash conf file:

input {
    jdbc {
        jdbc_connection_string => "jdbc:mysql://My DB IP:3306/s2ic"
        jdbc_user => "XXXXXX"
        jdbc_password => "XXXXXX"
        jdbc_driver_library => "/home/ubuntu/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        statement => "SELECT * FROM s2ic.courses_view where updated_at >:sql_last_value;"
        tracking_column => "updated_at"
        tracking_column_type => "timestamp"
        record_last_run => true
        use_column_value => true
        schedule => "* * * * * *"
        clean_run => true
    }
}

output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        document_id => "%{course_id}"
        index => "s2ic_courses"
        hosts => ["http://MY Elastic IP:9200"]
    }
}

I added the file location in pipelines.yml and restarted the service with systemctl.
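For reference, registering a conf file in pipelines.yml takes roughly the following shape; the config path here is a hypothetical placeholder, while the pipeline id s2ic matches the one that appears in the log below.

# Hypothetical sketch: register the conf file as its own pipeline, then restart.
# The path under /etc/logstash/conf.d/ is an assumption, not the actual path used.
sudo tee -a /etc/logstash/pipelines.yml <<'EOF'
- pipeline.id: s2ic
  path.config: "/etc/logstash/conf.d/s2ic.conf"
EOF
sudo systemctl restart logstash

After the restart, the following was printed in the log: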

[2020-01-02T14:07:49,624][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.3.1"}
[2020-01-02T14:07:54,726][INFO ][org.reflections.Reflections] Reflections took 192 ms to scan 1 urls, producing 19 keys and 39 values 
[2020-01-02T14:07:59,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://My Elastic IP:9200/]}}
[2020-01-02T14:07:59,272][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://My Elastic IP:9200/]}}
[2020-01-02T14:07:59,690][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://My Elastic IP:9200/"}
[2020-01-02T14:07:59,682][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://My Elastic IP:9200/"}
[2020-01-02T14:07:59,789][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2020-01-02T14:07:59,790][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2020-01-02T14:07:59,795][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-01-02T14:07:59,807][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-01-02T14:07:59,862][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://My Elastic IP:9200"]}
[2020-01-02T14:07:59,867][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//My Elastic IP:9200"]}
[2020-01-02T14:08:00,128][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2020-01-02T14:08:00,248][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-01-02T14:08:00,423][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-01-02T14:08:00,426][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"s2ic", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, :thread=>"#<Thread:0x34d8c27c run>"}
[2020-01-02T14:08:00,976][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"s2ic"}
[2020-01-02T14:08:01,468][INFO ][logstash.filters.geoip   ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.1-java/vendor/GeoLite2-City.mmdb"}
[2020-01-02T14:08:01,712][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-01-02T14:08:01,714][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, :thread=>"#<Thread:0x76e55ce7@/usr/share/logstash/logstash-core/lib/logstash/pipelines_registry.rb:65 run>"}
[2020-01-02T14:08:03,282][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2020-01-02T14:08:03,382][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
[2020-01-02T14:08:03,915][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2020-01-02T14:08:04,192][INFO ][logstash.agent           ] Pipelines running {:count=>2, :running_pipelines=>[:s2ic, :main], :non_running_pipelines=>[]}
[2020-01-02T14:08:05,678][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-01-02T14:08:08,911][INFO ][logstash.inputs.jdbc     ] (0.022587s) SELECT version()
[2020-01-02T14:08:10,049][INFO ][logstash.inputs.jdbc     ] (0.903352s) SELECT * FROM s2ic.courses_view where updated_at >'1970-01-01 00:00:00';
[2020-01-02T14:08:11,478][INFO ][logstash.inputs.jdbc     ] (0.010734s) SELECT version()
[2020-01-02T14:08:12,408][INFO ][logstash.inputs.jdbc     ] (0.872570s) SELECT * FROM s2ic.courses_view where updated_at >'2019-11-28 07:14:25';
[2020-01-02T14:08:12,740][INFO ][logstash.inputs.jdbc     ] (0.003034s) SELECT version()
[2020-01-02T14:08:13,607][INFO ][logstash.inputs.jdbc     ] (0.861277s) SELECT * FROM s2ic.courses_view where updated_at >'2020-01-02 11:31:07';
[2020-01-02T14:08:13,851][INFO ][logstash.inputs.jdbc     ] (0.001440s) SELECT version()
[2020-01-02T14:08:14,236][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"7", :_index=>"s2ic_courses", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x46418cb0>], :response=>{"index"=>{"_index"=>"s2ic_courses", "_type"=>"_doc", "_id"=>"7", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [total_rating] cannot be changed from type [long] to [float]"}}}}
[2020-01-02T14:08:14,711][INFO ][logstash.inputs.jdbc     ] (0.848864s) SELECT * FROM s2ic.courses_view where updated_at >'2020-01-02 11:40:44';
[2020-01-02T14:08:14,887][INFO ][logstash.inputs.jdbc     ] (0.006226s) SELECT version()
[2020-01-02T14:08:15,745][INFO ][logstash.inputs.jdbc     ] (0.847794s) SELECT * FROM s2ic.courses_view where updated_at >'2020-01-02 11:42:03

The index was created with 95 documents, but the view has 96 rows. I am using multiple conf files; could this be a problem caused by having many conf files? All of the documents were created only after restarting the service. What is the reason behind this behavior? Please help.
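For reference, the two counts can be compared directly like this; the host and credential placeholders are the same ones used in the conf above.

# Count the documents in the Elasticsearch index.
curl -s 'http://MY Elastic IP:9200/s2ic_courses/_count?pretty'

# Count the rows in the MySQL view.
mysql -u XXXXXX -p -e 'SELECT COUNT(*) FROM s2ic.courses_view;'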

Tags: mysql, elasticsearch, logstash, logstash-jdbc

Solution
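The WARN line at 14:08:14 in the log above accounts for the missing document: the row with _id 7 was rejected with status 400 because the s2ic_courses index had already dynamically mapped total_rating as long (most likely from earlier documents whose total_rating was the whole-number default 0), and this row carried a fractional value. Elasticsearch cannot change an existing field mapping in place ("mapper [total_rating] cannot be changed from type [long] to [float]"), so that single row was dropped, leaving 95 documents for 96 rows. The rejection alone explains the gap; it is not caused by running multiple conf files.

A minimal sketch of one fix, assuming the index can be dropped and rebuilt: delete the half-built index, recreate it with an explicit float mapping for total_rating, and let the scheduled Logstash pipeline re-import. The host placeholder is the one from the conf above.

# Drop the index that was created with the wrong dynamic mapping.
curl -X DELETE 'http://MY Elastic IP:9200/s2ic_courses'

# Recreate it with total_rating explicitly mapped as float (Elasticsearch 7 syntax).
curl -X PUT 'http://MY Elastic IP:9200/s2ic_courses' \
  -H 'Content-Type: application/json' -d '
{
  "mappings": {
    "properties": {
      "total_rating": { "type": "float" }
    }
  }
}'

An alternative at the SQL level would be to make the view's fallback a decimal, e.g. IFNULL(..., 0.0) instead of IFNULL(..., 0), so the very first indexed document already carries a float. Separately, note that clean_run => true resets :sql_last_value on every restart (the first logged query filters on '1970-01-01 00:00:00'), which is why all documents are re-imported only after the service is restarted.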

