首页 > 解决方案 > 使用 Python 为 Kafka 加载 csv

问题描述

我正在尝试从 csv 文件中随机加载和分配主题(5 个主题)信息。这是我当前的代码。如何让这个工作?

pconf = {
    'bootstrap.servers': brokers,
    'partitioner': 'random',
    'key.serializer': StringSerializer('utf_8')
}

producer = SerializingProducer(pconf)
now = datetime.now()

current_time = now.strftime("%d/%m/%Y %H:%M:%S")

f = open('data.csv', 'r')
with f:
    reader = csv.reader(f)
    for row in reader:
        try:
            for row in range(1000):
                producer.produce(random.choice(topics), value=row)
                sleep(5)
        except KeyboardInterrupt:
            sys.stderr.write('%% Aborted by user\n')
        except BufferError:
            sys.stderr.write('%% Local producer queue is full (%d messages awaiting delivery): try again\n' % len(p))

我收到错误:TypeError:需要一个类似字节的对象,而不是“int”

标签: pythonapache-kafka

解决方案


推荐阅读