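apache-spark - Converting a long to a Timestamp for insertion into a database
Problem description
Goal: read data from a JSON file in which the timestamp is stored as a long, and insert it into a table whose column is of type Timestamp. The problem is that I don't know how to convert the long to a Timestamp for the insert.
Sample input file: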
{"sensor_id":"sensor1","reading_time":1549533263587,"notes":"My Notes for Sensor1","temperature":24.11,"humidity":42.90}
I want to read this, create a Bean from it, and then insert it into a table. Here is my Bean definition:
public class DummyBean {
    private String sensor_id;
    private String notes;
    private Timestamp reading_time;
    private double temperature;
    private double humidity;
    // getters and setters not shown
}
Here is the table I want to insert into:
create table dummy (
id serial not null primary key,
sensor_id varchar(40),
notes varchar(40),
reading_time timestamp with time zone default (current_timestamp at time zone 'UTC'),
temperature decimal(15,2),
humidity decimal(15,2)
);
Here is my Spark application, which reads the JSON file and performs the insert (append):
SparkSession spark = SparkSession
.builder()
.appName("SparkJDBC2")
.getOrCreate();
// Java Bean used to apply schema to JSON Data
Encoder<DummyBean> dummyEncoder = Encoders.bean(DummyBean.class);
// Read JSON file to DataSet
String jsonPath = "input/dummy.json";
Dataset<DummyBean> readings = spark.read().json(jsonPath).as(dummyEncoder);
// Diagnostics and Sink
readings.printSchema();
readings.show();
// Write to JDBC Sink
String url = "jdbc:postgresql://dbhost:5432/mydb";
String table = "dummy";
Properties connectionProperties = new Properties();
connectionProperties.setProperty("user", "foo");
connectionProperties.setProperty("password", "bar");
readings.write().mode(SaveMode.Append).jdbc(url, table, connectionProperties);
Output and error message:
root
|-- humidity: double (nullable = true)
|-- notes: string (nullable = true)
|-- reading_time: long (nullable = true)
|-- sensor_id: string (nullable = true)
|-- temperature: double (nullable = true)
+--------+--------------------+-------------+---------+-----------+
|humidity| notes| reading_time|sensor_id|temperature|
+--------+--------------------+-------------+---------+-----------+
| 42.9|My Notes for Sensor1|1549533263587| sensor1| 24.11|
+--------+--------------------+-------------+---------+-----------+
Exception in thread "main" org.apache.spark.sql.AnalysisException: Column "reading_time" not found in schema Some(StructType(StructField(id,IntegerType,false), StructField(sensor_id,StringType,true), StructField(notes,StringType,true), StructField(temperature,DecimalType(15,2),true), StructField(humidity,DecimalType(15,2),true)));
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4$$anonfun$6.apply(JdbcUtils.scala:147)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4$$anonfun$6.apply(JdbcUtils.scala:147)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$4.apply(JdbcUtils.scala:146)
Solution
Thanks for your help. Yes, the table was missing that column, so I fixed that. This is what solved the conversion (Java version):
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.to_timestamp;
...
Dataset<Row> readingsRow = readings.withColumn("reading_time", to_timestamp(col("reading_time").$div(1000L)));
// Write to JDBC Sink
String url = "jdbc:postgresql://dbhost:5432/mydb";
String table = "dummy";
Properties connectionProperties = new Properties();
connectionProperties.setProperty("user", "foo");
connectionProperties.setProperty("password", "bar");
readingsRow.write().mode(SaveMode.Append).jdbc(url, table, connectionProperties);
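To make the arithmetic behind the division by 1000 easier to check, here is a minimal plain-Java sketch that needs no Spark. It assumes `reading_time` holds milliseconds since the Unix epoch, which the sample value 1549533263587 suggests. The class name `EpochMillisDemo` and its helper methods are hypothetical and exist only for illustration.

```java
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical helper class, for illustration only; not part of the original application.
class EpochMillisDemo {

    // Interpret the long as milliseconds since 1970-01-01T00:00:00Z (assumption).
    public static Timestamp toTimestamp(long epochMillis) {
        return Timestamp.from(Instant.ofEpochMilli(epochMillis));
    }

    // Fractional seconds since the epoch: the value obtained by dividing the
    // millisecond reading by 1000 before it is turned into a timestamp.
    public static double toEpochSeconds(long epochMillis) {
        return epochMillis / 1000.0;
    }

    public static void main(String[] args) {
        long readingTime = 1549533263587L; // value from the sample input file

        // Instant prints in UTC, independent of the JVM's default time zone.
        System.out.println(Instant.ofEpochMilli(readingTime)); // 2019-02-07T09:54:23.587Z

        // The Timestamp keeps the millisecond component.
        System.out.println(toTimestamp(readingTime).getTime()); // 1549533263587
    }
}
```

Without the division, the raw value would be read as a number of seconds and land tens of thousands of years in the future, which is why the solution scales it down first.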