首页 > 解决方案 > Kafka:Scala 到 Python 的转换

问题描述

参考:https ://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies

这个例子有python版本吗?参考仅具有 java 等效项。我在https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html发现了一些相似之处。

我可以将 bootstrap.servers 与 bootstrap_servers 匹配,将 key.serializer 与 key_serializer 匹配,将 value.serializer 与 value_serializer 匹配,但我无法匹配最后 3 个“group.id”、“auto.offset.reset”和“enable.auto”。犯罪”。

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

标签: pythonapache-sparkpysparkapache-kafkaspark-streaming

解决方案


您正在查看的 Scala 代码是针对消费者的。所以你需要检查消费者设置而不是生产者的设置。

如果您查看https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html,您可以找到它们的等价物:

  • group.idgroup_id
  • auto.offset.resetauto_offset_reset
  • enable.auto.commitenable_auto_commit

另请注意,消费者具有反序列化器而不是序列化器,因此:

  • key.deserializerkey_deserializer
  • value.deserializervalue_deserializer

推荐阅读