dataframe - 如何使用用户定义的函数在 Spark 中解析 json?
问题描述
我正在做一个包含 kafka、spark 和 hive 的项目。我有一个这样的事件示例,
{"event": "OrderEvent", "messageid": "2db62eb5-de95-4ce8-8161-ab7552dc2fd7", "userid": "user-346", "lineitems": [{"productid": "product-784", "quantity": 3}, {"productid": "product-173", "quantity": 1}], "orderid": 50000}
有一个消费者作业订阅 kafka 主题并消费事件,然后将它们写入 hdfs(我的配置单元表的位置)
我的问题是,我想编写一个函数来将每行的 json 事件解析为字符串,但我得到了
AttributeError: 'NoneType' object has no attribute 'repartition'
我的整个消费者工作就像
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json
class OrderEventConsumer:
def __init__(self):
conf = SparkConf().setAppName('OrderEventConsumer')
self.sc = SparkContext().getOrCreate(conf)
self.sc.setLogLevel('ERROR')
self.ssc = StreamingContext(self.sc, 5)
self.ssc.checkpoint('/tmp/SparkCheckpoints')
sqlContext = SQLContext(self.sc)
# Kafka variables
self.zkQuorum = 'localhost:2189'
self.topic = 'test' # 'prod-raw-product-view'
def format_event(self, rdd):
for i in range(len(rdd['lineitems'])):
yield '{},{},{},{},{},{}'.format(rdd['userid'], rdd['orderid'], rdd['lineitems'][i]['productid'],
rdd['lineitems'][i]['quantity'], rdd['messageid'], rdd['event_time'])
def consume(self):
kvs = KafkaUtils.createStream(self.ssc, self.zkQuorum, 'spark-streaming-consumer', {self.topic: 1})
aRdd = kvs.map(lambda x: json.loads(x[1])) \
.foreachRDD(lambda x: x.foreach(lambda x: self.format_event(x))) \
.repartition(1) \
.saveAsTextFiles('hdfs://node1/user/hive/warehouse/hb.db/fact_order/')
self.ssc.start()
self.ssc.awaitTermination()
if __name__ == '__main__':
orderConsumer = OrderEventConsumer()
orderConsumer.consume()
我想向 hdfs 写入一个文件,其中包括每个事件的 lineitems 计数时间行。我能怎么做?
谢谢。
解决方案
你不应该使用foreachRDD
andforeach
函数——它们不返回任何数据。如果你想格式化你的东西,只需map
像上一行一样使用。
另外,不要使用repartition
,而是使用coalesce
- 它可能会更快
PS 如果您刚刚开始,我建议您使用 Spark Structured Streaming - 它可能比 Spark Streaming 更高效且更容易起诉。
推荐阅读
- javascript - 每次在文本字段中按下键时,Vue 都会重新加载/发送 XHR 请求
- api - 检查 API 要求 crx/extension/chrome/chromium
- html - 从 powershell 获取 html 文件值
- python - Python 的 sqlite3 库有时会返回原始日期时间,有时会返回同一列的日期时间
- abp - ABP 框架 4.4.3 包 IdentityServer4 4.1.1 在部署时与 net50 不兼容
- cordova - 如何打印图像蓝牙串行
- c++ - CZMQ 设置发送 HWM / 设置接收 HWM
- mariadb - 如何正确编写此条件请求 MariaDB
- javascript - 模拟拖曳功能默认导出和常规导出笑话测试
- powerbi - 从 Power BI 连接到托管在 Azure VM 中的 SQL Server