Converting a long to a Timestamp for insertion into a database

Problem Description

Goal: read data from a JSON file in which the timestamp is a long, and insert it into a table whose column is of type Timestamp. The problem is that I do not know how to convert the long to a Timestamp for the insert.
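
For reference, java.sql.Timestamp has a constructor that takes milliseconds since the Unix epoch, so in plain Java the conversion itself is a one-liner (a minimal sketch):

import java.sql.Timestamp;

// java.sql.Timestamp's constructor takes milliseconds since the Unix epoch,
// so the raw long from the JSON converts directly:
long readingTimeMillis = 1549533263587L;
Timestamp ts = new Timestamp(readingTimeMillis);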

Sample input file:

    {"sensor_id":"sensor1","reading_time":1549533263587,"notes":"My Notes for 
    Sensor1","temperature":24.11,"humidity":42.90}

I want to read this file, build a Bean from it, and insert the Bean into a table. Here is my Bean definition:

public class DummyBean {

    private String sensor_id;
    private String notes;
    private Timestamp reading_time;
    private double temperature;
    private double humidity;

    // getters and setters omitted for brevity
}
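
Note that Encoders.bean discovers fields through JavaBean-style accessors, so each field needs a public getter/setter pair. A sketch of one such pair, for the reading_time field:

// Encoders.bean maps columns via JavaBean properties,
// so each field needs accessors like these:
public Timestamp getReading_time() {
    return reading_time;
}

public void setReading_time(Timestamp reading_time) {
    this.reading_time = reading_time;
}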

Here is the table I am inserting into:

    create table dummy (
    id serial not null primary key,
    sensor_id   varchar(40),
    notes   varchar(40),
    reading_time    timestamp with time zone default (current_timestamp at time zone 'UTC'),
    temperature   decimal(15,2),
    humidity      decimal(15,2)
    );
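
For context, outside Spark a plain-JDBC insert would perform the long-to-Timestamp conversion when binding the parameter. A minimal sketch (URL, credentials, and values are placeholders taken from the sample row):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Timestamp;

public class PlainJdbcInsertSketch {
    public static void main(String[] args) throws SQLException {
        String sql = "insert into dummy (sensor_id, notes, reading_time, temperature, humidity) "
                + "values (?, ?, ?, ?, ?)";
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://dbhost:5432/mydb", "foo", "bar");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, "sensor1");
            ps.setString(2, "My Notes for Sensor1");
            ps.setTimestamp(3, new Timestamp(1549533263587L)); // millis -> Timestamp
            ps.setDouble(4, 24.11);
            ps.setDouble(5, 42.90);
            ps.executeUpdate();
        }
    }
}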

Here is my Spark application, which reads the JSON file and performs the insert (append):

SparkSession spark = SparkSession
        .builder()
        .appName("SparkJDBC2")
        .getOrCreate();

// Java Bean used to apply a schema to the JSON data
Encoder<DummyBean> dummyEncoder = Encoders.bean(DummyBean.class);

// Read the JSON file into a Dataset
String jsonPath = "input/dummy.json";
Dataset<DummyBean> readings = spark.read().json(jsonPath).as(dummyEncoder);

// Diagnostics
readings.printSchema();
readings.show();

// Write to the JDBC sink
String url = "jdbc:postgresql://dbhost:5432/mydb";
String table = "dummy";
Properties connectionProperties = new Properties();
connectionProperties.setProperty("user", "foo");
connectionProperties.setProperty("password", "bar");
readings.write().mode(SaveMode.Append).jdbc(url, table, connectionProperties);

Output and error message:

root
 |-- humidity: double (nullable = true)
 |-- notes: string (nullable = true)
 |-- reading_time: long (nullable = true)
 |-- sensor_id: string (nullable = true)
 |-- temperature: double (nullable = true)

+--------+--------------------+-------------+---------+-----------+
|humidity|               notes| reading_time|sensor_id|temperature|
+--------+--------------------+-------------+---------+-----------+
|    42.9|My Notes for Sensor1|1549533263587|  sensor1|      24.11|
+--------+--------------------+-------------+---------+-----------+

Exception in thread "main" org.apache.spark.sql.AnalysisException: Column "reading_time" not found in schema Some(StructType(StructField(id,IntegerType,false), StructField(sensor_id,StringType,true), StructField(notes,StringType,true), StructField(temperature,DecimalType(15,2),true), StructField(humidity,DecimalType(15,2),true)));
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4$$anonfun$6.apply(JdbcUtils.scala:147)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4$$anonfun$6.apply(JdbcUtils.scala:147)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4.apply(JdbcUtils.scala:146)

Tags: apache-spark, apache-spark-sql

Solution


Thanks for the help. Yes, the deployed table was actually missing that column (which is why the schema in the AnalysisException has no reading_time, even though the DDL above does), so I fixed the table. This is what solved the conversion itself (Java version):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.to_timestamp;

...
// reading_time holds epoch milliseconds; to_timestamp interprets a numeric
// column as seconds since the epoch, so divide by 1000 first.
Dataset<Row> readingsRow = readings.withColumn("reading_time",
        to_timestamp(col("reading_time").divide(1000L)));

// Write to JDBC Sink
String url = "jdbc:postgresql://dbhost:5432/mydb";
String table = "dummy";
Properties connectionProperties = new Properties();
connectionProperties.setProperty("user", "foo");
connectionProperties.setProperty("password", "bar");
readingsRow.write().mode(SaveMode.Append).jdbc(url, table, connectionProperties);
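
An equivalent formulation skips to_timestamp and casts explicitly; Spark interprets a numeric value cast to timestamp as seconds since the epoch, so the same division by 1000 applies (a sketch, not the answer's original code):

import org.apache.spark.sql.types.DataTypes;

// Divide millis down to seconds, then cast; a numeric value cast to
// timestamp is interpreted as seconds since the epoch.
Dataset<Row> readingsRow = readings.withColumn("reading_time",
        col("reading_time").divide(1000L).cast(DataTypes.TimestampType));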

