apache-spark - Unable to read an Azure Event Hub topic from Spark
Problem
Environment details
- Spark version: 3.x
- Python 3.8 and Java 8
- azure-eventhubs-spark_2.12-2.3.17.jar
import json
from pyspark.sql import SparkSession
#the below command getOrCreate() uses the SparkSession shared across the jobs instead of using one SparkSession per job.
spark = SparkSession.builder.appName('ntorq_eventhub_load').getOrCreate()
#ntorq adls checkpoint location.
ntorq_connection_string = "connection-string"
ehConf = {}
ehConf['eventhubs.connectionString'] = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ntorq_connection_string)
# ehConf['eventhubs.connectionString'] = ntorq_connection_string
ehConf['eventhubs.consumerGroup'] = "$default"
OFFSET_START = "-1" # the beginning
OFFSET_END = "@latest"
# Create the positions
startingEventPosition = {
    "offset": OFFSET_START,
    "seqNo": -1,            # not in use
    "enqueuedTime": None,   # not in use
    "isInclusive": True
}
endingEventPosition = {
    "offset": OFFSET_END,   # not in use
    "seqNo": -1,            # not in use
    "enqueuedTime": None,
    "isInclusive": True
}
# Put the positions into the Event Hub config dictionary
ehConf["eventhubs.startingPosition"] = json.dumps(startingEventPosition)
ehConf["eventhubs.endingPosition"] = json.dumps(endingEventPosition)
df = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .load() \
    .selectExpr("cast(body as string) as body_str")
df.writeStream \
    .format("console") \
    .start()
Error
21/04/25 20:17:53 WARN Utils: Your hostname,resolves to a loopback address: 127.0.0.1; using 192.168.1.202 instead (on interface en0)
21/04/25 20:17:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/04/25 20:17:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):
File "/Users/PycharmProjects/pythonProject/test.py", line 12, in <module>
ehConf['eventhubs.connectionString'] = spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ntorq_connection_string)
TypeError: 'JavaPackage' object is not callable
The code runs fine in a Databricks environment but does not consume all the messages from Event Hub. I tried clearing the default checkpoint folder before each run but still faced the problem, so I wanted to try it on my local system. When running in the local environment I hit the JavaPackage error shown above. Any help is appreciated. Thank you.
Solution
You need to add the Event Hubs package when creating the session:
spark = SparkSession.builder.appName('ntorq_eventhub_load')\
.config("spark.jars.packages", "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18")\
.getOrCreate()
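Alternatively, the same dependency can be supplied at launch time instead of in the builder. A sketch using spark-submit (the script name test.py comes from the traceback; adjust the connector version to match your Spark/Scala build):

```shell
# Resolve the Event Hubs connector from Maven Central at launch time.
# azure-eventhubs-spark_2.12:2.3.18 assumes Spark 3.x built against Scala 2.12.
spark-submit \
  --packages com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.18 \
  test.py
```

Either way, the connector's JVM classes (including org.apache.spark.eventhubs.EventHubsUtils) end up on the driver classpath, so the encrypt() call no longer raises "'JavaPackage' object is not callable". Databricks runtimes ship with the connector pre-installed, which is why the same code worked there without this step.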