java - 改进 kafka 流的最佳实践是什么
问题描述
我正在使用流从一个主题 A 到另一个主题 B 生成数据。但它非常慢。主题 A 有约 1.3 亿条记录的数据。
我们正在过滤具有特定日期的消息并生成主题 B。有没有办法加快速度?
以下是我正在使用的配置:
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
// Where to find Kafka broker(s).
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// Where to find the schema registry instance(s)
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
// streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + port);
// streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8088");
streamsConfiguration.put(StreamsConfig.RETRIES_CONFIG, 10);
streamsConfiguration.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, (10 * 1000L));
streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DefaultBugsnagExceptionHandler.getInstance().getClass());
// streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler);
// Specify (de)serializers for record keys and for record values.
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), "all");
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Records should be flushed every 10 seconds. This is less than the default
// in order to keep this example interactive.
///Messages will be forwarded either when the cache is full or when the commit interval is reached
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
StreamsConfig config = new StreamsConfig(streamsConfiguration);
StreamsBuilder builder = new StreamsBuilder();
String start_date = "2018-05-10";
String end_date = "2018-05-16";
//DateFormat format = new SimpleDateFormat("yyyy-MM-dd");
//LocalDate dateTime;
// builder.stream("topicA").to("topicB");
KStream<String, avroschems> source = builder.stream("topicA");
source
.filter((k, value) -> LocalDate.parse(value.getDay()).isAfter(LocalDate.parse(start_date)) && LocalDate.parse (value.getDay()).isBefore(LocalDate.parse(end_date)))
.to("bugSnagIntegration_mobileCrashError_filtered");
System.out.println("Starting Kafka Stream");
return new KafkaStreams(builder.build(), config);
我正在尝试将消息复制到某个日期范围内的 topicB。不确定这是否会导致缓慢?
如何实现并发?
解决方案
“极慢”不是一个非常具体的术语。您应该分享一些具体的吞吐量数字。
关于多线程:增加StreamsConfig.NUM_STREAM_THREADS_CONFIG
是正确的。但是,这仅在 CPU 是瓶颈时才有帮助。如果网络是瓶颈,您需要在不同的机器上启动多个应用程序实例(即多次部署确切的某个应用程序);对于这种情况,所有实例也将组成消费者组并分担负载。我建议阅读文档以获取更多详细信息:https ://docs.confluent.io/current/streams/architecture.html#parallelism-model
此外,您可以配置内部使用的消费者和生产者客户端。这也可能有助于提高吞吐量。参照。https://docs.confluent.io/current/streams/developer-guide/config-streams.html#kafka-consumers-producer-and-admin-client-configuration-parameters
推荐阅读
- java - 在java应用程序中读取xml文档
- android - 谷歌地图过渡
- python - 从系列中减去一个数据框(或其唯一的列)
- python - 在kivy上实现pywebview
- vb.net - 在 VB.Net 应用程序的第 40 次迭代中,底层连接被关闭
- python - 如何使用 python 或 mql5 从 IQO 二进制交易室中抓取每个分时数据?
- sql - 用于选择数据库中所有 pk/fk 的 SQL 查询
- php - 以编程方式删除 core_url_rewrite 条目 magento
- mysql - 如何在没有 Web 服务的情况下将 Xamarin Forms 连接到 MySQL 数据库作为应用程序的后端?
- jenkins - Jenkins CLI:错误:匿名缺少整体/读取权限