Data loss after pausing the consumer for a while

Problem description

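I am trying to pick up dynamic changes to the Kafka configuration without restarting the consumer service. However, I ran into a problem: some data is lost.

This shows up in the fact that the value of msg.value['time'] is not always consecutive.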

def main():
    while True:
        time_interval = 0
        t0 = datetime.now()
        # get the current configuration
        bootstrap_servers, topic = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=msgpack.loads)
        consumer.subscribe(topics=topic)
        for msg in consumer:
            if time_interval < 60:
                print('nth: ', msg.value['time'])
                save_data(msg.value)
                time_interval = (datetime.now()-t0).seconds
            else:
                print('check the config')
                break

How can I fix the data loss? Also, is my approach reasonable for this situation?


Update:

I made some changes so that the consumer object is not re-instantiated when the configuration has not changed.

def main():
    bootstrap_servers, topic = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
    while True:
        time_interval = 0
        t0 = datetime.now()
        # get the current configuration
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=msgpack.loads)
        consumer.subscribe(topics=topic)
        for msg in consumer:
            if time_interval < 60:
                print('nth: ', msg.value['time'])
                save_data(msg.value)
                time_interval = (datetime.now()-t0).seconds
            else:
                time_interval = 0
                bootstrap_servers1, topic1 = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
                if bootstrap_servers1 != bootstrap_servers or topic1 != topic:
                    bootstrap_servers = bootstrap_servers1
                    topic = topic1
                    print('changing Kafka configuration')
                    break

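msg.value['time'] now goes 0, 1, 2, 3, 4, 5 ... 41, 43, 45, 47, 49, 51, 53 ...; from some point on, messages go missing in a regular pattern.

The producer side guarantees that msg.value['time'] is always consecutive, and the configuration did not change in either example.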


Update:

I made a mistake. I was not handling every item in the for loop; the else branch simply skipped the item it was entered with.


from datetime import datetime

import msgpack
from kafka import KafkaConsumer

# KafkaConfig and save_data are defined elsewhere in the project.

def main():
    bootstrap_servers, topic = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
    while True:
        time_interval = 0
        t0 = datetime.now()
        # (re)create the consumer with the current configuration
        consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, value_deserializer=msgpack.loads)
        consumer.subscribe(topics=topic)
        for msg in consumer:
            # handle every message first, so the timer check below cannot skip one
            print('nth: ', msg.value['time'])
            save_data(msg.value)
            if time_interval < 60:
                time_interval = (datetime.now() - t0).total_seconds()
            else:
                # about a minute has passed: restart the timer and re-read the configuration
                time_interval = 0
                t0 = datetime.now()
                bootstrap_servers1, topic1 = KafkaConfig.select(KafkaConfig.bootstrap_servers, KafkaConfig.topic).scalar(as_tuple=True)
                if bootstrap_servers1 != bootstrap_servers or topic1 != topic:
                    bootstrap_servers = bootstrap_servers1
                    topic = topic1
                    print('changing Kafka configuration')
                    # release the old consumer before a new one is created
                    consumer.close()
                    break
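
The skipping pattern reported for the second version can be reproduced without Kafka. The sketch below is a simplified simulation, not the original program. It assumes a fake clock in which message i arrives at second i, an unchanged configuration (so the loop never breaks), and a shortened check interval. The names second_version and third_version are made up for illustration.

```python
def second_version(messages, check_after):
    """Loop logic of the second version: save only inside the `if` branch."""
    saved, t0, time_interval = [], 0, 0
    for now, msg in enumerate(messages):  # fake clock: message i arrives at second i
        if time_interval < check_after:
            saved.append(msg)
            time_interval = now - t0
        else:
            # msg is never saved on this path, and t0 is not reset, so the
            # next message pushes time_interval straight back over the limit
            time_interval = 0
    return saved


def third_version(messages, check_after):
    """Loop logic of the last version: save first, then look at the timer."""
    saved, t0, time_interval = [], 0, 0
    for now, msg in enumerate(messages):
        saved.append(msg)
        if time_interval < check_after:
            time_interval = now - t0
        else:
            time_interval = 0
            t0 = now  # restart the timer so the check runs once per interval
    return saved


print(second_version(range(12), check_after=5))  # [0, 1, 2, 3, 4, 5, 7, 9, 11]
print(third_version(range(12), check_after=5))   # [0, 1, 2, ..., 11]
```

Once the interval has elapsed, the second version alternates between the two branches, which drops every other message. That matches the 41, 43, 45 ... pattern above; the exact point where skipping starts depends on how fast messages arrive.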

Tags: apache-kafka, kafka-python

Solution
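
One possible direction, offered as a sketch and not as a verified fix: instead of iterating over the consumer, call kafka-python's `KafkaConsumer.poll(timeout_ms=...)` in a loop. `poll` returns after the timeout even when no messages arrive, so the configuration check does not depend on a message showing up, and no message has to be spent on triggering the check. The function name `consume_until_config_changes` and its parameters (`load_config`, `handle`, `check_every`, `clock`) are hypothetical; the consumer is passed in so the loop can be exercised without a broker.

```python
import time


def consume_until_config_changes(consumer, load_config, current_config, handle,
                                 check_every=60.0, clock=time.monotonic):
    """Pass every polled record to `handle`; return the new configuration
    once `load_config()` returns something other than `current_config`."""
    last_check = clock()
    while True:
        # poll() returns a dict {TopicPartition: [records]} and comes back
        # after at most timeout_ms, even if the topic is idle.
        batches = consumer.poll(timeout_ms=1000)
        for records in batches.values():
            for record in records:
                handle(record.value)  # every record is handled, none skipped

        now = clock()
        if now - last_check >= check_every:
            last_check = now
            new_config = load_config()
            if new_config != current_config:
                return new_config
```

In the setup above, `consumer` would be the `KafkaConsumer`, `load_config` would wrap the `KafkaConfig.select(...)` query, and `handle` would be `save_data`; the caller would close the old consumer with `consumer.close()` and build a new one from the returned configuration. As far as I know, a kafka-python consumer created without a `group_id` does not commit offsets and starts from the position given by `auto_offset_reset` (default `'latest'`), so recreating it, as the first version did every minute, can also miss messages produced in the meantime. Setting a `group_id` may help when only the topic list changes on the same cluster.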

