java - Spark SQL:将JSON格式的毫秒时间戳转换为日期格式
问题描述
Schema 的 dataType 声明为,Timestamp
但 spark 作业未将其转换为正确的格式。
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", topic)
// .option("maxOffsetsPerTrigger", 10000)
.load();
Dataset<Row> rawStream = stream
.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), eventSpecificStructType).as("eventData"))
.select("eventData.*")
.filter(col("eventType").equalTo("Test"));
传入的时间戳1542126896113
转换为50838-01-28 18:49:111.0
.
有没有办法将毫秒转换为日期时间格式?
解决方案
您必须在 Java 中创建一个 UDF。
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
SimpleDateFormat dateFormat = new SimpleDateFormat("....Date time pattern...");
spark.udf().register("timestamp", new UDF1<String, Timestamp>() {
private static final long serialVersionUID = 1335972766810808134L;
@Override
public Timestamp call(String source)
{
try{
return new Timestamp(dateFormat.parse(source).getTime());
} catch (ParseException e) {
e.printStackTrace();
}
}
return null;
}
}, DataTypes.TimestampType);
最后:
stream = stream.withColumn("col", callUDF("timestamp", dataframe.col("col")));
推荐阅读
- matlab - 使用 fsolve 的 Levenberg-Marquadt 步进搜索
- django - 如何在模板 Django 2.1 中使用 user.username
- ios - 如何在 IBDesignable 文件中设置高度约束?
- java - 使用 .split 方法和 JOP.InputDialog Java 处理 Null 问题
- javascript - Puppeteer:如何获取节点列表的每个元素的内容?
- r - 使用 set.seed 函数生成可重现的结果
- python - 仅包含 5 个元音中的每一个的函数
- javascript - 如何确定为什么超链接不起作用?
- statistics - 计算两个叠加的高斯函数的质心
- fosuserbundle - HWIOAuth Symfony 4:服务定义必须是以“@”开头的数组或字符串,但为服务找到字符串