首页 > 解决方案 > KafkaProducer - GSSAPI 库不可用

问题描述

我正在编写一个 python 程序来连接到 Kafka 并读/写消息。

执行 python3 producer.py 时的生产者错误

File "/opt/local/bgdatapp/anaconda3/lib/python3.7/site-packages/kafka/conn.py", line 255, in init
assert gssapi is not None, 'GSSAPI lib not available'
AssertionError: GSSAPI lib not available
Exception ignored in: <function BrokerConnection.del at 0x7f715f4b3378>
Traceback (most recent call last):
File "/opt/local/bgdatapp/anaconda3/lib/python3.7/site-packages/kafka/conn.py", line 696, in del
self._close_socket()
File "/opt/local/bgdatapp/anaconda3/lib/python3.7/site-packages/kafka/conn.py", line 691, in _close_socket
if self._sock:
AttributeError: 'BrokerConnection' object has no attribute '_sock'
INFO:kafka.producer.kafka:Kafka producer closed

操作系统 - Red Hat Enterprise Linux Server 版本 6.10 (Santiago) Python3 - 3.7.1 - Anaconda3 Python Python3 路径 - /opt/local/bgdatapp/anaconda3/bin/python Kerberos - 5 Kafka - Cloudera 13.1

我可以从 shell 访问我的 kafka,并且可以推送和阅读消息。

kafka-console-producer --broker-list host.domain.com:9092 --topic Topic1 --producer.config client.properties

cat client.properties security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka

从 anaconda python 模拟同样会引发错误。

import os
import socket
import gssapi
import logging
from kafka import KafkaProducer
KAFKA_TOPIC = 'Topic1'
KAFKA_BROKERS = 'host.domain.com:9092'
os.environ['KAFKA_OPTS'] = '-Djava.security.auth.login.config=/opt/local/account1/jaas.conf'
logging.basicConfig(level=logging.INFO)
messages = [b'hello kafka', b'I am sending', b'3 test messages']
producer = KafkaProducer(bootstrap_servers=KAFKA_BROKERS, api_version=(0 , 10), security_protocol='SASL_PLAINTEXT', sasl_mechanism='GSSAPI', sasl_kerberos_service_name='kafka', max_request_size=3173440261)
for m in messages:
  print (producer.send(KAFKA_TOPIC, m).get(timeout=30))

错误:

File "/opt/local/bgdatapp/anaconda3/lib/python3.7/site-packages/kafka/conn.py", line 255, in init
    assert gssapi is not None, 'GSSAPI lib not available'
    AssertionError: GSSAPI lib not available
    Exception ignored in: <function BrokerConnection.del at 0x7f715f4b3378>
    Traceback (most recent call last):
    File "/opt/local/bgdatapp/anaconda3/lib/python3.7/site-packages/kafka/conn.py", line 696, in del
    self._close_socket()
    File "/opt/local/bgdatapp/anaconda3/lib/python3.7/site-packages/kafka/conn.py", line 691, in _close_socket
    if self._sock:
    AttributeError: 'BrokerConnection' object has no attribute '_sock'
    INFO:kafka.producer.kafka:Kafka producer closed

你能建议任何解决方案来解决这个问题吗?

谢谢

标签: pythonanacondagssapi

解决方案


这里实际上有两个错误。首先是gssapi导入错误;第二个是 gc/del 期间的错误(self._sock AttributeError)。第二个错误只是一个错误。我们将在下一个版本中解决这个问题。但是假设您修复了 gssapi 导入错误,那么第二个 AttributeError 应该不会发生。

好的,那么您安装了什么 gssapi 库?您是否能够在您的环境中执行此操作:

from gssapi.raw.misc import GSSError

看起来您可能已经安装了较旧/已弃用的 python-gssapi 模块。你想要这个:https ://pypi.org/project/gssapi/ (不是这个:https ://pypi.org/project/python-gssapi/ )


推荐阅读