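java - Merging Kafka record values from a single stream
Question
I have a topic that receives JSON records which may each contain only part of the data. I want to merge these records so that the final record for each key carries as much information as possible.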
t1: { id: '1234', attribute1: 'foo' }
t2: { id: '1234', attribute2: 'bar' }
Desired stream after merging the record values:
t1: { id: '1234', attribute1: 'foo' }
t2: { id: '1234', attribute1: 'foo', attribute2: 'bar' }
To achieve this, I tried:
// The key of the topic is the id
KStream<String, MyObject> input = ...
return input.groupByKey().reduce((current, newEvent) -> newEvent.merge(current)).toStream();
But this produces only one entry, because groupByKey/reduce yields a KTable. Is there a way to achieve this?
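For illustration, the intended per-key behavior can be sketched in plain Java without Kafka. `MyObject` here is a hypothetical stand-in (the real class is not shown in the question), and `reduceEmittingEveryUpdate` is an invented helper that mimics what `groupByKey().reduce(...).toStream()` would emit if every update were forwarded downstream. It is a sketch of the semantics, not Kafka Streams code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MergeSketch {

    // Hypothetical stand-in for the question's MyObject; the real class is not shown.
    static final class MyObject {
        final String id;
        final String attribute1;
        final String attribute2;

        MyObject(String id, String attribute1, String attribute2) {
            this.id = id;
            this.attribute1 = attribute1;
            this.attribute2 = attribute2;
        }

        // Returns a new object that keeps this object's non-null attributes
        // and falls back to the other object's values for the missing ones.
        MyObject merge(MyObject other) {
            return new MyObject(
                    id,
                    attribute1 != null ? attribute1 : other.attribute1,
                    attribute2 != null ? attribute2 : other.attribute2);
        }

        @Override
        public String toString() {
            return "{ id: " + id + ", attribute1: " + attribute1
                    + ", attribute2: " + attribute2 + " }";
        }
    }

    // Mimics a per-key reduce that forwards every update: each input record
    // yields one output record holding the merged value for its key so far.
    static List<MyObject> reduceEmittingEveryUpdate(List<MyObject> input) {
        Map<String, MyObject> latestById = new HashMap<>();
        List<MyObject> output = new ArrayList<>();
        for (MyObject newEvent : input) {
            // The first record for a key is stored as-is; later ones are merged
            // with the same reducer shape as in the question.
            MyObject merged = latestById.merge(
                    newEvent.id, newEvent,
                    (current, incoming) -> incoming.merge(current));
            output.add(merged);
        }
        return output;
    }

    public static void main(String[] args) {
        List<MyObject> output = reduceEmittingEveryUpdate(List.of(
                new MyObject("1234", "foo", null),
                new MyObject("1234", null, "bar")));
        output.forEach(System.out::println);
    }
}
```

Two input records for the same id give two output records here, the second one carrying both attributes, which is the shape of the desired stream.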
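Edit: the stream definition is correct. By default, reduce does not appear to forward every update downstream; it caches them first. To disable this behavior, the configuration property
cache.max.bytes.buffering: 0
must be set.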
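As a minimal sketch, the property can be set on the `Properties` object that is later passed to `KafkaStreams`. The helper name `withCachingDisabled` and the `application.id` and `bootstrap.servers` values are hypothetical. The string key is the same one the question names; `StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG` is the corresponding constant. Newer Kafka releases document `statestore.cache.max.bytes` as its replacement, so it is worth checking the documentation for the version in use.

```java
import java.util.Properties;

class StreamsCacheConfig {

    // Copies the given properties and turns the record cache off, so that
    // every update of the reduce() result is forwarded downstream.
    static Properties withCachingDisabled(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Same key as StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.
        props.put("cache.max.bytes.buffering", "0");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("application.id", "merge-records-app");   // hypothetical value
        base.put("bootstrap.servers", "localhost:9092");   // hypothetical value
        Properties props = withCachingDisabled(base);
        System.out.println(props.getProperty("cache.max.bytes.buffering"));
    }
}
```

Disabling the cache trades throughput for completeness: every intermediate merge result is written downstream instead of only the latest value per key.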
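Answer
Try this. The example uses KStream#merge to combine two input topics (rock and classical songs) into a single output topic: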
// Older Confluent releases expose this constant on AbstractKafkaAvroSerDeConfig instead.
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
// SongEvent is an Avro-generated class; import it from the package your build generates it in.

public class MergeStreams {

    // Reads the rock and classical topics and writes both, interleaved, to the output topic.
    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String rockTopic = allProps.getProperty("input.rock.topic.name");
        final String classicalTopic = allProps.getProperty("input.classical.topic.name");
        final String allGenresTopic = allProps.getProperty("output.topic.name");

        KStream<String, SongEvent> rockSongs = builder.stream(rockTopic);
        KStream<String, SongEvent> classicalSongs = builder.stream(classicalTopic);
        KStream<String, SongEvent> allSongs = rockSongs.merge(classicalSongs);

        allSongs.to(allGenresTopic);
        return builder.build();
    }

    // Creates the two input topics and the output topic from the configured names and sizes.
    public void createTopics(Properties allProps) {
        // try-with-resources closes the client even if topic creation throws.
        try (AdminClient client = AdminClient.create(allProps)) {
            List<NewTopic> topics = new ArrayList<>();
            topics.add(new NewTopic(
                    allProps.getProperty("input.rock.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.rock.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.rock.topic.replication.factor"))));
            topics.add(new NewTopic(
                    allProps.getProperty("input.classical.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.classical.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.classical.topic.replication.factor"))));
            topics.add(new NewTopic(
                    allProps.getProperty("output.topic.name"),
                    Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("output.topic.replication.factor"))));
            client.createTopics(topics);
        }
    }

    // Loads the environment configuration from a properties file.
    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        try (FileInputStream input = new FileInputStream(fileName)) {
            allProps.load(input);
        }
        return allProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            throw new IllegalArgumentException(
                    "This program takes one argument: the path to an environment configuration file.");
        }

        MergeStreams ms = new MergeStreams();
        Properties allProps = ms.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        allProps.put(SCHEMA_REGISTRY_URL_CONFIG, allProps.getProperty("schema.registry.url"));
        Topology topology = ms.buildTopology(allProps);

        ms.createTopics(allProps);

        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final CountDownLatch latch = new CountDownLatch(1);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close(Duration.ofSeconds(5));
                latch.countDown();
            }
        });

        try {
            streams.start();
            // Block until the shutdown hook releases the latch.
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}