Spark SQL transformation returns no data (Structured Streaming)

Problem Description

I have a Kafka stream through which I receive JSON-based IoT device logs. I am using pyspark to process the stream, analyze it, and produce a transformed output.

My device JSON looks like this:

{"messageid":"1209a714-811d-4ad6-82b7-5797511d159f",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}

{"messageid":"4d119126-2d12-412c-99c2-c159381bee5c",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}

I am trying to transform the logs so that, for each device, they return a unique count keyed on timestamp and sensor ID. The resulting JSON looks like this:

{
"sensor_id":"CAM_009",
"timestamp":"2020-01-20 19:04:32 +0530",
"location":"General Assembly Area",
"count":2
}
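
The grouping itself can be checked outside of streaming first. Below is a minimal batch sketch (my addition, not part of the original question; the local file name and the use of spark.read.json are assumptions) that reproduces the expected count of 2 from the two sample records above:

# batch_check.py -- hypothetical sanity check of the aggregation, not from the original post
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

spark = SparkSession.builder.appName('batch-check').getOrCreate()

# The two sample records saved as JSON lines in a local file (path is assumed)
logs = spark.read.json("sample_device_logs.json")

# Same grouping as the streaming SQL below: one row per sensor_id/timestamp/location
result = logs.groupBy("sensor_id", "timestamp", "location") \
             .agg(count("*").alias("count"))

result.show(truncate=False)
# Expected: CAM_009 | 2020-01-20 19:04:32 +0530 | General Assembly Area | 2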

The full code I am trying - pyspark-kafka.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName('analytics').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
brokers = 'kafka-mybroker-url-host:9092'
readTopic = 'DetectionEntry'
outTopic = 'DetectionResults'

# Read the raw device log stream from Kafka
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", readTopic) \
    .load()

# The Kafka value is binary; cast it to a string and keep the Kafka ingest timestamp
transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")

# Schema of the JSON payload produced by the devices
alert_schema = StructType() \
    .add("message_id", StringType()) \
    .add("mdsversion", StringType()) \
    .add("timestamp", StringType()) \
    .add("sensor_id", StringType()) \
    .add("location", StringType()) \
    .add("detection_class", StringType())

# Parse the JSON payload and flatten it into top-level columns
transaction_detail_df2 = transaction_detail_df1 \
    .select(from_json(col("value"), alert_schema).alias("alerts"))

transaction_detail_df3 = transaction_detail_df2.select("alerts.*")

# Convert the string timestamp into an event-time column and declare a watermark
transaction_detail_df3 = transaction_detail_df3 \
    .withColumn("timestamp", to_timestamp(col("timestamp"), "YYYY-MM-DD HH:mm:ss SSSS")) \
    .withWatermark("timestamp", "500 milliseconds")

# Register a temp view and aggregate with Spark SQL
transaction_detail_df3.createOrReplaceTempView("alertsview")
results = spark.sql("select sensor_id, timestamp, location, count(*) as count "
                    "from alertsview group by sensor_id, timestamp, location")
results.printSchema()

# Write the aggregated results to the console every 3 seconds
results_kakfa_output = results
results_kakfa_output.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='3 seconds') \
    .start().awaitTermination()

When I run this code, I get the following output. The overall goal is to process the incoming device logs in 3-second intervals and, within each interval, find the unique count for every timestamp entry per device. I have already tried the SQL query against a MySQL database with the same schema and it works fine there. However, I am not getting any results in the streaming output for further processing, and I cannot figure out what I am missing here.

[console output screenshot]
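
One way to narrow this down (not part of the original post) is to check, in a one-off batch job, whether the to_timestamp pattern actually matches the device timestamps; if it does not, the event-time column comes back null (or the pattern is rejected outright on newer Spark versions), and the watermarked aggregation never sees usable rows. A minimal sketch, where the alternative pattern "yyyy-MM-dd HH:mm:ss Z" is my assumption based on the sample value:

# parse_check.py -- hypothetical diagnostic, not from the original question
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, col

spark = SparkSession.builder.appName('parse-check').getOrCreate()

sample = spark.createDataFrame([("2020-01-20 19:04:32 +0530",)], ["ts"])

for fmt in ["YYYY-MM-DD HH:mm:ss SSSS",   # pattern used in the question
            "yyyy-MM-dd HH:mm:ss Z"]:     # pattern matching the sample value (assumption)
    try:
        sample.select(to_timestamp(col("ts"), fmt).alias("parsed")).show(truncate=False)
    except Exception as err:
        # Spark 3's default parser rejects some legacy pattern letters outright
        print(fmt, "->", err)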

Tags: apache-spark, pyspark, apache-kafka, pyspark-sql, spark-structured-streaming

Solution
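
The accepted answer is not preserved on this page, so the following is only a hedged sketch of the most likely fixes suggested by the code above, not the original solution. Two things stand out. First, the pattern "YYYY-MM-DD HH:mm:ss SSSS" does not describe values like "2020-01-20 19:04:32 +0530": uppercase YYYY is the week-based year, DD is day-of-year, and SSSS is fraction-of-second rather than a UTC offset, so to_timestamp is likely to produce nulls and the watermark never advances. Second, in "append" output mode an aggregated group is only emitted after the watermark moves past its event time; "update" (or "complete") mode is easier to inspect on the console. The sketch below applies both changes and also aligns the schema field name with the "messageid" key in the payload; the 10-second watermark is an arbitrary choice:

# solution_sketch.py -- a hedged sketch, not the original accepted answer.
# Assumptions: the device timestamp should be parsed with an explicit UTC-offset
# pattern, and "update" output mode is acceptable for inspecting results.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName('analytics').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

brokers = 'kafka-mybroker-url-host:9092'
readTopic = 'DetectionEntry'

# Field name matches the "messageid" key in the sample payload
alert_schema = StructType() \
    .add("messageid", StringType()) \
    .add("mdsversion", StringType()) \
    .add("timestamp", StringType()) \
    .add("sensor_id", StringType()) \
    .add("location", StringType()) \
    .add("detection_class", StringType())

raw = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", brokers) \
    .option("subscribe", readTopic) \
    .load()

# Parse the payload and use a pattern that matches "2020-01-20 19:04:32 +0530"
alerts = raw.selectExpr("CAST(value AS STRING) AS value") \
    .select(from_json(col("value"), alert_schema).alias("alert")) \
    .select("alert.*") \
    .withColumn("timestamp",
                to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss Z")) \
    .withWatermark("timestamp", "10 seconds")

alerts.createOrReplaceTempView("alertsview")
results = spark.sql(
    "select sensor_id, timestamp, location, count(*) as count "
    "from alertsview group by sensor_id, timestamp, location")

# "update" mode emits a group as soon as its count changes; "append" would hold
# each group back until the watermark passes its event time.
results.writeStream \
    .format("console") \
    .outputMode("update") \
    .trigger(processingTime='3 seconds') \
    .start() \
    .awaitTermination()

If "append" mode is a hard requirement for the downstream Kafka sink, the groups are only finalized and emitted once later events push the watermark past their event time, so the watermark interval and the arrival of newer data both matter.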
