Flink Kafka Table API for python with JAAS

Problem description

I am using the Python Table API in Flink 1.11.2 to connect to a Kafka topic over the SASL protocol, but it fails with the error below. I tried the same properties with the Flink Java API and was able to connect. Has anyone run into this problem, and how did you solve it?

Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: org.apache.kafka.common.security.plain.PlainLoginModule

Setup:

kafka = Kafka() \
        .version("universal") \
        .topic("test_topic") \
        .property("group.id", "consumer_group") \
        .property("security.protocol", "SASL_PLAINTEXT") \
        .property("sasl.mechanism", "PLAIN") \
        .property("bootstrap.servers", "<remoteIP>:9093") \
        .property("sasl.jaas.config",
                  "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" "
                  "password=\"abc\";") \
        .start_from_latest()

Tags: python, apache-kafka, apache-flink, jaas, sasl

Solution


I see this question is quite old, but I also had trouble working out how to configure SASL when org.apache.kafka.common.security.plain.PlainLoginModule could not be found.

I eventually got it working with the following configuration and am posting it here for reference (Table API in Python, via the SQL connector DDL). The key is loading the flink-sql-connector-kafka fat jar with add_jars(): it bundles the Kafka client classes, including PlainLoginModule, so the LoginException goes away:

  import os

  from pyflink.datastream import StreamExecutionEnvironment
  from pyflink.table import EnvironmentSettings, StreamTableEnvironment

  KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS', "kafka-0-external:9094,kafka-1-external:9094,kafka-2-external:9094,kafka-3-external:9094,kafka-4-external:9094").split(',')
  KAFKA_USERNAME = "XXX"
  KAFKA_PASSWORD = "pass"
  KAFKA_SOURCE_TOPIC = 'topic'

  source_ddl = f"""
            CREATE TABLE source_table(
                entry STRING
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{','.join(KAFKA_SERVERS)}',
              'properties.group.id' = 'testgroup12',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'raw'
            )
            """
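One thing to watch with this DDL is that the username and password are interpolated directly into the 'properties.sasl.jaas.config' string; credentials containing double quotes or backslashes would break the generated statement. As a minimal sketch (the helper name `build_jaas_config` is my own, not part of the original answer), the JAAS line can be built with escaping:

```python
# Hypothetical helper (not from the original answer): builds the value for
# 'properties.sasl.jaas.config' with the PLAIN mechanism, escaping any
# backslashes or double quotes in the credentials so the resulting JAAS
# string stays well-formed.
def build_jaas_config(username: str, password: str) -> str:
    def esc(value: str) -> str:
        # Escape backslashes first, then double quotes.
        return value.replace('\\', '\\\\').replace('"', '\\"')

    return (
        'org.apache.kafka.common.security.plain.PlainLoginModule required '
        f'username="{esc(username)}" password="{esc(password)}";'
    )
```

The returned string can then be substituted into the DDL in place of the hand-written JAAS line.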

  env = StreamExecutionEnvironment.get_execution_environment()
  env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
  settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

  t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
  t_env.execute_sql(source_ddl).wait()
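To check that the source actually receives records, one option is a 'print' sink fed by an INSERT, which writes every row to the TaskManager logs. The sink table below is my own illustration, not part of the original answer:

```sql
-- Hypothetical verification step: a 'print' sink plus an INSERT that
-- copies each Kafka record to the TaskManager stdout logs.
CREATE TABLE print_sink (
    entry STRING
) WITH (
    'connector' = 'print'
);

INSERT INTO print_sink SELECT entry FROM source_table;
```

Both statements can be run through the same t_env.execute_sql() call used for the source DDL.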

