首页 > 解决方案 > Spark-SQL 读取 JSON 数据速度慢

问题描述

我正在尝试使用Spark-SQL从 JSON 字符串中读取和选择数据。

这是我做的:</p>

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("aaa");
sparkConf.setMaster("local[*]");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SparkSession sc = SparkSession.builder().sparkContext(javaSparkContext.sc()).getOrCreate();

String data = "{\"temp\":25, \"hum01\":50, \"env\":{\"lux\":1000, \"geo\":[32.5, 43.8]}}";
String querySql = "select env.lux as abc from testData";

System.out.println("start 01, time is"+System.currentTimeMillis());
List<String> dataList = Arrays.asList(data);
Dataset<String> dataset = sc.createDataset(dataList, Encoders.STRING());
dataset.printSchema();
System.out.println("start 02, time is"+System.currentTimeMillis());
Dataset<Row> df = sc.read().json(dataset);
System.out.println("start 03, time is"+System.currentTimeMillis());
List<String> queryResultJson = null;
try{
  df.createOrReplaceTempView("testData");
  System.out.println("start 04, time is"+System.currentTimeMillis());
  Dataset<Row> queryData = sc.sql(querySql);
  System.out.println("start 05, time is"+System.currentTimeMillis());
  queryResultJson = queryData.toJSON().collectAsList();
  System.out.println("start 06, time is"+System.currentTimeMillis());
}catch (Exception e) {
  e.printStackTrace();
} finally {
  sc.catalog().dropTempView("testData");
}

结果如下所示:

start 01, time is1543457455652
start 02, time is1543457458766
start 03, time is1543457459993
start 04, time is1543457460190
start 05, time is1543457460334
start 06, time is1543457460818

数据集创建过程似乎花费了太多时间。我想在流数据处理流程中使用此功能。但是性能太差,无法使用。

有什么方法可以让数据集的创建速度更快吗?或者有没有其他方法可以使用类似 SQL 的语言查询 Json 数据?

标签: javaapache-sparkapache-spark-sql

解决方案


使用 spark 结构化流时,您不会以相同的方式创建数据集。例如,如果您的源是一个带有描述您的数据的模式的套接字:

SparkSession spark = SparkSession.builder()
    .appName("Simple Application")
    .master("local[*]")
    .getOrCreate();
StructType sensorSchema = new StructType().add("temp", new IntegerType())
        .add("hum01", new IntegerType())
        .add("env", new StructType()
                                .add("lux", new IntegerType())
                                .add("geo", new ArrayType(new FloatType(), false)));
Dataset<Row> socketDF = spark
    .readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .schema()
    .load()
    .selectExp("temp, hum01, env");

然后你可以开始对你的算法进行基准测试。


推荐阅读