json - DStream JSON 对象到 SQLite
问题描述
堆栈溢出社区,
我有以下问题:
我正在使用 Spark Streaming 和 KafkaUtils 从 Kafka 主题中读取数据,然后将 Dstream 转换为 JSON。我想要的是将此 JSON 对象保存到具有列行格式的 SQLite 数据库中。
我在 spark-streaming 中运行的代码示例:
import sys
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
if __name__ == '__main__':
conf = SparkConf().setAppName("PythonStreamingDirectKafka").setMaster("spark://spark-master:7077")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 20)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})
message = kvs.map(lambda x: x[1])
message.pprint()
# Functions
json_object = message.map(lambda s: eval(s))
temperatures_object = json_object.map(lambda s: s["temperature_value"])
#Aggregations
json_object.pprint()
temperatures_object.pprint()
ssc.start()
ssc.awaitTermination()
DStream的输出
SQLite 架构:
您知道如何实现这一目标吗?如何使用 Pyspark 将 JSON 数据从火花流传输到 SQLite 对我来说很复杂。
我提前感谢任何帮助!
解决方案
推荐阅读
- javascript - 如何使用 vanilla JavaScript 在点击时更改动态制作的 div 的颜色?
- r - 如何遍历多个行索引范围以为每个行索引范围创建单独的数据框-R
- c# - 官方 Neo4J 驱动程序的 Hello World 示例在调用 Session 时存在参考问题
- python - C++ 中的 OpenCV 拉普拉斯输出不包括负值,但在 Python 中不包括
- swift - Swift - Popover 控制器根据加载时间显示在错误的角落
- web - 如何让我的 Wix 网站完全响应?
- ssrs-2008 - SSRS 2008 到 SSRS 2017 迁移 - 订阅问题
- ssl - K3s 入口 TLS 启用访问启用 TLS 的后端,如何?
- excel - CountA 使用工作表中的数据来引用外部文件
- python - 您可以在 GeoDjango 中本地进行方位角等距投影吗?