首页 > 解决方案 > Kafka文件流连接和流API

问题描述

正在研究文件流连接器,我在文件中有超过一千万条记录(它不是单个文件,它按帐户#分区)。我必须将这些文件加载​​到主题中并更新我的流。经历了独立的流,我有以下问题,需要帮助来实现。

  1. 查看数据集,我有两个帐户#,每个帐户有 5 行,我需要将它们分组为两行并作为 acctNbr 键。

如何编写我的源连接器来读取文件并获取分组逻辑?

  1. 我的代理在 Linux 机器 X、Y、Z 中运行。源连接器的后期开发,我的 jar 文件是否应该部署在每个代理中(如果我开始在分布式代理中运行)?

  2. 我只有 30 分钟的窗口来提取文件拖放到主题?调整逻辑以降低工作窗口的所有参数是什么?仅供参考,该主题将设置超过 50 个分区和 3 个代理。

数据集:

{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}

标签: apache-kafkaapache-kafka-streamsapache-kafka-connectkafka-producer-api

解决方案


如何编写我的源连接器来读取文件并获取分组逻辑

FileSream 连接器无法做到这一点,除了编写您自己的连接器的示例之外,它并没有用于此目的。换句话说,不要在生产中使用。

话虽如此,您可以使用 Flume、Filebeat、Fluentd、NiFi、Streamsets 等替代解决方案来全局化您的文件路径,然后将所有记录逐行发送到 Kafka 主题中。

源连接器的后期开发,我的 jar 文件应该部署在每个代理中

您不应在任何代理上运行 Connect。Connect 服务器称为worker

只有 30 分钟的窗口来提取文件拖放到主题?

不清楚这个数字是从哪里来的。上面列出的任何上述方法都监视所有新文件,没有任何定义的窗口。


推荐阅读