sql - Pyspark 流式附加旋转批处理到 sql server
问题描述
我正在使用 pyspark 流从 kafka 服务器流式传输数据,逐批操作(使用 foreachBatch),并使用 jdbc 将每个批次附加到 Microsoft SQL 服务器。
这是我的代码的主要相关部分:
定义流
string_value_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", constants.kafka_server) \
.option("subscribe", "topics") \
.option("maxOffsetsPerTrigger", 1000) \
.option("startingOffsets", "earliest") \
.load() \
.selectExpr("CAST(value AS STRING)")
定义模式和操作 df
schema = T.StructType([T.StructField('StationId', T.StringType(), False),
T.StructField('Date', T.StringType(), False),
T.StructField('Variable', T.StringType(), False),
T.StructField('Value', T.IntegerType(), False)])
json_df = string_value_df.select(F.from_json(F.col("value"),schema=schema).alias('json'))
streaming_df = json_df.select("json.*")
开始流式传输
query = df \
.writeStream \
.foreachBatch(write_to_sql) \
.outputMode("Update") \
.start() \
.awaitTermination()
在每次批处理操作期间,我使用 pivot 将多个变量值记录转换为列形式的变量,同时对“StationID”和“Date”进行分组
原始数据形式(每批如何到达)是:
站号 | 日期 | 多变的 | 价值 |
---|---|---|---|
一个 | yyyyMMdd | 聚氯乙烯 | x1 |
一个 | yyyyMMdd | 雪 | x2 |
一个 | yyyyMMdd | SNWD | x3 |
一个 | yyyyMMdd | TMAX | x4 |
一个 | yyyyMMdd | TMIN | x5 |
在我的转型之后,包括枢轴:
站号 | 日期 | 聚氯乙烯 | 雪 | SNWD | TMAX | TMIN |
---|---|---|---|---|---|---|
一个 | yyyyMMdd | x1 | x2 | x3 | x4 | x5 |
这是应用于每个批次的函数:
打包一批
def write_to_sql(df, df_id):
df = df.groupBy("StationId", "Date").pivot("Variable").sum("Value")
try:
df.write \
.format("jdbc") \
.mode("append") \
.option("url", constants.url) \
.option("dbtable", constants.table_name) \
.option("user", constants.username) \
.option("password", constants.password) \
.save()
except ValueError as error:
print("Connector write failed", error)
我的问题是将新批次附加到服务器上的 SQL 表中。
当同一批次中有两行具有相同的 <StationId, Date> 时,枢轴工作正常并正确显示在服务器中。
在其中,如果特定 <StationId, Date> 对的多个(不同变量)记录在多个批次之间划分,那么在将其附加到服务器时,它似乎没有完全分组。
站号 | 日期 | 聚氯乙烯 | 雪 | SNWD | TMAX | TMIN |
---|---|---|---|---|---|---|
一个 | yyyyMMdd | x1 | 无效的 | x3 | 无效的 | 无效的 |
一个 | yyyyMMdd | 无效的 | x2 | 无效的 | 无效的 | 无效的 |
[x1 和 x3 出现在同一个批次中]
是否有任何有效的方法可以将批次附加到服务器,同时跨不同变量维护 <StationId, Date> 上的分组?
非常感谢
解决方案
您可以提前执行以下操作来匹配架构,这样就不会遇到麻烦:这是我尝试过滤掉 4 个变量PRCP
, TMAX
, TMIN
,时的练习解决方案SNWD
。
batch_df = batch_df.filter(batch_df.Q_Flag.isNull()).drop("M_Flag", "Q_Flag", "S_Flag", "ObsTime")
batch_df = batch_df.withColumn("Year_Rec", F.expr("substring(Date,0,4)").cast(IntegerType())).withColumn(
"Month_Rec", F.expr("substring(Date,5,2)").cast(IntegerType())).drop("Date")
batch_df = batch_df.filter(
((batch_df.Variable == "PRCP") & (batch_df.Value >= 0))
| (batch_df.Variable == "TMAX")
| (batch_df.Variable == "TMIN")
| ((batch_df.Variable == "SNWD") & (batch_df.Value >= 0)))
ger_swe_df = batch_df.groupBy("StationId", "Year_Rec", "Month_Rec", "Variable") \
.agg(F.sum(F.col("Value")).alias("batch_sum"), F.count("*").alias("batch_count"))
ger_swe_df = ger_swe_df.groupby("StationId", "Year_Rec", "Month_Rec").pivot("Variable") \
.agg(F.first("batch_count").cast(LongType()).alias('batch_count'),
F.first("batch_sum").cast(LongType()).alias('batch_sum'))
cols = {str(col)[:4] for col in ger_swe_df.columns if str(col)[:4] in vars}
missing = vars - cols
for var in missing:
ger_swe_df = ger_swe_df.withColumn(var + '_batch_count', F.lit(None).cast(LongType())).withColumn(
var + '_batch_sum', F.lit(None).cast(LongType()))
ger_swe_df = ger_swe_df.select("StationId", "Year_Rec", "Month_Rec", "PRCP_batch_count", 'PRCP_batch_sum',
'TMAX_batch_count', 'TMAX_batch_sum', 'TMIN_batch_count', 'TMIN_batch_sum',
'SNWD_batch_count', 'SNWD_batch_sum') \
.withColumn("Season", ((F.col("Month_Rec") % 12) / 3 + 1).cast(IntegerType()))
ger_swe_df.withColumn("batchId", F.lit(batch_id)) \
.write \
.format("com.microsoft.sqlserver.jdbc.spark") \
.mode("append") \
.option("url", url) \
.option("dbtable", table_name) \
.option("user", username) \
.option("password", password) \
.save()
推荐阅读
- php - 两个复选框以相同的形式相互干扰
- java - REST 项目错误(Tomcat:HTTP 错误 404)
- r - 累积计数而不是最后一个文件
- reactjs - 使用箭头运算符的数组并尝试按插入顺序推送项目?
- azure - 如何创建允许注册应用程序和服务主体的 Azure 自定义角色
- r - R将多个数据表附加到列表
- orbeon - 基于数据类型的 Orbeon if 语句
- tensorflow - 谷歌云机器学习引擎:使用 TensorFlow 1.10?
- python-3.x - python v3.7 字典帮助
- c++ - Vector of non-static void member pointer functions with this