首页 > 解决方案 > 使用 Pyspark 从 REST API 获取数据到 Spark Dataframe

问题描述

我正在构建一个数据管道,它以 json 格式使用来自 RESTApi 的数据并推送到 Spark Dataframe。火花版本:2.4.4

但得到错误

df = SQLContext.jsonRDD(rdd) 
AttributeError: type object 'SQLContext' has no attribute 'jsonRDD'

代码:

from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from urllib import urlopen
from pyspark import SQLContext
import json
spark = SparkSession \
.builder \
.appName("DataCleansing") \
.getOrCreate()


def convert_single_object_per_line(json_list):
json_string = ""
for line in json_list:
json_string += json.dumps(line) + "\n"
return json_string

def parse_dataframe(json_data):
r = convert_single_object_per_line(json_data)
mylist = []
for line in r.splitlines():
mylist.append(line)
rdd = spark.sparkContext.parallelize(mylist)
df = SQLContext.jsonRDD(rdd)
return df

url = "https://mylink"
response = urlopen(url)
data = str(response.read())
json_data = json.loads(data)
df = parse_dataframe(json_data)

 

如果有任何其他更好的方法来查询 RestApi 并使用 Pyspark 将数据带到 Spark Dataframe。

我不确定我是否遗漏了什么。

标签: apache-sparkpyspark

解决方案


检查 Spark Rest API数据源。这个库的一个优点是它将使用多个执行器来获取数据 rest api 并为您创建数据框。

在您的代码中,您将所有数据提取到驱动程序并创建 DataFrame,如果您有非常大的数据,它可能会因堆空间而失败。

uri = "https://mylink"
options = { 'url' : url, 'method' : 'GET', 'readTimeout' : '10000', 'connectionTimeout' : '2000', 'partitions' : '10'}
// Now we create the Dataframe which contains the result from the call to the API
df = spark.read.format("org.apache.dsext.spark.datasource.rest.RestDataSource").options(**options).load()


推荐阅读