json - 为什么 PySpark 在尝试将字符串从 Kafka 主题加载到字典时会抛出 JSONDecodeError?
问题描述
我正在使用Spark Structured Streaming从 Kafka 主题(记录为 JSON 格式)中读取数据,目的是在将数据输入 Spark 后执行一些转换。我的第一步是将主题中的记录作为字符串读取,然后将它们转换为字典(通过 pythonjson
库)以执行转换(我不想to_json
在 Spark 中使用该功能,因为我的 Kafka 主题中的记录格式可能更改,我不想使用不需要模式的Spark Streaming,因为没有本机功能可以写回主题)。JSONDecodeError
我在尝试转换字符串时收到(如下)。
以下是产生我的错误的组件:
我在本地运行以下 PySpark 代码:
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
import pyspark
import json
from pyspark.sql.types import StringType
def str_to_json(s):
j = json.loads(s)
return j
if __name__ == "__main__":
spark = SparkSession.builder \
.master("local") \
.appName("App Name") \
.getOrCreate()
strToJson = udf(str_to_json, StringType())
spark.udf.register("strToJson", strToJson)
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "first_topic") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df_transformed = df.withColumn('strToJson', strToJson('value'))
query = df_transformed \
.writeStream \
.format("console") \
.outputMode("update") \
.start()
kafka-console-producer
然后,我也使用本地计算机上的 Kafka 主题以以下格式写入记录:
'{"name":"bob"}'
当 Spark 尝试使用 UDF 将记录的值(来自 Kafka 主题)转换为 PySpark 作业中的 JSON/字典对象时,会向控制台输出以下错误:
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我在 UDF 中添加了几个调试打印语句,以查看传入的值:
>>> type(s)
<class 'str'>
>>> print(s)
'{"name": "bob"}'
解决方案
推荐阅读
- node.js - 如何使 socket.io 在 pm2 集群模式下正常工作?
- excel - 将特定列复制到另一个工作表
- android - 使用 Visual Studio 在 Flutter 上运行 Gradle 时出错
- windows - 我需要在 Ansible 中找到除一两个文件之外的所有文件
- php - 如何将数据添加到表中的现有记录
- c# - 具有通用约束的通用接口无法转换
- javascript - 指向子对象时引用整个数组的对象数组的索引
- vue.js - 如何加载 JSON 配置文件并让 Vue 应用程序等待这个?
- python - get_dummies() 用于多个 Pandas DataFrame
- php - 如何制作多语言联系表