apache-kafka - 卡夫卡流抑制在窗口化后不产生输出
问题描述
我正在窗口化来自流环境的传入对象,收集并打印它。使用 kafka suppress 来避免中间结果。
使用抑制后,我无法进行任何输出。如果我注释掉抑制代码工作正常但打印中间结果。
import com.savk.workout.kafka.streams.kafkastreamsworkout.config.ConfigUtils;
import com.savk.workout.kafka.streams.kafkastreamsworkout.model.Observation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Properties;
@Component
public class ObservationAnalyser {
@PostConstruct
public void initialize() {
StreamsBuilder streamsBuilder;
KStream<String, Observation> observationKStream;
String observationSerde = ConfigUtils.getObservationSerde(); //TODO : Should we move to a JSON Serde?
Properties kafkaStreamProperties = ConfigUtils.getKafkaStreamConfig();
kafkaStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
kafkaStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, observationSerde);
//JsonSerde<ObservationCollector> observationCollectorJsonSerde = new JsonSerde<>(ObservationCollector.class);
streamsBuilder = new StreamsBuilder();
observationKStream = streamsBuilder.stream(ConfigUtils.KAFKA_SOURCE_TOPIC);
observationKStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(ConfigUtils.ONE_MINUTE_IN_MILLISECONDS)))
.aggregate(
() -> new ObservationCollector(),
(key, value, observationCollector) -> observationCollector.addObservations(value),
Materialized.with(Serdes.String(), new JsonSerde<>(ObservationCollector.class))
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) //AFTER COMMENTING THIS LINE, I CAN SEE THE OUTPUT
.toStream((key, value) -> key.key())
.foreach((key, observationCollector) -> {
System.out.println("Key :: " + key);
for(Observation observation : observationCollector.getObservations()) {
System.out.println("Observation :: " + observation);
}
});
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), kafkaStreamProperties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
}
我无法弄清楚什么是问题/问题或找到任何资源进行故障排除。
解决方案
默认的“宽限期”是 1 天(如果suppress()
不使用,出于向后兼容性的原因)。因此,窗口不会在事件时间提前 1 天之前关闭。
您可能希望通过以下方式减少宽限期(也可能还有保留时间)
TimeWindows.of(..).grace(...)
Materialized.with(...).withRetentionTime(...)
推荐阅读
- php - 寻找逻辑来扫描阵列以寻找下一个活动时隙
- docker - docker rabbit mq 图像无法加载
- ruby-on-rails - rails 和 rake 有什么区别?
- google-cloud-platform - 具有写入权限和预定义元数据的 GCP 存储 file.getSignedUrl
- javascript - 我想用 rxjs 查找字符串序列
- python - 运行 Windows 批处理文件以使用我的 anaconda 环境调用 python 脚本
- java - 如何通过 Spring WebClient 通过服务名称从另一个微服务调用一个微服务:Java、Spring
- typescript - 使用 mongoose 文档扩展现有接口
- amazon-web-services - 在 aws 负载均衡器上托管 Pritunl VPN
- r - 根据R中的站点距离制作权重列