首页 > 解决方案 > 为什么 ForEach 接收器在 Spark 结构化流中不调用函数 (show_data_function)?

问题描述

我想查看火花流数据帧中可用的数据,稍后我想对该数据应用业务操作。

到目前为止,我已经尝试将流式 DataFrame 转换为 RDD。一旦该对象转换为 RDD,我想应用一个函数来转换数据,并使用模式创建新列(用于特定消息)。

dsraw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_kafka_server) \
    .option("subscribe", topic) \
    .load() \
    .selectExpr("CAST(value AS STRING)")


print "type (df_stream)", type(dsraw)
print "schema (dsraw)", dsraw.printSchema()


def show_data_fun(dsraw, epoch_id):
    dsraw.show()

    row_rdd = dsraw.rdd.map(lambda row: literal_eval(dsraw['value']))
    json_data = row_rdd.collect()

    print "From rdd : ", type(json_data)
    print "From rdd : ", json_data[0]
    print "show_data_function_call"


jsonDataQuery = dsraw \
    .writeStream \
    .foreach(show_data_fun)\
    .queryName("df_value")\
    .trigger(continuous='1 second')\
    .start()

print the first JSON message which is in the stream.

标签: python-2.7apache-sparkapache-kafkaspark-structured-streaming

解决方案


推荐阅读