apache-spark - 为什么我的 Spark Structured Streaming 作业不能批量写入 JDBC?
问题描述
我有一个 Spark Structured 流作业,它从源 Kafka 主题读取并使用forEachBatch写入两个接收器。
- 目的地卡夫卡主题
- JDBC 接收器 (MariaDB) 表
下面是一些附加上下文的伪代码:
def write_to_kafka_and_maria(df, epoch_id):
df.persist()
df=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "target_topic") \
.save()
df.write \
.format("jdbc") \
.options(
url="jdbc:mysql:dbserver:port/db",
driver="org.mariadb.jdbc.Driver",
dbtable="tablename",
user="username",
password="password",
batchSize=100000,
numPartition=10
).mode("append") \
.save()
df.unpersist()
# Read from the topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "source_topic") \
.load()
# Write to Kafka and MariaDB
write = df.writeStream.trigger(once=True).foreachBatch(write_to_kafka_and_maria).start()
write.awaitTermination()
当我运行我的工作时,我可以看到我的记录以大约 7-8k 条记录的速度到达我的目的地 Kafka。但是,直到工作快结束时,我才看到我的记录到达 MariaDB。此外,写入 Kafka 后不会立即执行 MariaDB 写入。相反,写入 MariaDB 会在写入目标主题完成后 10-12 分钟发生。这是预期的行为,还是我应该看到记录一个接一个地分批传播到两个接收器(因为这就是forEachBatch操作的目的)?似乎该作业从源 Kafka 主题读取数据两次,并在写入 Kafka 与 MariaDB 之前在后台执行不同的操作。提前致谢!
解决方案
推荐阅读
- oracle12c - 如何使用 ojdbc7.jar 连接到 Oracle 9i?
- c# - 如何从 ASP.NET Core 2.0 中的自定义中间件请求身份验证
- angularjs - 使用 AngularJS 和 ASP.NET 将日期转换为 dd-mm-yyyy
- python - 可视化 LDA 主题模型时出错
- java - Hibernate 无法解析列
- node.js - 安装 npm bootstrap 的问题
- php - PHP 邮件代码不工作
- php - 如何在 php 中使用 diff 创建日期间隔
- tensorflow - 在 NN 中指定连接(在 keras 中)
- html - 带搜索选项的引导选择