首页 > 解决方案 > Pyspark 流式附加旋转批处理到 sql server

问题描述

我正在使用 pyspark 流从 kafka 服务器流式传输数据,逐批操作(使用 foreachBatch),并使用 jdbc 将每个批次附加到 Microsoft SQL 服务器。

这是我的代码的主要相关部分:

定义流

string_value_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", constants.kafka_server) \
    .option("subscribe", "topics") \
    .option("maxOffsetsPerTrigger", 1000) \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING)")

定义模式和操作 df

schema = T.StructType([T.StructField('StationId', T.StringType(), False),
                            T.StructField('Date', T.StringType(), False),
                            T.StructField('Variable', T.StringType(), False),
                            T.StructField('Value', T.IntegerType(), False)])

json_df = string_value_df.select(F.from_json(F.col("value"),schema=schema).alias('json'))
streaming_df = json_df.select("json.*")

开始流式传输

query = df \
    .writeStream \
    .foreachBatch(write_to_sql) \
    .outputMode("Update") \
    .start() \
    .awaitTermination()

在每次批处理操作期间,我使用 pivot 将多个变量值记录转换为列形式的变量,同时对“StationID”和“Date”进行分组

原始数据形式(每批如何到达)是:

站号 日期 多变的 价值
一个 yyyyMMdd 聚氯乙烯 x1
一个 yyyyMMdd x2
一个 yyyyMMdd SNWD x3
一个 yyyyMMdd TMAX x4
一个 yyyyMMdd TMIN x5

在我的转型之后,包括枢轴:

站号 日期 聚氯乙烯 SNWD TMAX TMIN
一个 yyyyMMdd x1 x2 x3 x4 x5

这是应用于每个批次的函数:

打包一批

def write_to_sql(df, df_id):
    df = df.groupBy("StationId", "Date").pivot("Variable").sum("Value")

    try:
        df.write \
            .format("jdbc") \
            .mode("append") \
            .option("url", constants.url) \
            .option("dbtable", constants.table_name) \
            .option("user", constants.username) \
            .option("password", constants.password) \
            .save()
    except ValueError as error:
        print("Connector write failed", error)

我的问题是将新批次附加到服务器上的 SQL 表中。
当同一批次中有两行具有相同的 <StationId, Date> 时,枢轴工作正常并正确显示在服务器中。
在其中,如果特定 <StationId, Date> 对的多个(不同变量)记录在多个批次之间划分,那么在将其附加到服务器时,它似乎没有完全分组。

站号 日期 聚氯乙烯 SNWD TMAX TMIN
一个 yyyyMMdd x1 无效的 x3 无效的 无效的
一个 yyyyMMdd 无效的 x2 无效的 无效的 无效的

[x1 和 x3 出现在同一个批次中]
是否有任何有效的方法可以将批次附加到服务器,同时跨不同变量维护 <StationId, Date> 上的分组?
非常感谢

标签: sqlapache-sparkpysparkpivot

解决方案


您可以提前执行以下操作来匹配架构,这样就不会遇到麻烦:这是我尝试过滤掉 4 个变量PRCP, TMAX, TMIN,时的练习解决方案SNWD

batch_df = batch_df.filter(batch_df.Q_Flag.isNull()).drop("M_Flag", "Q_Flag", "S_Flag", "ObsTime")
batch_df = batch_df.withColumn("Year_Rec", F.expr("substring(Date,0,4)").cast(IntegerType())).withColumn(
        "Month_Rec", F.expr("substring(Date,5,2)").cast(IntegerType())).drop("Date")
batch_df = batch_df.filter(
        ((batch_df.Variable == "PRCP") & (batch_df.Value >= 0))
        | (batch_df.Variable == "TMAX")
        | (batch_df.Variable == "TMIN")
        | ((batch_df.Variable == "SNWD") & (batch_df.Value >= 0)))
    ger_swe_df = batch_df.groupBy("StationId", "Year_Rec", "Month_Rec", "Variable") \
        .agg(F.sum(F.col("Value")).alias("batch_sum"), F.count("*").alias("batch_count"))
    ger_swe_df = ger_swe_df.groupby("StationId", "Year_Rec", "Month_Rec").pivot("Variable") \
        .agg(F.first("batch_count").cast(LongType()).alias('batch_count'),
             F.first("batch_sum").cast(LongType()).alias('batch_sum'))
cols = {str(col)[:4] for col in ger_swe_df.columns if str(col)[:4] in vars}
missing = vars - cols
for var in missing:
    ger_swe_df = ger_swe_df.withColumn(var + '_batch_count', F.lit(None).cast(LongType())).withColumn(
            var + '_batch_sum', F.lit(None).cast(LongType()))
ger_swe_df = ger_swe_df.select("StationId", "Year_Rec", "Month_Rec", "PRCP_batch_count", 'PRCP_batch_sum',
                                   'TMAX_batch_count', 'TMAX_batch_sum', 'TMIN_batch_count', 'TMIN_batch_sum',
                                   'SNWD_batch_count', 'SNWD_batch_sum') \
        .withColumn("Season", ((F.col("Month_Rec") % 12) / 3 + 1).cast(IntegerType()))
ger_swe_df.withColumn("batchId", F.lit(batch_id)) \
        .write \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .mode("append") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password) \
        .save()

推荐阅读