首页 > 解决方案 > DStream JSON 对象到 SQLite

问题描述

堆栈溢出社区,

我有以下问题:

我正在使用 Spark Streaming 和 KafkaUtils 从 Kafka 主题中读取数据,然后将 Dstream 转换为 JSON。我想要的是将此 JSON 对象保存到具有列行格式的 SQLite 数据库中。

我在 spark-streaming 中运行的代码示例:

import sys
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == '__main__':

    conf = SparkConf().setAppName("PythonStreamingDirectKafka").setMaster("spark://spark-master:7077")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")

    ssc = StreamingContext(sc, 20)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})
    message = kvs.map(lambda x: x[1])
    message.pprint()

    # Functions
    json_object = message.map(lambda s: eval(s))
    temperatures_object = json_object.map(lambda s: s["temperature_value"])

    #Aggregations
    json_object.pprint()
    temperatures_object.pprint()

    ssc.start()
    ssc.awaitTermination()

DStream的输出

DStream 输出

SQLite 架构:

数据库模式

您知道如何实现这一目标吗?如何使用 Pyspark 将 JSON 数据从火花流传输到 SQLite 对我来说很复杂。

我提前感谢任何帮助!

标签: jsonsqliteapache-sparkpysparkspark-streaming

解决方案


推荐阅读