java - Spark-SQL 读取 JSON 数据速度慢
问题描述
我正在尝试使用Spark-SQL
从 JSON 字符串中读取和选择数据。
这是我做的:</p>
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("aaa");
sparkConf.setMaster("local[*]");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SparkSession sc = SparkSession.builder().sparkContext(javaSparkContext.sc()).getOrCreate();
String data = "{\"temp\":25, \"hum01\":50, \"env\":{\"lux\":1000, \"geo\":[32.5, 43.8]}}";
String querySql = "select env.lux as abc from testData";
System.out.println("start 01, time is"+System.currentTimeMillis());
List<String> dataList = Arrays.asList(data);
Dataset<String> dataset = sc.createDataset(dataList, Encoders.STRING());
dataset.printSchema();
System.out.println("start 02, time is"+System.currentTimeMillis());
Dataset<Row> df = sc.read().json(dataset);
System.out.println("start 03, time is"+System.currentTimeMillis());
List<String> queryResultJson = null;
try{
df.createOrReplaceTempView("testData");
System.out.println("start 04, time is"+System.currentTimeMillis());
Dataset<Row> queryData = sc.sql(querySql);
System.out.println("start 05, time is"+System.currentTimeMillis());
queryResultJson = queryData.toJSON().collectAsList();
System.out.println("start 06, time is"+System.currentTimeMillis());
}catch (Exception e) {
e.printStackTrace();
} finally {
sc.catalog().dropTempView("testData");
}
结果如下所示:
start 01, time is1543457455652
start 02, time is1543457458766
start 03, time is1543457459993
start 04, time is1543457460190
start 05, time is1543457460334
start 06, time is1543457460818
数据集创建过程似乎花费了太多时间。我想在流数据处理流程中使用此功能。但是性能太差,无法使用。
有什么方法可以让数据集的创建速度更快吗?或者有没有其他方法可以使用类似 SQL 的语言查询 Json 数据?
解决方案
使用 spark 结构化流时,您不会以相同的方式创建数据集。例如,如果您的源是一个带有描述您的数据的模式的套接字:
SparkSession spark = SparkSession.builder()
.appName("Simple Application")
.master("local[*]")
.getOrCreate();
StructType sensorSchema = new StructType().add("temp", new IntegerType())
.add("hum01", new IntegerType())
.add("env", new StructType()
.add("lux", new IntegerType())
.add("geo", new ArrayType(new FloatType(), false)));
Dataset<Row> socketDF = spark
.readStream()
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.schema()
.load()
.selectExp("temp, hum01, env");
然后你可以开始对你的算法进行基准测试。
推荐阅读
- html - 图标字体可访问性:带有 aria-label 的父 span 或带有 sr-only 的兄弟 span
- go - 省略结构如何归档
- google-cloud-platform - 谷歌云数据流与谷歌云数据融合
- c - 错误 LNK2001:使用 Shining Light Productions 时无法解析外部符号 _H_get0_pqg
- reactjs - 需要使用反应导航实现选项卡之间的边框
- javascript - 按位置从二维数组中获取 2 个值
- javascript - 格式化数组中的最后一个元素
- javascript - 如何将图像直接从烧瓶服务器发送到 html?
- javascript - 为什么只有一个客户端连接时,Socket.IO 会添加两个客户端?
- odata - 使用 OData 连接服务在 Blazor 客户端应用程序中使用 OData