首页 > 解决方案 > pyspark writeStream:单独的json文件中的每个Data Frame行

问题描述

我正在使用 pyspark 从 Kafka 主题中读取数据作为流数据帧,如下所示:

spark = SparkSession.builder \
  .appName("Spark Structured Streaming from Kafka") \
  .getOrCreate()

sdf = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .option("startingOffsets", "latest") \
  .option("failOnDataLoss", "false") \
  .load() \
  .select(from_json(col("value").cast("string"), json_schema).alias("parsed_value"))

sdf_ = sdf.select("parsed_value.*")

我的目标是将每一sdf_行写为单独的json 文件。以下代码:

writing_sink = sdf_.writeStream \
    .format("json") \
    .option("path", "/Desktop/...") \
    .option("checkpointLocation", "/Desktop/...") \
    .start()

writing_sink.awaitTermination()

将在同一个 json 中写入几行数据帧,具体取决于微批次的大小(或者至少这是我的假设)。我需要调整上述内容,以便将数据帧的每一行写入单独的 json 文件中。

我也尝试过使用partitionBy('column'),但这仍然不能完全满足我的需要,而是创建文件夹,其中 json 文件可能仍包含多行写入其中(如果它们具有相同的 id)。

有什么想法可以帮助到这里吗?提前致谢。

标签: pysparkspark-streamingpyspark-dataframes

解决方案


发现以下选项可以解决问题:

   .option("maxRecordsPerFile", 1)

推荐阅读