python - What is a minimal example of using Kafka with Python?
Question
What I tried
- I cloned https://github.com/wurstmeister/kafka-docker and ran sudo docker-compose up.
- I started producer.py, listed below.
- I started consumer.py, listed below.
This did not work. I then changed the ports in docker-compose.yml:
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
After I started it, producer.py ran to completion and the docker-compose terminal showed:
zookeeper_1 | 2018-09-05 14:21:44,001 [myid:] - INFO [SessionTracker:ZooKeeperServer@358] - Expiring session 0x165aa1acb900000, timeout of 6000ms exceeded
zookeeper_1 | 2018-09-05 14:21:44,002 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x165aa1acb900000
kafka_1 | [2018-09-05 14:21:44,028] INFO Creating /controller (is it secure? false) (kafka.zk.KafkaZkClient)
kafka_1 | [2018-09-05 14:21:44,033] INFO Result of znode creation at /controller is: OK (kafka.zk.KafkaZkClient)
zookeeper_1 | 2018-09-05 14:21:44,141 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:delete cxid:0x32 zxid:0x6f txntype:-1 reqpath:n/a Error Path:/admin/reassign_partitions Error:KeeperErrorCode = NoNode for /admin/reassign_partitions
zookeeper_1 | 2018-09-05 14:21:44,152 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:delete cxid:0x34 zxid:0x70 txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
zookeeper_1 | 2018-09-05 14:21:47,621 [myid:] - INFO [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@649] - Got user-level KeeperException when processing sessionid:0x165aa1c265e0000 type:setData cxid:0x3c zxid:0x71 txntype:-1 reqpath:n/a Error Path:/config/topics/mytopic Error:KeeperErrorCode = NoNode for /config/topics/mytopic
kafka_1 | [2018-09-05 14:21:47,628] INFO Topic creation Map(mytopic-0 -> ArrayBuffer(1003)) (kafka.zk.AdminZkClient)
kafka_1 | [2018-09-05 14:21:47,639] INFO [KafkaApi-1003] Auto creation of topic mytopic with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
The topic was created, which is good. However, when I run the consumer, it does nothing, and docker-compose shows:
kafka_1 | [2018-09-05 14:24:52,566] ERROR [KafkaApi-1003] Number of alive brokers '0' does not meet the required replication factor '1' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)
How can I get a minimal Kafka installation/setup to see Kafka working with Python?
producer.py
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('mytopic', key='hello', value='world')
print("produce done")
p.flush(10)
consumer.py
from confluent_kafka import Consumer, KafkaError

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'default.topic.config': {
        'auto.offset.reset': 'smallest'
    }
})
c.subscribe(['mytopic'])

while True:
    # Wait up to one second for the next message.
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        # Reaching the end of a partition is not a failure; keep polling.
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        else:
            print(msg.error())
            break
    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()
Answer
Try this environment section:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://localhost:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
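To make the listener strings above easier to read, here is a small sketch that splits a value like KAFKA_ADVERTISED_LISTENERS into its named entries. The helper name parse_listeners is made up for illustration and is not part of Kafka or any client library; it only does string handling and assumes every entry has the form NAME://host:port.

```python
def parse_listeners(value):
    """Split 'NAME://host:port,NAME://host:port' into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        # 'OUTSIDE://localhost:9094' -> name 'OUTSIDE', address 'localhost:9094'
        name, _, address = entry.strip().partition("://")
        # The host may be empty, as in 'INSIDE://:9092'.
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the environment section above.
advertised = parse_listeners("INSIDE://:9092,OUTSIDE://localhost:9094")
for name, (host, port) in advertised.items():
    shown = host or "(no host given)"
    print("{}: host={} port={}".format(name, shown, port))
```

Read this way, the OUTSIDE entry is the one that names localhost and port 9094, which is the address the rest of this answer points the Python client at.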
If you are not running your Python code in a Docker container, add 9094:9094 to the ports and point Python at localhost:9094.
The Confluent image has a similar setup, but uses port 29092.
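Before rerunning producer.py against the new address, it may help to confirm that the published port is reachable from the host at all. The following is a minimal sketch using only the standard library; port_is_open is a hypothetical helper name, and localhost:9094 assumes the port mapping described above has been added.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


# Assumption: 9094:9094 is published in docker-compose.yml as described above.
if port_is_open("localhost", 9094):
    print("localhost:9094 accepts TCP connections")
else:
    print("localhost:9094 is not reachable; check the ports section")
```

A successful connection only shows that the port mapping works. It does not prove the advertised listener is configured correctly, so the producer and consumer still need to be tested with 'bootstrap.servers' set to 'localhost:9094'.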