scala - KafkaUtils.createRDD的简单Spark Structured Streaming等价物,即通过指定偏移量将kafka主题读取到RDD?
问题描述
如何通过指定开始和结束偏移量将 kafka 主题中的数据读取到 RDD?
KafkaUtils.createRDD
is是实验性的,API 相当不愉快(它返回一个臃肿的 JavaConsumerRecord
类,它甚至不能序列化并将其放入KafkaRDD
,它覆盖了很多方法(如 persist)只是抛出一个异常。
我想要的是这样一个简单的 API:
case class Message(key: String,
value: String,
offset: Long,
timestamp: Long)
def readKafka(topic: String, offsetsByPartition: Map[Int, (Long, Long)])
(config: KafkaConfig, sc: SparkContext): RDD[Message]
或类似的地方key: Array[Byte]
和value: Array[Byte]
解决方案
要从带有偏移量的 kafka 中读取,代码如下所示,如此处所引用
val df =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
以上将读取偏移量内可用的数据,然后您可以将列转换为字符串,然后转换为您的对象Message
。
val messageRDD: RDD[Message] =
df.select(
col("key").cast("string"),
col("value").cast("string"),
col("offset").cast("long"),
col("timestamp").cast("long")
).as[Message].rdd
推荐阅读
- javascript - 用 Jest 测试“console.log”
- ios - 无法在核心数据中订购自定义对象
- reactjs - React router 6,使用Navigate如何获取路径名
- python - 通过 Python 发送电子邮件
- javascript - 如何在 DHTMLX 甘特图中添加父栏并仅使用 javascript 刷新特定的父栏
- c - c中字符串和字符数组之间的长度差异?
- java - 尝试在空对象引用上调用虚拟方法“(java.lang.String)”
- javascript - Javascript 中的饼图无法正常工作
- flutter - Flutter Search Delegate 过滤数据,首字母大写
- typescript - 如何在打字稿中制作一种类型的函数链(数组)?