首页 > 解决方案 > 为什么我的 Spark Structured Streaming 作业不能批量写入 JDBC?

问题描述

我有一个 Spark Structured 流作业,它从源 Kafka 主题读取并使用forEachBatch写入两个接收器。

下面是一些附加上下文的伪代码:

def write_to_kafka_and_maria(df, epoch_id):
    df.persist()

    df=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    df.write \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
      .option("topic", "target_topic") \
      .save()

    df.write \
      .format("jdbc") \
      .options(
        url="jdbc:mysql:dbserver:port/db",
        driver="org.mariadb.jdbc.Driver",
        dbtable="tablename",
        user="username",
        password="password",
        batchSize=100000,
        numPartition=10
      ).mode("append") \
      .save()
     
     df.unpersist()

# Read from the topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "source_topic") \
  .load()

# Write to Kafka and MariaDB
write = df.writeStream.trigger(once=True).foreachBatch(write_to_kafka_and_maria).start()

write.awaitTermination()

当我运行我的工作时,我可以看到我的记录以大约 7-8k 条记录的速度到达我的目的地 Kafka。但是,直到工作快结束时,我才看到我的记录到达 MariaDB。此外,写入 Kafka 后不会立即执行 MariaDB 写入。相反,写入 MariaDB 会在写入目标主题完成后 10-12 分钟发生。这是预期的行为,还是我应该看到记录一个接一个地分批传播到两个接收器(因为这就是forEachBatch操作的目的)?似乎该作业从源 Kafka 主题读取数据两次,并在写入 Kafka 与 MariaDB 之前在后台执行不同的操作。提前致谢!

标签: apache-sparkpysparkapache-spark-sqlmariadbspark-structured-streaming

解决方案


推荐阅读