apache-kafka - 在 kafka 流记录转发期间,接收器的分区计数减少
问题描述
我正在使用 kafka 流来处理一些 kafka 记录,我有两个节点,一个用于进行一些转换,另一个是最终接收器。
我的主题是 INTER_TOPIC 和 FINAL_TOPIC 每个有 20 个分区。我的生产者写入 INTER_TOPIC 正在写入键值,分区器是循环法。
下面是我的间变换节点的代码。
public void streamHandler() {
Properties props = getKafkaProperties();
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> processStream = builder.stream("INTER_TOPIC",
Consumed.with(Serdes.String(), Serdes.String()));
//processStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));
processStream.map((key, value) -> getTransformer().transform(key, value)).filter((key,value)->filteroutFailedRequest(key,value)).to("FINAL_TOPIC", Produced.with(Serdes.String(), Serdes.String()));
KafkaStreams IStreams = new KafkaStreams(builder.build(), props);
IStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throw-able e) {
logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
}
});
IStreams.cleanUp();
IStreams.start();
try {
System.in.read();
} catch (IOException e) {
logger.error("Failed streaming ",e);
}
}
但是我的接收器仅在 2 个分区中获取数据,但我配置了 20 个流线程,并且我验证了我的生产者正在写入所有 20 个分区,如何知道我的转换节点转发到我的 FINAL_TOPIC 的所有 20 个分区
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:39:41,416 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:40:57,427 INFO c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
解决方案
并且 partition-er 是循环的
为什么你认为分区器是循环的?默认情况下,Kafka Streams 基于键应用基于散列的分区。
如果要更改默认分区器,可以实现接口StreamPartitioner
并通过以下方式传递:
Produced.with(Serdes.String(), Serdes.String())
.withStreamPartitioner(...)
推荐阅读
- ruby - 代码块中的两条垂直线和 super.select 是什么意思?
- c++ - 在可变参数模板的情况下如何应用模板模板参数
- java - MALLET 无法恢复实例列表
- angular - 被调用但未在角度测试中显示的功能
- python - 从源代码字符串中提取 Python 函数源文本
- java - 如何强制 Android ListView 以更少(滚动动作/拖动)更改/更新列表项位置?
- javascript - 有没有办法通过 JavaScript 信任 HTTPS 证书?
- python - 将数据添加到 most_common() 列表?
- c - 为什么我的 pthread_t 指针数组会导致 pthread_create 出现段错误,但对数组中 pthread_t 的引用却没有
- php - 如何对 $_POST $_FILES 文件进行 base 64 编码