python - Flink Kafka Table API for python with JAAS
问题描述
我正在使用 Flink 1.11.2 版本的 Python Table API 使用 SASL 协议连接到 Kafka 主题,但它失败并出现以下错误。我在 Flink java 版本中尝试了相同的属性,并且能够连接。有没有人遇到过这个问题,你是如何解决的?
Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule
设置:
kafka = Kafka()\
.version("universal") \
.topic("test_topic")\
.property("group.id", "consumer_group")\
.property("security.protocol", "SASL_PLAINTEXT")\
.property("sasl.mechanism", "PLAIN")\
.property("bootstrap.servers",
"<remoteIP>:9093")\
.property("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" "
"password=\"abc\";")\
.start_from_latest()
解决方案
我看到这个问题非常古老,但在确定如何在没有org.apache.kafka.common.security.plain.PlainLoginModule的情况下配置 SASL 时也遇到了问题。
最终让它使用以下配置,并将其发布在此处以供参考(通过 SQL 连接器指令在 python 中使用 Table API):
KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"kafka-0-external:9094,kafka-1- external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4- external:9094").split(',')
KAFKA_USERNAME = "XXX"
KAFKA_PASSWORD = "pass"
KAFKA_SOURCE_TOPIC = 'topic'
source_ddl = f"""
CREATE TABLE source_table(
entry STRING
) WITH (
'connector' = 'kafka',
'topic' = '{KAFKA_SOURCE_TOPIC}',
'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
'properties.group.id' = 'testgroup12',
'properties.sasl.mechanism' = 'PLAIN',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
'scan.startup.mode' = 'latest-offset',
'format' = 'raw'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
settings = EnvironmentSettings.new_instance()\
.in_streaming_mode()\
.use_blink_planner()\
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment= env, environment_settings=settings)
t_env.execute_sql(source_ddl).wait()
推荐阅读
- reactjs - 在 react js 中创建一个计数器并获取开始停止时间和日期
- java - 如何在 JFrame 标题栏上放置一个按钮
- numpy - 多类逻辑回归
- go - Graphql 订阅不适用于 Gin
- json - 从管道中的 Groovy 变量创建 JSON 文件 - jenkins 作业
- c# - Windows 服务器中的 System.Net.Mail.SmtpException
- r - 带有两个条件和 NA 的 if 语句
- reactjs - 添加新组件时自动重置错误
- php - Laravel ShouldBeUnique 不会让工作变得独一无二
- angular - 角度使图表中的选项可选选项