首页 > 解决方案 > 如何在 Windows 上读取文件夹中的文本文件并写入 Kafka Topic

问题描述

我的 Windows 10 机器上有一个本地设置的 Kafka。目前我仍在学习 Kafka 并在我的本地机器上测试它的功能。

我能够使用以下命令成功创建主题并通过命令提示符运行生产者和消费者。

.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
.\bin\windows\kafka-server-start.bat .\config\server.properties

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test

现在我正在尝试将网络文件夹中生成的文本文件的内容流式传输到 Kafka 主题。

我已经阅读了Spool Dir Connector,但不知道如何在 Windows 机器上配置它。

我正在考虑的替代选项是使用 python 读取文件,然后使用 kafka-python 将其写入 Kafka。在这种情况下,我不确定性能影响以及如何跟踪哪些文件已处理,哪些未处理。

from kafka import KafkaProducer
import os

source_dir = 'C:/PATH/TO/SOURCE/FOLDER/'

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: str(x).encode('utf-8'),
    key_serializer=lambda x: str(x).encode('utf-8'))

for filename in os.listdir(source_dir):
    with open(filename, mode='r') as f:
       data = f.read()
       producer.send('test', value=data)
    

这样做的正确方法是什么?任何帮助表示赞赏

编辑: 我尝试根据本文档在 Windows 上配置融合平台,因为配置 Spooldir 需要融合平台。我在启动 Zookeeper 时遇到错误

类路径为空。请首先构建项目,例如通过运行 'gradlew jarAll

标签: pythonapache-kafka

解决方案


这样做的正确方法是什么?

您的 Python 代码不会跟踪文件偏移或文件的成功读取,因此最好使用其他一些 spooldir 连接器

还值得指出的是,Kafka 记录的默认最大大小为 1MB,将其用于文件传输通常是一种反模式。相反,对文件使用一些共享的 NAS 或 blob/对象存储,并通过 Kafka 发送 URI 定位器

阅读有关 Spool Dir 连接器的信息,但无法弄清楚如何在 Windows 机器上配置它

指令应该和Linux一样,只是文件路径和脚本不同。Confluent hub cli 没有 Windows 二进制文件,因此您需要手动设置它

配置 Spooldir 需要融合平台。

它不是。此外,Confluent 平台仍在使用 Apache Kafka 和 Zookeeper,所以你会从中得到的任何错误,你应该在之前得到过


下载连接器

解压压缩包

查找和编辑connect-standalone.properties

编辑plugin.path以包含C:\\path\\to\\kafka-connect-spooldir(确保这是 JAR 文件所在的父目录)

在提取的文件夹中,您应该有另一个属性文件。配置那个

一旦你配置了这两个属性文件,使用它们来运行一个独立的连接服务器

bin\windows\connect-standalone config\connect-standalone.properties spooldir.properties

推荐阅读