python - Databricks:来自 Kafka 的 Spark 结构化流卡在“流初始化”中
问题描述
我想在带有 kafka 源的数据块中创建结构化流。我按照此处所述的说明进行操作。我的脚本似乎开始了,但是我无法在 databricks 笔记本中打印/输出某些内容。当我使用时,流 itsellf 工作正常并产生结果和工作(在数据块中)confluent_kafka
,因此我似乎缺少了一个不同的问题:
该脚本似乎“卡在”“运行命令”/“流初始化”中。
任何输入都非常感谢!
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Define a data schema
schema = StructType() \
.add('PARAMETERS_TEXTVALUES_070_VALUES', StringType())\
.add('ID', StringType())\
.add('PARAMETERS_TEXTVALUES_001_VALUES', StringType())\
.add('TIMESTAMP', TimestampType())
df = spark \
.readStream \
.format("kafka") \
.option("host", "stream.xxx.com") \
.option("port", 12345)\
.option('kafka.bootstrap.servers', 'stream.xxx.com:12345') \
.option('subscribe', 'stream_test.json') \
.option("startingOffset", "earliest") \
.load()
df_word = df.select(F.col('key').cast('string'),
F.from_json(F.col('value').cast('string'), schema).alias("parsed_value"))
# Group by id and count
df_group = df_word.select('parsed_value.*')\
.groupBy('ID').count()
query = df_group \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
我的流输出数据如下所示:
"PARAMETERS_TEXTVALUES_070_VALUES":'something'
"ID":"47575963333908"
"PARAMETERS_TEXTVALUES_001_VALUES":12345
"TIMESTAMP": "2020-10-22T15:06:42.507+02:00"
澄清一下:我正在尝试将组件打印query
到笔记本上以测试连接。此单元格之后或上述单元格之前没有单元格。
谢谢并保持安全。
解决方案
推荐阅读
- css - 如何添加恰好为 1 行高的填充或边距
- wordpress - 如何修复不再响应的网站?
- spring - Spring Security OAuth2 服务器的客户端登录表单
- sql-server - SQL Server中如何将数据行转换为列
- angular - 导入 json 文件来模拟我的测试数据
- react-native - 应用程序索引是否可以在反应原生 android 应用程序中进行?
- java - java.lang.IllegalArgumentException:未指定数据源 Junit 和 Mockito
- python - Python 无法识别 Windows 上包含回车的文件名
- tensorflow - 迁移学习:从我的预训练模型训练产生大尺寸模型
- azure - 如何使用异步方法从 Azure 函数中的存储读取