首页 > 解决方案 > 如何使用 pykafka 消费者从主题中获取数据

问题描述

我不知道如何从 Pykafka 消费者那里获取数据。我什至有问题打印消费者的主题。问题是,无论我对消费者调用什么方法,这个过程都会永远挂起。如果我只是初始化消费者而不使用它,则该过程完成。感谢您提前提供任何帮助。

def getData(spark):
    spark.sparkContext.setLogLevel("WARN")
    scc = StreamingContext(spark, 1)
    topic = "justtopic"
    client = pykafka.KafkaClient("localhost:9092")      
    KAFKA_VERSION = (0, 10)
    print("topics", client.topics)                        <-- this line is working

    consumer = KafkaConsumer(
        'justtopic', bootstrap_servers = 'localhost:9092',
        api_version = KAFKA_VERSION
    )

    print(consumer.topics())                         <-- if i call some function on consumer it hangs forever.
    #rdd = kafkaStream.flatMap(lambda line: line.strip().split("\n")).map(lambda strelem: float(strelem))
    # print("****** ", rdd.count())

标签: apache-sparkpysparkapache-kafkapykafka

解决方案


推荐阅读