scala - Beam 管道:按时间桶从 Kafka 到 HDFS
问题描述
我正在尝试烘焙一个非常简单的管道,它从 Kafka ( KafkaIO.read
) 读取事件流并将相同的事件写入 HDFS,按小时将每个事件存储在一起(小时是从事件的时间戳字段中读取的,而不是处理时间)。
不能对事件的时间戳做出任何假设(即使 99% 的时间是实时的,它们也可能跨越多天),并且绝对没有关于事件顺序的信息。我的第一次尝试是创建一个在处理时间内运行的管道。
我的管道如下所示:
val kafkaReader = KafkaIO.read[String, String]()
.withBootstrapServers(options.getKafkaBootstrapServers)
.withTopic(options.getKafkaInputTopic)
.withKeyDeserializer(classOf[StringDeserializer])
.withValueDeserializer(classOf[StringDeserializer])
.updateConsumerProperties(
ImmutableMap.of("receive.buffer.bytes", Integer.valueOf(16 * 1024 * 1024))
)
.commitOffsetsInFinalize()
.withoutMetadata()
val keyed = p.apply(kafkaReader)
.apply(Values.create[String]())
.apply(new WindowedByWatermark(options.getBatchSize))
.apply(ParDo.of[String, CustomEvent](new CustomEvent))
val outfolder = FileSystems.matchNewResource(options.getHdfsOutputPath, true)
keyed.apply(
"write to HDFS",
FileIO.writeDynamic[Integer, CustomEvent]()
.by(new SerializableFunction[CustomEvent, Integer] {
override def apply(input: CustomEvent): Integer = {
new Instant(event.eventTime * 1000L).toDateTime.withMinuteOfHour(0).withSecondOfMinute(0)
(eventZeroHoured.getMillis / 1000).toInt
}
})
.via(Contextful.fn(new SerializableFunction[CustomEvent, String] {
override def apply(input: CustomEvent): String = {
convertEventToStr(input)
}
}), TextIO.sink())
.withNaming(new SerializableFunction[Integer, FileNaming] {
override def apply(bucket: Integer): FileNaming = {
new BucketedFileNaming(outfolder, bucket, withTiming = true)
}
})
.withDestinationCoder(StringUtf8Coder.of())
.to(options.getHdfsOutputPath)
.withTempDirectory("hdfs://tlap/tmp/gulptmp")
.withNumShards(1)
.withCompression(Compression.GZIP)
)
这是我的WindowedByWatermark:
class WindowedByWatermark(bucketSize: Int = 5000000) extends PTransform[PCollection[String], PCollection[String]] {
val window: Window[String] = Window
.into[String](FixedWindows.of(Duration.standardMinutes(10)))
.triggering(
AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(bucketSize))
)
.withAllowedLateness(Duration.standardMinutes(30))
.discardingFiredPanes()
override def expand(input: PCollection[String]): PCollection[String] = {
input.apply("window", window)
}
}
管道运行完美,但由于写入阶段(由writeDynamic
. 大多数事件都是实时发生的,因此它们属于同一时间。我也尝试使用小时和分钟来存储数据,但没有太多帮助。
经过几天的痛苦,我决定使用 a 复制相同的 Flink bucketingSink
,并且性能非常好。
val stream = env
.addSource(new FlinkKafkaConsumer011[String](options.kafkaInputTopic, new SimpleStringSchema(), properties))
.addSink(bucketingSink(options.hdfsOutputPath, options.batchSize))
根据我的分析(即使使用 JMX),Beam 中的线程在写入 HDFS 的阶段正在等待(这会导致管道暂停从 Kafka 检索数据)。
因此,我有以下问题:
bucketingSink
是否可以像在 Beam 中所做的那样下推铲斗?- 有没有更聪明的方法可以在 Beam 中实现同样的效果?
解决方案
推荐阅读
- python - 数据框转移python熊猫
- python - 如何使用 Selenium Python 单击链接问题
- flutter - 如何在按下时立即更改图标按钮颜色(颤动)
- c# - Add custom XML using C# VSTO addon
- save - 如何将每个图像保存到 jupyter notebook 上这个数组的文件夹中?
- java - 将 StringBuilder 输出直接转换为 InputSream
- intellij-idea - 如何在 IntelliJ IDEA 中为 Ktor 设置运行配置?
- f# - 如何限制列表中的线程数
F# 中的 .asParallel - javascript - 检查是否从具有相同类 javascript 的输入中选择了至少一个单选按钮
- macos - Visual Studio for Mac 不会创建按钮单击处理程序