java - Apache Flume Java 客户端无法从单个 Kafka 接收器启动
问题描述
使用org.apache.flume.agent.embedded.EmbeddedAgent
. 配置如下:
Map<String, String> configurationProperties = ...;
service.configure(configurationProperties);
在哪里configurationProperties
设置:
{
"kafkaSink.kafka.producer.reconnect.backoff.max.ms": "30000",
"processor.type": "load_balance",
"sinks": "kafkaSink1",
"channel.keep-alive": "0",
"channel.checkpointDir": "********************************",
"kafkaSink.kafka.producer.reconnect.backoff.ms": "2000",
"channel.dataDirs": "********************************",
"kafkaSink.kafka.producer.retry.backoff.ms": "1000",
"processor.selector.maxTimeOut": "60000",
"kafkaSink.kafka.producer.max.request.size": "5485760",
"kafkaSink1.flumeBatchSize": "1000",
"kafkaSink.kafka.producer.buffer.memory": "67108864",
"kafkaSink.kafka.producer.client.id": "********************************",
"kafkaSink1.useFlumeEventFormat": "true",
"kafkaSink1.kafka.topic": "********************************",
"kafkaSink.kafka.producer.batch.size": "8196",
"channel.kafka.dataDirs": "********************************",
"kafkaSink1.type": "KAFKA",
"channel.backupCheckpointDir": "********************************",
"kafkaSink1.allowTopicOverride": "true",
"channel.useDualCheckpoints": "true",
"kafkaSink.kafka.producer.compression.type": "lz4",
"processor.maxBackoff": "60000",
"use_dual_channel": "true",
"channel.capacity": "1000000",
"channel.byteCapacityBufferPercentage": "50",
"channel.transactionCapacity": "1000",
"channel.byteCapacity": "10485760",
"channel.type": "file",
"processor.backoff": "true",
"channel.kafka.checkpointDir": "********************************",
"channel.kafka.backupCheckpointDir": "********************************",
"kafkaSink1.kafka.bootstrap.servers": "********************************",
"kafkaSink.kafka.producer.acks": "-1"
}
在运行时它抛出以下内容:
java.lang.NullPointerException
at org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateGroups(FlumeConfiguration.java:927)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:384)
at org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.access$000(FlumeConfiguration.java:228)
at org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:153)
at org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:133)
at org.apache.flume.agent.embedded.MemoryConfigurationProvider.getFlumeConfiguration(MemoryConfigurationProvider.java:45)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:97)
at org.apache.flume.agent.embedded.MaterializedConfigurationProvider.get(MaterializedConfigurationProvider.java:40)
at org.apache.flume.agent.embedded.EmbeddedAgent.doConfigure(EmbeddedAgent.java:161)
at org.apache.flume.agent.embedded.EmbeddedAgent.configure(EmbeddedAgent.java:99)
at ******************.startService(******************)
而且水槽不会启动。
代码org.apache.flume.conf.sink.SinkGroupConfiguration.configure(SinkGroupConfiguration.java:52
显示它正在寻找sinks
不为空的属性,因此它不应该抛出任何此类错误......
有谁知道为什么?没有关于这些的任何文件....
解决方案
万一有人偶然发现了这个问题……经过进一步调查后,我相信 Apache Flume 项目已经死了,我们将停止使用它。有关详细信息,请参阅以下内容:
许多未解决的问题......没有回应...... https://cwiki.apache.org/confluence/display/FLUME/Developer+Section
上一个版本大约在 2 年前发布(一开始它通常每 6 个月发布一次)。https://flume.apache.org/index.html
Wiki 甚至没有更新最新版本 - 最后一次更新是在 5 年前 - https://cwiki.apache.org/confluence/display/FLUME/Home
可悲的是,我将不得不以这种方式结束这个问题。希望对任何人都有帮助。
推荐阅读
- android - 连接到BLE设备后,我怎么知道有哪些服务可用,我只知道打印uuid但我想知道服务的名称
- android - 将渐变着色器应用于 LazyRow
- sql-server - 在触发器中使用动态 SQL 来识别更改
- python - 对字典理解中的列表求和
- reactjs - React 功能组件 - 同时使用回调和道具
- equation-solving - 给定边际表后解决列联表
- quickbase - 在 QuickBase 中,有没有办法将明细报表添加到公式字段?
- python - 如何跟踪用户输入的数量,并将它们显示在字符串中?
- ruby-on-rails - 优化填充 Rails 计数器缓存
- spring - 为什么我得到一个堆栈溢出的无限列表?