pyspark - pyspark writeStream:单独的json文件中的每个Data Frame行
问题描述
我正在使用 pyspark 从 Kafka 主题中读取数据作为流数据帧,如下所示:
spark = SparkSession.builder \
.appName("Spark Structured Streaming from Kafka") \
.getOrCreate()
sdf = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.load() \
.select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))
sdf_ = sdf.select("parsed_value.*")
我的目标是将每一sdf_
行写为单独的json 文件。以下代码:
writing_sink = sdf_.writeStream \
.format("json") \
.option("path", "/Desktop/...") \
.option("checkpointLocation", "/Desktop/...") \
.start()
writing_sink.awaitTermination()
将在同一个 json 中写入几行数据帧,具体取决于微批次的大小(或者至少这是我的假设)。我需要调整上述内容,以便将数据帧的每一行写入单独的 json 文件中。
我也尝试过使用partitionBy('column')
,但这仍然不能完全满足我的需要,而是创建文件夹,其中 json 文件可能仍包含多行写入其中(如果它们具有相同的 id)。
有什么想法可以帮助到这里吗?提前致谢。
解决方案
发现以下选项可以解决问题:
.option("maxRecordsPerFile", 1)
推荐阅读
- swift - 为什么不在 Swift Widget 中使用 URLSession
- javascript - 如何从 Node-Fetch 获取特定的 cookie
- mongodb - 如何将电机的 open_download_stream 与 FastAPI 的 StreamingResponse 一起使用?
- idris - “Double 不是数字类型”是编译器错误吗?
- vb.net - Visual Basic:将文本框限制为特定字符
- javascript - 无法读取“Discord.MessageEmbed()”的属性
- swift - 如何使用 UIColorPickerViewController 更改两件事的颜色
- racket - 覆盖 Racket 中的按钮回调
- data-visualization - 选择可见数据
- bioinformatics - 星形索引生成 - 'std::bad_alloc' 错误