apache-kafka - 暂停消费者一段时间后数据丢失
问题描述
通过 Kafka 的动态更改配置,我试图在不重新启动消费者服务的情况下解决这个问题。但是我发现有一个问题,一些数据丢失了。
从这个事实可以看出, 的值msg.value['time']
并不总是连续的。
def main():
while True:
time_interval = 0
t0 = datetime.now()
# get the current configuration
bootstrap_servers, topic = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=msgpack.loads)
consumer.subscribe(topics=topic)
for msg in consumer:
if time_interval < 60:
print('nth: ', msg.value['time'])
save_data(msg.value)
time_interval = (datetime.now()-t0).seconds
else:
print('check the config')
break
如何解决数据丢失的问题?顺便问一下,这种情况下我的做法合理吗?</p>
更新:
我做了一些修改,以避免consumer
在配置不变的情况下重复实例化对象。
def main():
bootstrap_servers, topic = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
while True:
time_interval = 0
t0 = datetime.now()
# get the current configuration
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=msgpack.loads)
consumer.subscribe(topics=topic)
for msg in consumer:
if time_interval < 60:
print('nth: ', msg.value['time'])
save_data(msg.value)
time_interval = (datetime.now()-t0).seconds
else:
time_interval = 0
bootstrap_servers1, topic1 = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
if bootstrap_servers1 != bootstrap_servers or topic1 != topic:
bootstrap_servers = bootstrap_servers1
topic = topic1
print('changing Kafka configuration')
break
msg.value['time']
变成0, 1, 2, 3, 4, 5 ... 41, 43, 45, 47, 49, 51, 53 ...
,在某个点,它开始以某种模式丢失。
生产者端确保msg.value['time']
始终是连续的,并且两个示例中的配置都没有改变。
更新:
我犯了一个错误。我没有检查 for 循环中的每个项目,else
条件只是跳过了某个项目。
def main():
bootstrap_servers, topic = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
while True:
time_interval = 0
t0 = datetime.now()
# get the current configuration
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=msgpack.loads)
consumer.subscribe(topics=topic)
for msg in consumer:
print('nth: ', msg.value['time'])
save_data(msg.value)
if time_interval < 60:
time_interval = (datetime.now()-t0).seconds
else:
time_interval = 0
t0 = datetime.now()
bootstrap_servers1, topic1 = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
if bootstrap_servers1 != bootstrap_servers or topic1 != topic:
bootstrap_servers = bootstrap_servers1
topic = topic1
print(' changing Kafka configuration')
break
解决方案
推荐阅读
- windows - .txt 与 Windows 激活(批处理文件)
- java - 将 getText() 函数与 JTextField 一起使用时出现空指针异常
- tibco - 报错 Tibco business studio:“应用模块中没有组件”
- android - 在 android 上录制的 ios 上播放音频文件的问题
- c# - 将“byteA”声明为 var 时出现错误。如何解决?
- travis-ci - 如何根据 Travis 的分支部署到 ACS
- php - 计算和添加数组php中的数组值
- mongodb - 在 python 中排序和加入 mongodb 集合
- google-maps - 如何模拟“google_maps_flutter”包进行颤振测试?
- sql - 无法在 MS Access 上使用 NOT EXISTS 删除所有不在前 20 名中的行