apache-kafka - Kafka Connect S3 Sink Connector 按 id 字段对大型主题进行分区
问题描述
过去几周,我们一直致力于将 Kafka Connect 添加到我们的数据平台,并认为这将是一种将 Kafka 中的数据提取到 S3 数据湖中的有用方法。我们已经使用 FieldPartitioner 和 TimeBasePartitioner 并看到了一些相当不错的结果。
我们还需要按用户 ID 进行分区 - 但是尝试在用户 ID 字段上使用 FieldPartitioner 后,连接器非常慢 - 尤其是与按日期分区等相比。我知道按 ID 分区会创建很多输出分区因此不会那么快——这很好,但它需要能够跟上生产者的步伐。
到目前为止,我们已经尝试增加内存和堆 - 但我们通常不会看到任何内存问题,除非我们将 flush.size 增加到一个很大的数字。我们还尝试了小刷新大小、非常小和大的 rotate.schedule.interval.ms 配置。我们还研究了网络,但这似乎很好 - 使用其他分区器网络保持良好。
在可能浪费大量时间之前,是否有人尝试或成功通过 id 字段进行分区,尤其是在较大的主题上,使用 S3 Sink 连接器?或者有没有人在配置或设置方面有任何建议,可能是一个不错的地方?
解决方案
我不习惯 Kafka 的连接器,但我至少会尝试提供帮助。
我不知道您是否可以将连接器配置为 kafka 主题的分区级别;我假设这里有一些方法可以做到这一点。
一种可能的方法是专注于您的客户向 Kafka 代理生产的步骤。我的建议是实施你自己的Partitioner
,以便“进一步”控制你想在 kafka 方面发送数据的位置。
这是您的自定义分区器的示例/简化。例如,key
您的生产者发送的格式如下:id_name_date
. 此自定义分区器尝试提取第一个元素 ( id
),然后选择所需的分区。
public class IdPartitioner implements Partitioner
{
@Override
public int partition(String topic, Object key, byte[] kb,
Object v, byte[] vb, Cluster cl)
{
try
{
String pKey= (String) key;
int id = Integer.parseInt(pKey.substring(0,pKey.indexOf("_")));
/* getPartitionForId would decide which partition number corresponds
for the received ID.You could also implement the logic directly here.*/
return getPartitionForId(id);
}
catch (Exception e)
{return 0;}
}
@Override
public void close()
{
//maybe some work here if needed
}
}
即使您可能需要更多KafkaConnect
的调整,我相信这个选项可能会有所帮助。假设您有一个包含 5 个分区的主题,并且 getPartitionForId
只检查 ID 的第一个数字以确定分区(为简化起见,min Id 为 100,max Id 为 599)。
所以如果接收到的 key 是 fe: 123_tempdata_20201203
,则分区方法会返回0
,即第一个分区。
partition 0
(图片显示的是 P1 而不是P0 ,因为我认为这样的示例看起来更自然,但请注意,第一个分区实际上定义为,所以我只好找借口,比如:看起来更自然)。
基本上,这将是S3 上传之前的预调整或住宿。
我知道这可能不是理想的答案,因为我不知道您系统的确切规格。我的猜测是有可能将主题分区直接指向 s3 位置。
如果没有可能这样做,至少我希望这可以给你一些进一步的想法。干杯!
推荐阅读
- java - 如何打印所有回文数?
- reactjs - 在反应js中将状态从子类发送到父类
- reactjs - 控制台日志更改音乐来源,但歌曲继续播放原始设置状态
- arrays - 发生 MIPS 异常 5 [存储中的地址错误]
- mysql - 来自 MySQL 数据库的 SSRS 已发布报告显示空白/空行
- tensorflow - Keras 的 LSTM 层中的时间步长是多少?以及如何选择此参数的值?
- r - 使用上一行的计算值来计算当前
- excel - 使用“插入超链接”对话框向单元格添加链接
- security - 如何在不停机的情况下在 Kafka 集群中启用安全性
- vue.js - 复杂的预期 v-bind 指令 / v-for
I'm running into a strange issue with a complex loop I'm attempting within my component.
<template v-for="(scorecard, scorecardIndex) in scorecards"> <template v-for="(proper