spark-streaming - 流式传输到 SQL 数据仓库时 Azure Databricks 缺少条目
问题描述
首先我有以下说明,当上传 20.000 个文件时,我在数据库中获得了 20.000 条记录(每个文件仅包含 1 个记录)。
aTracking = sqlContext.read.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")
aTracking.write \
.option('user', dwUser) \
.option('password', dwPass) \
.jdbc('jdbc:sqlserver://' + dwServer + ':' + dwJdbcPort + ';database=' + dwDatabase, 'stg_tr_energy_xmlin.csv_in', mode = 'append' )
然后,出于速度目的,我认为使用 Polybase 流式传输会更好……编码为……但是我只有 +- 17.000 个条目。
aTracking = spark.readStream.format('csv').options(header='true', delimiter=';').schema(csvSchema).load("wasbs://" + blobContainer + "@" + blobStorage + ".blob.core.windows.net/rtT*.csv")
aTracking.writeStream \
.format("com.databricks.spark.sqldw") \
.option("url", sqlDwUrl) \
.option("tempDir", "wasbs://uploaddw@" + blobStorage + ".blob.core.windows.net/stream") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "stg_tr_energy_xmlin.csv_in") \
.option("checkpointLocation", "/checkpoint") \
.start()
有什么建议可能导致这种情况吗?
解决方案
在检查点位置跟踪结构化流查询的状态。“假设每个流源都有偏移量(类似于 Kafka 偏移量(...))来跟踪流中的读取位置。引擎使用检查点和预写日志来记录每个触发器中正在处理的数据的偏移量范围”。有关更多详细信息,请参阅Spark 文档(搜索检查点)。
因此,如果您想重新处理所有文件,请删除下面定义的检查点位置目录(或定义一个新目录):
.option("checkpointLocation", "/checkpoint").
此外,还检查了目标目录中的 _spark_metadata 文件,因此如果要再次写入所有数据,还应该清理目标目录(使用 Azure SQL 数据仓库临时目录)。