python-3.x - Spark Structred Streaming Pyspark Sink Csv 不追加
问题描述
将 json 写入 Kafka 主题并从 kafka 主题中读取 json。实际上我订阅主题并逐行编写控制台。但我必须下沉/写入文件 csv. 但我不能。我写了一次 csv 但不追加。
你可以在下面看到我的代码。
谢谢!
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func
spark = SparkSession.builder\
.config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
.appName('kafka_stream_test')\
.getOrCreate()
ordersSchema = StructType() \
.add("a", StringType()) \
.add("b", StringType()) \
.add("c", StringType()) \
.add("d", StringType())\
.add("e", StringType())\
.add("f", StringType())
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "product-views") \
.load()\
df_query = df \
.selectExpr("cast(value as string)") \
.select(func.from_json(func.col("value").cast("string"),ordersSchema).alias("parsed"))\
.select("parsed.a","parsed.b","parsed.c","parsed.d","parsed.e","parsed.f")\
df = df_query \
.writeStream \
.format("csv")\
.trigger(processingTime = "5 seconds")\
.option("path", "/var/kafka_stream_test_out/")\
.option("checkpointLocation", "/user/kafka_stream_test_out/chk") \
.start()
df.awaitTermination()
解决方案
是的,因为您需要这个额外的选项.option("format", "append")
:
aa = df_query \
.writeStream \
.format("csv")\
.option("format", "append")\
.trigger(processingTime = "5 seconds")\
.option("path", "/var/kafka_stream_test_out/")\
.option("checkpointLocation", "/user/kafka_stream_test_out/chk") \
.outputMode("append") \
.start()
推荐阅读
- scala - 面向新手的 UDF 问题中的 Java 字符串
- angularjs - 使用 `$mdDialog.show` 使用第二个组件更改一个组件中的值
- typescript - 如何在 JSDoc 中为 TypeScript 声明具有附加属性的子类型?
- elasticsearch - 为 `+` RFC7159 配置 ElasticSearch?
- php - 我的服务器端 PHP 在 reCaptcha 后仍然被黑客入侵(数百封垃圾邮件)
- android - 使用 adb 查找连接的蓝牙设备名称?
- python - Python从字符串中删除动态子字符串
- javascript - 访问嵌套 JSON 以在 Vue.JS 组件中使用的正确方法是什么?
- r - 这两个有什么区别...一个很好一个还南
- java - 撤消自定义 jar 任务中的文件排除