python - 如何在 Windows 上读取文件夹中的文本文件并写入 Kafka Topic
问题描述
我的 Windows 10 机器上有一个本地设置的 Kafka。目前我仍在学习 Kafka 并在我的本地机器上测试它的功能。
我能够使用以下命令成功创建主题并通过命令提示符运行生产者和消费者。
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test
现在我正在尝试将网络文件夹中生成的文本文件的内容流式传输到 Kafka 主题。
我已经阅读了Spool Dir Connector,但不知道如何在 Windows 机器上配置它。
我正在考虑的替代选项是使用 python 读取文件,然后使用 kafka-python 将其写入 Kafka。在这种情况下,我不确定性能影响以及如何跟踪哪些文件已处理,哪些未处理。
from kafka import KafkaProducer
import os
source_dir = 'C:/PATH/TO/SOURCE/FOLDER/'
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: str(x).encode('utf-8'),
key_serializer=lambda x: str(x).encode('utf-8'))
for filename in os.listdir(source_dir):
with open(filename, mode='r') as f:
data = f.read()
producer.send('test', value=data)
这样做的正确方法是什么?任何帮助表示赞赏
编辑: 我尝试根据本文档在 Windows 上配置融合平台,因为配置 Spooldir 需要融合平台。我在启动 Zookeeper 时遇到错误
类路径为空。请首先构建项目,例如通过运行 'gradlew jarAll
解决方案
这样做的正确方法是什么?
您的 Python 代码不会跟踪文件偏移或文件的成功读取,因此最好使用其他一些 spooldir 连接器
还值得指出的是,Kafka 记录的默认最大大小为 1MB,将其用于文件传输通常是一种反模式。相反,对文件使用一些共享的 NAS 或 blob/对象存储,并通过 Kafka 发送 URI 定位器
阅读有关 Spool Dir 连接器的信息,但无法弄清楚如何在 Windows 机器上配置它
指令应该和Linux一样,只是文件路径和脚本不同。Confluent hub cli 没有 Windows 二进制文件,因此您需要手动设置它
配置 Spooldir 需要融合平台。
它不是。此外,Confluent 平台仍在使用 Apache Kafka 和 Zookeeper,所以你会从中得到的任何错误,你应该在之前得到过
解压压缩包
查找和编辑connect-standalone.properties
编辑plugin.path
以包含C:\\path\\to\\kafka-connect-spooldir
(确保这是 JAR 文件所在的父目录)
在提取的文件夹中,您应该有另一个属性文件。配置那个
一旦你配置了这两个属性文件,使用它们来运行一个独立的连接服务器
bin\windows\connect-standalone config\connect-standalone.properties spooldir.properties
推荐阅读
- angular - cellClicked 不调用函数 AgGrid Angular 2
- java - 无法安装应用程序,因为 Websphere SRVE0303 错误
- javascript - For循环和数组的一些数学问题
- wso2 - 使用 SQL Server Express 设置 WSO2 API Manager 2.2 时出现 JDBC SQL Server 异常
- java - Java8 toMap异常消息令人困惑
- jenkins - Groovy slacksend 函数在詹金斯管道中不起作用
- javascript - 带有 fontawesome 图标的 Html 下拉菜单
- php - 完全禁用 WooCommerce 商店页面,但保留搜索页面
- python - importError:没有名为 jose 的模块
- vba - 在Excel宏中复制粘贴功能时如何分配负号