fluentd - FluentD 将日志从 kafka 转发到另一个 fluentD
问题描述
我需要将我的应用程序日志发送到作为 EFK 服务一部分的 FluentD。所以我尝试配置另一个 FluentD 来做到这一点。
my-fluent.conf:
<source>
@type kafka_group
consumer_group cgrp
brokers "#{ENV['KAFKA_BROKERS']}"
scram_mechanism sha512
username "#{ENV['KAFKA_USERNAME']}"
password "#{ENV['KAFKA_PASSWORD']}"
ssl_ca_certs_from_system true
topics "#{ENV['KAFKA_TOPICS']}"
format json
</source>
<filter TOPIC>
@type parser
key_name log
reserve_data false
<parse>
@type json
</parse>
</filter>
<match TOPIC>
@type copy
<store>
@type stdout
</store>
<store>
@type forward
<server>
host "#{ENV['FLUENTD_HOST']}"
port "#{ENV['FLUENTD_PORT']}"
shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
</server>
</store>
</match>
我能够stdout
正确地看到输出
2021-07-06 07:36:54.376459650 +0000 主题:{"foo":"bar", ...}
但我无法看到来自 kibana 的日志。跟踪后我发现第二个 fluentd 在接收数据时抛出错误:
{"time":"2021-07-05 11:21:41 +0000","level":"error","message":"读取数据时出现意外错误 host="XXXX" port=58548 error_class=MessagePack: :MalformedFormatError 错误="无效字节"","worker_id":0} {"time":"2021-07-05 11:21:41 +0000","level":"error","worker_id":0, "message":"/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in
feed_each'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:262:in
block (2 levels) in read_messages'\n/ usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin/in_forward.rb:271:inblock in read_messages'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/server.rb:613:in
on_read_without_connection'\n/usr/lib/ruby/gems/2.7.0/ gems/cool.io-1.7.1/lib/cool.io/io.rb:123:inon_readable'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/io.rb:186:in
on_readable'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/库/酷。io/loop.rb:88:inrun_once'\n/usr/lib/ruby/gems/2.7.0/gems/cool.io-1.7.1/lib/cool.io/loop.rb:88:in
run'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/event_loop.rb:93:inblock in start'\n/usr/lib/ruby/gems/2.7.0/gems/fluentd-1.12.2/lib/fluent/plugin_helper/thread.rb:78:in
在 thread_create'"} 中阻塞
解决方案
问题是在第一次流利中缺少安全标签。
<match TOPIC>
@type copy
<store>
@type stdout
</store>
<store>
@type forward
<server>
host "#{ENV['FLUENTD_HOST']}"
port "#{ENV['FLUENTD_PORT']}"
shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
</server>
<security>
self_hostname HOSTNAME
shared_key "#{ENV['FLUENTD_SHARED_KEY']}"
</security>
</store>
</match>
推荐阅读
- c++ - 如何声明具有 C 链接的函数指针?
- sql - 通过使用 Oracle 连接两个表来更新列
- java - 在 xml drawable 中使用 ?colorVariant 时,Android Studio 应用程序在启动时崩溃
- authentication - 使用 Okta 身份验证发出 Google Drive API 请求
- r - 如何在 R 中的非结构化系列中删除常规月份的观察?
- vue.js - Vuejs 中 Axios API 中最不喜欢的
- java - 嵌套 for 循环到 lambda - Java
- c - 文件传输后无法使用套接字 - C Socket
- azure - Azure Functions 代理似乎正在缓存响应
- c++ - 递增常量指针变量在 for 循环中工作