pyspark - How to load all records that were already published from Kafka?
Problem description
I have a PySpark Structured Streaming app set up like this:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("data streaming app")\
    .getOrCreate()

data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .load()

query = data_raw.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()

query.awaitTermination()
And all that shows up on the console is this:
+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+
19/03/04 22:00:50 INFO streaming.StreamExecution: Streaming query made progress: {
"id" : "ab24bd30-6e2d-4c2a-92a2-ddad66906a5b",
"runId" : "29592d76-892c-4b29-bcda-f4ef02aa1390",
"name" : null,
"timestamp" : "2019-03-04T22:00:49.389Z",
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0,
"durationMs" : {
"addBatch" : 852,
"getBatch" : 180,
"getOffset" : 135,
"queryPlanning" : 107,
"triggerExecution" : 1321,
"walCommit" : 27
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[my_topic]]",
"startOffset" : null,
"endOffset" : {
"my_topic" : {
"0" : 303
}
},
"numInputRows" : 0,
"processedRowsPerSecond" : 0.0
} ],
"sink" : {
"description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@74fad4a5"
}
}
As you can see, my_topic has 303 messages in it, but I can't get them to show. Some additional context: I am using the Confluent Kafka JDBC connector to query an Oracle database and store the rows in the Kafka topic, with an Avro schema registry set up alongside it. If required, I will share these property files as well.
Does anyone have any idea what's going on?
Solution
Being a streaming application, this Spark Structured Streaming query only reads messages from the moment they are published onward. For testing purposes, what I wanted was to read everything already in the topic. To do that, all you have to do is add one extra option to readStream, namely option("startingOffsets", "earliest"):
data_raw = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "my_topic")\
    .option("startingOffsets", "earliest")\
    .load()
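As a side note, "startingOffsets" also accepts a JSON string mapping topic to partition to offset, in case you want to replay from a specific position rather than from the very beginning. This is a minimal sketch of building that JSON for the single-partition topic above (the offset value 0 is just an illustration; -2 means earliest and -1 means latest for a given partition). Also remember that the Kafka source exposes key and value as binary columns, so they need a cast (or Avro deserialization, if the payload came through the schema registry) before they are readable.

```python
import json

# Hypothetical example: start partition 0 of my_topic at offset 0.
# Per the Spark Kafka integration docs, -2 = earliest, -1 = latest.
starting_offsets = json.dumps({"my_topic": {"0": 0}})
print(starting_offsets)  # {"my_topic": {"0": 0}}

# This string would then be passed in place of "earliest":
#
# data_raw = spark.readStream\
#     .format("kafka")\
#     .option("kafka.bootstrap.servers", "kafkahost:9092")\
#     .option("subscribe", "my_topic")\
#     .option("startingOffsets", starting_offsets)\
#     .load()
#
# And to make the payload human-readable on the console sink:
#
# data_readable = data_raw.selectExpr(
#     "CAST(key AS STRING)",
#     "CAST(value AS STRING)",
# )
```

Note that a plain CAST to STRING will show mojibake for Avro-encoded values; decoding those properly requires the schema from the registry.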