首页 > 解决方案 > 写入 Kafka 压缩主题的 Spark 代码

问题描述

有谁知道是否可以调用writeStreamKafka 主题,其中创建的主题是压缩主题?下面的代码创建了一个 Kafka 主题,但传递的选项被忽略。

    StreamingQuery query = ds
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092"))
            .option("topic", "myTopic")
            .option("cleanup.policy", "compact")
            .option("min.insync.replicas", 1)
            .option("segment.bytes", 4096)
            .option("delete.retention.ms", 100)
            .option("min.compaction.lag.ms", 0)
            .option("min.cleanable.dirty.ratio", 0.01)
            .start();

标签: apache-sparkapache-kafka

解决方案


Spark Structure Streaming,当writeStream在后台使用 kafka 格式调用时使用 KafkaProducer。

如果在代理端auto.create.topics.enabletrue一个值(默认值),则当 KafkaProducer 获取元数据时,代理会创建新主题。

KafkaProducer 不传递任何主题的属性。Broker 使用一次默认值(例如default.replication.factor, num.partitions, log.cleanup.policy)来创建主题。

您不能通过 KafkaProducer 传递主题创建属性。


推荐阅读