python - Kafka:Scala 到 Python 的转换
问题描述
参考:https ://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#consumerstrategies
这个例子有python版本吗?参考仅具有 java 等效项。我在https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html发现了一些相似之处。
我可以将 bootstrap.servers 与 bootstrap_servers 匹配,将 key.serializer 与 key_serializer 匹配,将 value.serializer 与 value_serializer 匹配,但我无法匹配最后 3 个“group.id”、“auto.offset.reset”和“enable.auto”。犯罪”。
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
streamingContext,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value))
解决方案
您正在查看的 Scala 代码是针对消费者的。所以你需要检查消费者设置而不是生产者的设置。
如果您查看https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html,您可以找到它们的等价物:
group.id
:group_id
auto.offset.reset
:auto_offset_reset
enable.auto.commit
:enable_auto_commit
另请注意,消费者具有反序列化器而不是序列化器,因此:
key.deserializer
:key_deserializer
value.deserializer
:value_deserializer
推荐阅读
- reactjs - 反应自定义钩子没有获得新的价值
- mysql - 如何使用 Rails 进行动态嵌套查询
- javascript - 有没有办法(或解决方法)使用应用脚本将 Google 表格中的表格添加到 Google 幻灯片演示文稿中?
- c - 如何将 LLVMBuildGEP 与 LLVM C API 一起使用
- python - 使用回调和以下链接抓取的项目数量不一致
- websphere - 用于线程挂起通知的 JMX 管理客户端
- apache-spark - “TypeError:在 Python 3.8 上导入 pyspark 时需要一个整数(获取类型字节)”
- python - 如果 __name__ == '__main__' 在内部训练神经网络和没有它之间的区别
- android - FCM 推送通知数据导出到链接的 BigQuery 不起作用
- c# - LINQ:根据最新时间通过选定的 ID 获取唯一行