apache-spark - Spark SQL 转换不返回数据(结构化流)
问题描述
我有一个 Kafka 流,我通过它获取基于 JSON 的 IoT 设备日志。我正在使用 pyspark 来处理流以分析和创建转换后的输出。
我的设备 json 如下所示:
{"messageid":"1209a714-811d-4ad6-82b7-5797511d159f",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}
{"messageid":"4d119126-2d12-412c-99c2-c159381bee5c",
"mdsversion":"1.0",
"timestamp":"2020-01-20 19:04:32 +0530",
"sensor_id":"CAM_009",
"location":"General Assembly Area",
"detection_class":"10"}
我正在尝试转换日志,使其根据时间戳和传感器 ID 返回每个设备的唯一计数。结果 JSON 如下所示:
{
"sensor_id":"CAM_009",
"timestamp":"2020-01-20 19:04:32 +0530",
"location":"General Assembly Area",
count:2
}
我正在尝试的完整代码 - pyspark-kafka.py
spark = SparkSession.builder.appName('analytics').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
brokers='kafka-mybroker-url-host:9092'
readTopic = 'DetectionEntry'
outTopic = 'DetectionResults'
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers",brokers).option("subscribe",readTopic).load()
transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)", "timestamp")
alert_schema = StructType() \
.add("message_id", StringType()) \
.add("mdsversion", StringType()) \
.add("timestamp", StringType()) \
.add("sensor_id", StringType()) \
.add("location", StringType()) \
.add("detection_class", StringType()) \
transaction_detail_df2 = transaction_detail_df1\
.select(from_json(col("value"), alert_schema).alias("alerts"))
transaction_detail_df3 = transaction_detail_df2.select("alerts.*")
transaction_detail_df3 = transaction_detail_df3.withColumn("timestamp",to_timestamp(col("timestamp"),"YYYY-MM-DD HH:mm:ss SSSS")).withWatermark("timestamp", "500 milliseconds")
tempView = transaction_detail_df3.createOrReplaceTempView("alertsview")
results = spark.sql("select sensor_id, timestamp, location, count(*) as count from alertsview group by sensor_id, timestamp, location")
results.printSchema()
results_kakfa_output = results
results_kakfa_output.writeStream \
.format("console") \
.outputMode("append") \
.trigger(processingTime='3 seconds') \
.start().awaitTermination()
当我运行此代码时,我得到以下输出。总体目标是以 3 秒的时间间隔处理整个设备日志,并为该时间间隔内的设备的每个时间戳条目找到唯一计数。我已经在具有相同架构的 MySQL 数据库上尝试了 SQL 查询,它工作正常。但是,我在输出中没有得到进一步处理的结果。我无法弄清楚我在这里缺少什么。
解决方案
推荐阅读
- python-3.x - 无法从 OpenGL_accelerate 加载 numpy_formathandler 加速器
- spring - 值对象为 @AggregateIdentifier 和 @TargetAggregateIdentifier
- spring - Spring Cloud Contract - 字符串数组作为查询参数
- xamarin.forms - 从视图模型到另一个视图的方向
- laravel - Laravel/php 应用程序不执行更改
- angular - 如何提供资产文件夹之外的图像?
- android - BottomSheetFragment 是否需要 ViewModel?
- github - 来自分叉的拉取请求不会触发 travis ci
- c# - 计算有多少用户添加到数据库
- java - 使用 api 提供的签名哈希对 PDF 进行签名