apache-kafka - Kafka文件流连接和流API
问题描述
正在研究文件流连接器,我在文件中有超过一千万条记录(它不是单个文件,它按帐户#分区)。我必须将这些文件加载到主题中并更新我的流。经历了独立的流,我有以下问题,需要帮助来实现。
- 查看数据集,我有两个帐户#,每个帐户有 5 行,我需要将它们分组为两行并作为 acctNbr 键。
如何编写我的源连接器来读取文件并获取分组逻辑?
我的代理在 Linux 机器 X、Y、Z 中运行。源连接器的后期开发,我的 jar 文件是否应该部署在每个代理中(如果我开始在分布式代理中运行)?
我只有 30 分钟的窗口来提取文件拖放到主题?调整逻辑以降低工作窗口的所有参数是什么?仅供参考,该主题将设置超过 50 个分区和 3 个代理。
数据集:
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"1234567","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-01","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-02","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-03","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-04","currentPrice":"10","availQnty":"10"}
{"acctNbr":"abc3355","secNbr":"AAPL","date":"2010-01-05","currentPrice":"10","availQnty":"10"}
解决方案
如何编写我的源连接器来读取文件并获取分组逻辑
FileSream 连接器无法做到这一点,除了编写您自己的连接器的示例之外,它并没有用于此目的。换句话说,不要在生产中使用。
话虽如此,您可以使用 Flume、Filebeat、Fluentd、NiFi、Streamsets 等替代解决方案来全局化您的文件路径,然后将所有记录逐行发送到 Kafka 主题中。
源连接器的后期开发,我的 jar 文件应该部署在每个代理中
您不应在任何代理上运行 Connect。Connect 服务器称为worker。
只有 30 分钟的窗口来提取文件拖放到主题?
不清楚这个数字是从哪里来的。上面列出的任何上述方法都监视所有新文件,没有任何定义的窗口。
推荐阅读
- javascript - Firechat - 无法发送消息
- c# - 动态阅读类
- c# - 如何从目录创建图像集合
- python - 为什么我的代码输出 0000000000000 作为无效的 ISBN 号?
- java - 放心的 POST 调用抛出 org.apache.http.NoHttpResponseException
- jquery - 删除具有类名的元素的父级
- ruby-on-rails - Rails 5:如何将父资源的属性传递给嵌套资源?
- python - 如何使用 Python shlex 解析 bash 数组?
- spring-mvc - 可以大摇大摆地记录在不同端口上运行的多个服务
- cakephp - Cakephp 2.9 替代 cakephp 3.0 TableRegistry