apache-spark - 如何使用 Spark 结构化流同时写入 Parquet 并调用 REST API
问题描述
如何使用 spark 结构化流同时写入 parquet 和调用 REST API?下面是我需要集成的地方:
- 通过 spark SQL 结构化流,可以从 Kafka 主题中消费。
- 该消息采用 avro 格式,并且能够写入 parquet 文件系统。
- 另一方面,能够读取 parquet 文件系统并根据需要触发任何 SQL 查询。
以下是一些我卡住的集成或处理,任何人都可以帮忙:
- 所以,我现在必须集成一个休息调用,同时我应该能够写入 parquet 文件系统并调用休息 API。
- 要调用 REST API,我还应该先将 Dataset 转换为 Avro 对象,然后为 REST API 准备请求对象。
上面的流式实现是在 JAVA 上完成的。如果建议使用基于 JAVA 的 API 或方法,那将有很大帮助。
供参考。我正在使用最新版本的火花流:spark-streaming-kafka-0-10_2.12 -> 2.4.0 spark-streaming_2.12 -> 3.0.1
{
//dataSet -> dataset having kafka message
Dataset<Row> output = dataSet.select(package$.MODULE$.from_avro(col("value"), avroSchema).as("EventMessage")).select("EventMessage.*");
output
.writeStream()
.outputMode(OutputMode.Append().toString()).format("console")
.foreachBatch((VoidFunction2<Dataset<Row>, Long>) (df, batchId) -> {
df.write().mode(OutputMode.Append().toString()).format("parquet").partitionBy("action").parquet(STREAM_PARQUET_OUTPUT_PATH);
// REST API CALL BLOCK
//df -> avro object -> API Rquest Object -> REST Call
}).start().awaitTermination();
}
解决方案
推荐阅读
- c# - 使用 HttpClient 时出现意外响应
- node.js - 在 Ubuntu 上通过 Compass 加载 MongoDB Atlas 时出错
- python - 将 Python 列表项分解为更小的列表,替换新的子列表项,并将列表与新值重新组合在一起
- java - 搜索的项目在点击时给我错误的结果
- java - 为什么我在Android中从毫秒转换为本地时间时时间不正确?
- ef-core-3.1 - ef core 接收奇怪的主机不能为空错误
- c# - 如何向 XAML 页面 BindingContext 提供具有带有参数的构造函数的 ViewModel
- python - 如何从另一个脚本中获取输出
- java - 为行矩阵乘法运行多个线程
- mysql - Mysql 触发器 ON INSERT + INNER JOIN 语法错误