apache-kafka - 由于突然终止,kafkastream 重新启动后未收到任何记录
问题描述
我的 Stream 工作正常,由于某种原因它被使用命令 pkill 杀死,现在重新启动后它不起作用,它没有收到任何东西。
我的流消费者:这里我配置为 4 个流任务,
//流消费者处理程序
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> userStream = builder.stream("usertopic",Consumed.with(Serdes.String(), Serdes.String()));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "ALL my bootstrap servers);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "500");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//consumer_timeout_ms
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 2000);
props.put("state.dir","/tmp/kafka/stat));
userStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));
/* pass stream to my logic commented */
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
}
});
kafkaStreams.cleanUp();
kafkaStreams.start();
流启动日志:
:43,593 INFO [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] o.a.k.c.c.KafkaConsumer: [Consumer clientId=streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions
2019-09-09T16:06:43,593 INFO [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] o.a.k.s.p.i.StreamThread: stream-thread [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] State transition from PARTITIONS_ASSIGNED to RUNNING
2019-09-09T16:06:43,593 INFO [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-50] o.a.k.s.KafkaStreams: stream-client [streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d] State transition from REBALANCING to RUNNING
2019-09-09T16:11:39,386 INFO [kafka-producer-network-thread | b2b] o.a.k.c.Metadata: Cluster ID: O8kYaORIQQ2vie_DUA_xvA
2019-09-09T16:11:41,148 INFO [AsyncResolver-bootstrap-executor-0] c.n.d.s.r.a.ConfigClusterResolver: Resolving eureka endpoints via configuration
2019-09-09T16:11:41,624 INFO [kafka-producer-network-thread | streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-1-producer] o.a.k.c.Metadata: Cluster ID: O8kYaORIQQ2vie_DUA_xvA
2019-09-09T16:11:41,672 INFO [kafka-producer-network-thread | streams-userstream-76ccdde0-115e-489e-894f-ec88aa9e3e2d-StreamThread-2-producer] o.a.k.c.Metadata: Cluster ID: O8kYaORIQQ2vie_DUA_xvA
我在/tmp/kafka/stat中删除了我的应用程序 ID, 但仍然没有收到。
但是当我测试时,在kafka消费者客户端下面获取数据
/kafka-console-consumer.sh --bootstrap-server serverip:9237 --from-beginning --topic usertopic
解决方案
我认为您可能正在运行同一组的其他消费者实例,出于测试目的,您可以更改流组名
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream");
change to
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-userstream-test");
如果这可行,那么它应该是以下任何原因
1. Other instance is running with same group name
2. Your stream is not gracefully shutdown
3. your state dir is not cleaned
推荐阅读
- angular - 从 Firebase 检索用户信息并在应用中显示
- ios - iPhone 上的 Safari 不允许在字段中输入文本
- jquery - Wordpress 管理面板中的 jQuery 问题
- list - 根据另一列创建具有多个列的唯一名称的列表?
- javascript - 使用从 Alphavantage 返回的 axios 对象更新 reactjs 状态
- c# - 为什么这个 C# 脚本会导致在 Unity 中循环的游戏对象之间出现延迟峰值?
- android - Android Oreo+:AppWidget 点击不调用 JobIntentService.onHandleWork()
- javascript - 函数在调用时正常运行,但总是返回 undefined
- python - 无法从 gevent 导入 wsgi
- android - BoxStore 中的最大盒子数