首页 > 解决方案 > 如何使用用户定义的函数在 Spark 中解析 json?

问题描述

我正在做一个包含 kafka、spark 和 hive 的项目。我有一个这样的事件示例,

{"event": "OrderEvent", "messageid": "2db62eb5-de95-4ce8-8161-ab7552dc2fd7", "userid": "user-346", "lineitems": [{"productid": "product-784", "quantity": 3}, {"productid": "product-173", "quantity": 1}], "orderid": 50000}

有一个消费者作业订阅 kafka 主题并消费事件,然后将它们写入 hdfs(我的配置单元表的位置)

我的问题是,我想编写一个函数来将每行的 json 事件解析为字符串,但我得到了 AttributeError: 'NoneType' object has no attribute 'repartition'

我的整个消费者工作就像

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
import json


class OrderEventConsumer:

def __init__(self):
    conf = SparkConf().setAppName('OrderEventConsumer')
    self.sc = SparkContext().getOrCreate(conf)
    self.sc.setLogLevel('ERROR')
    self.ssc = StreamingContext(self.sc, 5)
    self.ssc.checkpoint('/tmp/SparkCheckpoints')
    sqlContext = SQLContext(self.sc)

    # Kafka variables
    self.zkQuorum = 'localhost:2189'
    self.topic = 'test'  # 'prod-raw-product-view'

def format_event(self, rdd):
    for i in range(len(rdd['lineitems'])):
        yield '{},{},{},{},{},{}'.format(rdd['userid'], rdd['orderid'], rdd['lineitems'][i]['productid'],
                                         rdd['lineitems'][i]['quantity'], rdd['messageid'], rdd['event_time'])

def consume(self):
    kvs = KafkaUtils.createStream(self.ssc, self.zkQuorum, 'spark-streaming-consumer', {self.topic: 1})
    aRdd = kvs.map(lambda x: json.loads(x[1])) \
        .foreachRDD(lambda x: x.foreach(lambda x: self.format_event(x))) \
        .repartition(1) \
        .saveAsTextFiles('hdfs://node1/user/hive/warehouse/hb.db/fact_order/')
    self.ssc.start()
    self.ssc.awaitTermination()


if __name__ == '__main__':
    orderConsumer = OrderEventConsumer()
    orderConsumer.consume()

我想向 hdfs 写入一个文件,其中包括每个事件的 lineitems 计数时间行。我能怎么做?

谢谢。

标签: dataframeapache-sparkapache-kafkaspark-streamingrdd

解决方案


你不应该使用foreachRDDandforeach函数——它们不返回任何数据。如果你想格式化你的东西,只需map像上一行一样使用。

另外,不要使用repartition,而是使用coalesce- 它可能会更快

PS 如果您刚刚开始,我建议您使用 Spark Structured Streaming - 它可能比 Spark Streaming 更高效且更容易起诉。


推荐阅读