Spark Structured Streaming PySpark CSV Sink Does Not Append

Problem Description

I write JSON to a Kafka topic and read the JSON back from that topic. Subscribing to the topic and printing each record to the console works fine, but I need to sink/write the stream to CSV files, and I can't get that to work. The sink writes CSV once but does not keep appending.

You can see my code below.

Thanks!

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func

spark = SparkSession.builder \
                    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
                    .appName('kafka_stream_test') \
                    .getOrCreate()

# Schema of the JSON payload carried in the Kafka message value
ordersSchema = StructType() \
        .add("a", StringType()) \
        .add("b", StringType()) \
        .add("c", StringType()) \
        .add("d", StringType()) \
        .add("e", StringType()) \
        .add("f", StringType())

# Subscribe to the Kafka topic as a streaming source
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "product-views") \
    .load()

# Parse the binary Kafka value as JSON and flatten it into columns
df_query = df \
    .selectExpr("cast(value as string)") \
    .select(func.from_json(func.col("value").cast("string"), ordersSchema).alias("parsed")) \
    .select("parsed.a", "parsed.b", "parsed.c", "parsed.d", "parsed.e", "parsed.f")

# Sink the parsed stream to CSV files every 5 seconds
query = df_query \
    .writeStream \
    .format("csv") \
    .trigger(processingTime="5 seconds") \
    .option("path", "/var/kafka_stream_test_out/") \
    .option("checkpointLocation", "/user/kafka_stream_test_out/chk") \
    .start()

query.awaitTermination()
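For reference, the console version that does work for me looks roughly like this (a minimal sketch reusing the same df_query; the truncate option is an assumption, not part of the original code):

# Console sink used for debugging: prints each micro-batch to stdout.
# Minimal sketch; the truncate option is an assumption.
console_query = df_query \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .outputMode("append") \
    .start()

console_query.awaitTermination()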

Tags: python-3.x, apache-spark, pyspark, apache-kafka, spark-structured-streaming

Solution


Yes, that happens because you need the extra option .option("format", "append"). Note that the code below also sets .outputMode("append"); the file sink only supports the append output mode.

# Append each new micro-batch as CSV files under the output path
aa = df_query \
    .writeStream \
    .format("csv") \
    .option("format", "append") \
    .trigger(processingTime="5 seconds") \
    .option("path", "/var/kafka_stream_test_out/") \
    .option("checkpointLocation", "/user/kafka_stream_test_out/chk") \
    .outputMode("append") \
    .start()
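Once the query is running, one way to confirm that batches keep accumulating is to read the output directory back as a plain batch DataFrame (a minimal sketch, assuming the sink path from the code above; the streaming CSV sink writes no header row, so the schema is reused):

# Read the CSV files produced so far as a batch DataFrame to verify appends.
# Sketch only: path and schema are taken from the code above.
out_df = spark.read \
    .schema(ordersSchema) \
    .csv("/var/kafka_stream_test_out/")

print(out_df.count())  # the count should grow as new micro-batches land
out_df.show(10)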
