mysql - When importing data from MySQL into Elasticsearch with Logstash using JDBC, why are not all records imported?
Problem description
I am using a MySQL cluster running on the NDB engine, and I am trying to import data from a view.
This is the view definition:
CREATE
    ALGORITHM = UNDEFINED
    DEFINER = `s2ic`@`%`
    SQL SECURITY DEFINER
VIEW `s2ic`.`courses_view` AS
    SELECT
        `c`.`course_id` AS `course_id`,
        `c`.`edupartner_id` AS `edupartner_id`,
        `c`.`name` AS `name`,
        `c`.`start_date` AS `start_date`,
        `c`.`duration` AS `duration`,
        `c`.`created_at` AS `created_at`,
        `c`.`prerequisite` AS `prerequisite`,
        `c`.`qualification` AS `qualification`,
        `c`.`description` AS `description`,
        `c`.`type` AS `type`,
        `c`.`is_active` AS `is_active`,
        `c`.`location` AS `location`,
        `c`.`validity_date` AS `validity_date`,
        `c`.`level` AS `level`,
        `c`.`course_fee` AS `course_fee`,
        `c`.`status` AS `status`,
        `c`.`domain_tag` AS `domain_tag`,
        `c`.`course_highlights` AS `course_highlights`,
        `c`.`updated_at` AS `updated_at`,
        `c`.`is_removed` AS `is_removed`,
        (SELECT
                `s2ic`.`users`.`name`
            FROM
                `s2ic`.`users`
            WHERE
                (`s2ic`.`users`.`user_id` = (SELECT
                        `s2ic`.`edupartner_user`.`user_id`
                    FROM
                        `s2ic`.`edupartner_user`
                    WHERE
                        (`s2ic`.`edupartner_user`.`edupartner_id` = `c`.`edupartner_id`)
                    LIMIT 1))) AS `child_name`,
        IFNULL((SELECT
                    COUNT(`s2ic`.`student_course`.`student_course_id`)
                FROM
                    `s2ic`.`student_course`
                WHERE
                    ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                        AND (`s2ic`.`student_course`.`status` <> 'saved')
                        AND (`s2ic`.`student_course`.`status` <> 'not_applied'))
                LIMIT 1),
            0) AS `no_of_applicants`,
        IFNULL((SELECT
                    COUNT(`s2ic`.`student_course`.`student_course_id`)
                FROM
                    `s2ic`.`student_course`
                WHERE
                    ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                        AND (`s2ic`.`student_course`.`status` = 'selected')
                        AND (`s2ic`.`student_course`.`rating` <> 0))
                LIMIT 1),
            0) AS `no_of_ratings`,
        (SELECT
                `s2ic`.`edupartner`.`parent_name`
            FROM
                `s2ic`.`edupartner`
            WHERE
                (`s2ic`.`edupartner`.`edupartner_id` = `c`.`edupartner_id`)) AS `parent_name`,
        IFNULL((SELECT
                    CAST(AVG(`s2ic`.`student_course`.`rating`) AS DECIMAL (2 , 1))
                FROM
                    `s2ic`.`student_course`
                WHERE
                    ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                        AND (`s2ic`.`student_course`.`rating` <> 0))),
            0) AS `total_rating`
    FROM
        `s2ic`.`courses` `c`
    WHERE
        1
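One observation about the view definition (my own assumption, not something stated in the question): `total_rating` falls back to the integer literal `0` through `IFNULL`, while rows that do have ratings produce a `DECIMAL(2,1)` value, so the same column can serialize with or without a decimal point. A hypothetical variant of just that expression, casting the fallback to the same decimal type:

```sql
-- Hypothetical adjustment to the total_rating expression only:
-- cast the IFNULL fallback to DECIMAL(2,1) so every row yields
-- the same numeric type instead of mixing integer 0 with decimals.
IFNULL((SELECT
            CAST(AVG(`s2ic`.`student_course`.`rating`) AS DECIMAL (2 , 1))
        FROM
            `s2ic`.`student_course`
        WHERE
            ((`s2ic`.`student_course`.`course_id` = `c`.`course_id`)
                AND (`s2ic`.`student_course`.`rating` <> 0))),
    CAST(0 AS DECIMAL (2 , 1))) AS `total_rating`
```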
These are the column data types of the underlying table:
# Field, Type, Null, Key, Default, Extra
course_fee, int(20), YES, , ,
course_highlights, longtext, YES, , ,
course_id, int(20), NO, PRI, , auto_increment
created_at, datetime, NO, , CURRENT_TIMESTAMP,
created_by, int(20), YES, , ,
description, longtext, YES, , ,
domain_tag, text, YES, , ,
duration, text, YES, , ,
edupartner_id, int(20), NO, MUL, ,
is_active, tinyint(1), YES, , 0,
is_removed, tinyint(1), YES, , 0,
level, enum('beginner','intermediate','advanced'), NO, , ,
location, text, YES, , ,
name, text, YES, , ,
prerequisite, text, YES, , ,
qualification, text, YES, , ,
start_date, date, YES, , ,
status, enum('active','draft','expired','inactive'), NO, , ,
type, enum('classroom','online'), NO, , ,
updated_at, timestamp, NO, , CURRENT_TIMESTAMP, on update CURRENT_TIMESTAMP
updated_by, int(20), YES, , ,
validity_date, date, YES, , ,
This is my Logstash conf file:
input {
  jdbc {
    jdbc_connection_string => "jdbc:mysql://My DB IP:3306/s2ic"
    jdbc_user => "XXXXXX"
    jdbc_password => "XXXXXX"
    jdbc_driver_library => "/home/ubuntu/mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    statement => "SELECT * FROM s2ic.courses_view where updated_at >:sql_last_value;"
    tracking_column => "updated_at"
    tracking_column_type => "timestamp"
    record_last_run => true
    use_column_value => true
    schedule => "* * * * * *"
    clean_run => true
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    document_id => "%{course_id}"
    index => "s2ic_courses"
    hosts => ["http://MY Elastic IP:9200"]
  }
}
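As a side note on this conf (a sketch of a possible safeguard, not part of the original question): Logstash's `mutate` filter can coerce a field to a fixed JSON type before it reaches the Elasticsearch output, which keeps dynamic mapping from ever seeing mixed types. Assuming the `total_rating` field produced by the view, a minimal filter block would look like:

```
filter {
  mutate {
    # Coerce total_rating to a float on every event, so the field
    # always serializes with a decimal point and Elasticsearch maps
    # it as a float rather than a long.
    convert => { "total_rating" => "float" }
  }
}
```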
I added the file location to pipelines.yml and restarted the service with systemctl. This is what was then printed in the logs:
[2020-01-02T14:07:49,624][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.3.1"}
[2020-01-02T14:07:54,726][INFO ][org.reflections.Reflections] Reflections took 192 ms to scan 1 urls, producing 19 keys and 39 values
[2020-01-02T14:07:59,169][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://My Elastic IP:9200/]}}
[2020-01-02T14:07:59,272][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://My Elastic IP:9200/]}}
[2020-01-02T14:07:59,690][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://My Elastic IP:9200/"}
[2020-01-02T14:07:59,682][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://My Elastic IP:9200/"}
[2020-01-02T14:07:59,789][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2020-01-02T14:07:59,790][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>7}
[2020-01-02T14:07:59,795][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-01-02T14:07:59,807][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-01-02T14:07:59,862][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["http://My Elastic IP:9200"]}
[2020-01-02T14:07:59,867][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//My Elastic IP:9200"]}
[2020-01-02T14:08:00,128][INFO ][logstash.outputs.elasticsearch] Using default mapping template
[2020-01-02T14:08:00,248][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-01-02T14:08:00,423][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2020-01-02T14:08:00,426][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"s2ic", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, :thread=>"#<Thread:0x34d8c27c run>"}
[2020-01-02T14:08:00,976][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"s2ic"}
[2020-01-02T14:08:01,468][INFO ][logstash.filters.geoip ] Using geoip database {:path=>"/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.1-java/vendor/GeoLite2-City.mmdb"}
[2020-01-02T14:08:01,712][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization. It is recommended to log an issue to the responsible developer/development team.
[2020-01-02T14:08:01,714][INFO ][logstash.javapipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>125, :thread=>"#<Thread:0x76e55ce7@/usr/share/logstash/logstash-core/lib/logstash/pipelines_registry.rb:65 run>"}
[2020-01-02T14:08:03,282][INFO ][logstash.inputs.beats ] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2020-01-02T14:08:03,382][INFO ][logstash.javapipeline ] Pipeline started {"pipeline.id"=>"main"}
[2020-01-02T14:08:03,915][INFO ][org.logstash.beats.Server] Starting server on port: 5044
[2020-01-02T14:08:04,192][INFO ][logstash.agent ] Pipelines running {:count=>2, :running_pipelines=>[:s2ic, :main], :non_running_pipelines=>[]}
[2020-01-02T14:08:05,678][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2020-01-02T14:08:08,911][INFO ][logstash.inputs.jdbc ] (0.022587s) SELECT version()
[2020-01-02T14:08:10,049][INFO ][logstash.inputs.jdbc ] (0.903352s) SELECT * FROM s2ic.courses_view where updated_at >'1970-01-01 00:00:00';
[2020-01-02T14:08:11,478][INFO ][logstash.inputs.jdbc ] (0.010734s) SELECT version()
[2020-01-02T14:08:12,408][INFO ][logstash.inputs.jdbc ] (0.872570s) SELECT * FROM s2ic.courses_view where updated_at >'2019-11-28 07:14:25';
[2020-01-02T14:08:12,740][INFO ][logstash.inputs.jdbc ] (0.003034s) SELECT version()
[2020-01-02T14:08:13,607][INFO ][logstash.inputs.jdbc ] (0.861277s) SELECT * FROM s2ic.courses_view where updated_at >'2020-01-02 11:31:07';
[2020-01-02T14:08:13,851][INFO ][logstash.inputs.jdbc ] (0.001440s) SELECT version()
[2020-01-02T14:08:14,236][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>"7", :_index=>"s2ic_courses", :_type=>"_doc", :routing=>nil}, #<LogStash::Event:0x46418cb0>], :response=>{"index"=>{"_index"=>"s2ic_courses", "_type"=>"_doc", "_id"=>"7", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"mapper [total_rating] cannot be changed from type [long] to [float]"}}}}
[2020-01-02T14:08:14,711][INFO ][logstash.inputs.jdbc ] (0.848864s) SELECT * FROM s2ic.courses_view where updated_at >'2020-01-02 11:40:44';
[2020-01-02T14:08:14,887][INFO ][logstash.inputs.jdbc ] (0.006226s) SELECT version()
[2020-01-02T14:08:15,745][INFO ][logstash.inputs.jdbc ] (0.847794s) SELECT * FROM s2ic.courses_view where updated_at >'2020-01-02 11:42:03
The index was created with 95 documents, but the view has 96 rows. I am using multiple conf files; is this a problem caused by having many conf files? All of the documents were created after the service restart. What is the reason behind this behavior? Please help.
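One detail visible in the log above: the 400 WARN shows Elasticsearch rejecting the document with _id 7 because `total_rating` arrived as a float after the field had already been dynamically mapped as `long`, and a single rejected document would account exactly for 95 of 96 rows. A sketch of how the mapping could be inspected and pinned explicitly (Kibana Dev Tools syntax; the index name comes from the conf above, and the PUT only works at index creation time, so the existing index would have to be deleted and the data reloaded):

```
GET s2ic_courses/_mapping/field/total_rating

DELETE s2ic_courses

PUT s2ic_courses
{
  "mappings": {
    "properties": {
      "total_rating": { "type": "float" }
    }
  }
}
```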