apache-kafka - 如何使用具有特定分区器的 Apache Flink 将数据作为键/值发送到 Kafka
问题描述
我在 Flink 中有一个有效载荷,如下所示;
{
"memberId": 4
"total": 5
}
我想使用指定的分区器将数据作为键值格式发送到 kafka。对于分区器,我将使用 Modulo 分区器。
模分区器的示例;
partitionId = value % numPartitions
假设numPartitions
参数为 3。如果我们可以使用上面定义的有效负载的 memberId,partitionId 应该4 % 3
= 1
根据上面的分区器,我想将具有相同partitionId的数据发送到同一个kafka主题。另一个例子;
如果(假设 numPartitions = 3);
memberId: 3 => (3 % 3) => partitionId = 0 => kafka partition 1
memberId: 8 => (8 % 3) => partitionId = 2 => kafka partition 2
memberId: 2 => (2 % 3) => partitionId = 2 => kafka partition 2
memberId: 6 => (6 % 3) => partitionId = 0 => kafka partition 1
memberId: 7 => (7 % 3) => partitionId = 1 => kafka partition 2
如果我没记错的话,如果我们不能指定任何键和分区函数,flink kafka producer 使用 FlinkFixedPartitioner。如果我们将分区函数设置为null
,flink kafka producer 将使用循环分发。但我不知道如何将数据作为键/值格式发送到 kafka,如何通过模数对其进行分区。我怎样才能做到这一点?
解决方案
如果您使用 a KafkaSerializationSchema
,那么您可以创建 Kafka ProducerRecords
,并设置 Kafka 键(和值)。您也可以在ProducerRecord
.
推荐阅读
- nginx - 为我在 nginx 中已经有 proxy_pass 的应用程序配置清漆
- c# - 在控制器.NET CORE MVC 中重用存储库?
- javascript - 焦点选择器是否不适用于 onfocusout 功能
- scala - 我将如何分组并提取 Scala 中该组的不同列的最高 N?
- javascript - 反应状态不重新调整状态的最新值
- javascript - 在 ES6 中转换往返时间模式
- reactjs - 为什么 svg-icon 字体回退到新罗马时代
- flutter - 如何从本机平台调用 Flutter 应用程序中的无头 Dart 代码
- react-redux - 元素类型无效:在 React - Redux 中需要一个字符串
- python - keras中的多类多维标签分类