python-2.7 - 为什么 ForEach 接收器在 Spark 结构化流中不调用函数 (show_data_function)?
问题描述
我想查看火花流数据帧中可用的数据,稍后我想对该数据应用业务操作。
到目前为止,我已经尝试将流式 DataFrame 转换为 RDD。一旦该对象转换为 RDD,我想应用一个函数来转换数据,并使用模式创建新列(用于特定消息)。
dsraw = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", bootstrap_kafka_server) \
.option("subscribe", topic) \
.load() \
.selectExpr("CAST(value AS STRING)")
print "type (df_stream)", type(dsraw)
print "schema (dsraw)", dsraw.printSchema()
def show_data_fun(dsraw, epoch_id):
dsraw.show()
row_rdd = dsraw.rdd.map(lambda row: literal_eval(dsraw['value']))
json_data = row_rdd.collect()
print "From rdd : ", type(json_data)
print "From rdd : ", json_data[0]
print "show_data_function_call"
jsonDataQuery = dsraw \
.writeStream \
.foreach(show_data_fun)\
.queryName("df_value")\
.trigger(continuous='1 second')\
.start()
print the first JSON message which is in the stream.
解决方案
推荐阅读
- python - 如何在我的游戏内计时器继续时单击由 graphics.py 制作的窗口内部?
- c++ - AStar 网格算法是否只处理方形网格?
- ruby-on-rails - 有没有办法在控制器内使用 link_to 标签?
- vue.js - 升级后 VueJS 3.0.0 Pug 渲染失败
- php - SQL 准备语句更新值防止 sql 注入
- wordpress - 缓慢的加载时间
- google-cloud-platform - 在 Google Cloud Platform 中设置 FTP
- usb - 通过 libusb 的 Windows 复合设备(usb 音频类和自定义批量接口)?
- python - 带有“或”的函数的条件语句
- ios - iOS 14:UNUserNotificationCenter requestAuthorizationWithOptions 总是失败并显示“此应用程序不允许通知”