java - Spark Streaming:在大量数据的情况下不启动 Foreach 写入
问题描述
我有一个火花流作业,它从 kafka 主题中读取数据。试图将传入的数据写入数据库。从而实现mysql sink。在做之前,我写了一个简单的接收器来测试流程。
它在我的本地运行良好,但是当我在开发环境中运行它时,它会抛出SparkException: Exception thrown in awaitResult
.
我相信原因可能是开发环境有更多来自 kafka 主题的数据。
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", topic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
// .option("kafkaConsumer.pollTimeoutMs", "1000")
.option("maxOffsetsPerTrigger", 1000)
.load();
stream.writeStream().foreach(sink).start();
在开发环境中,流在控制台上打印得很好。console.stream.writeStream().format("console").start();
简单接收器:
public class SimpleSink extends ForeachWriter<Row> {
public SimpleSink() {
}
@Override
public boolean open(long partitionId, long version) {
System.out.println(">>>");
return true;
}
@Override
public void process(Row row) {
System.out.println("comes here");
//System.out.println(row.mkString(","));
}
@Override
public void close(Throwable errorOrNull) {
}
}
因为这段代码在我的笔记本电脑上运行良好,这意味着代码很好。但我不知道为什么在更高的 env Sink 中没有被调用。
有没有办法控制进入接收器的数据量?不确定这是否是问题,但这是我的理论。
解决方案
推荐阅读
- html - 为什么即使我的类标签正确,CSS 网格也不起作用?
- java - 如何使用 Java8 流在下面的数组列表中找到第二高的薪水
- javascript - TypeError:this.querySelectorAll 在 react js 中使用 D3Funnel 时不是函数
- mongodb - 在 MongoDB 中建模 RBAC 的最佳方法是什么:嵌入还是链接?
- python - 我不明白默认值如何在继承中起作用
- spring-boot - 无法调用“org.apache.commons.logging.Log.isDebugEnabled()”,因为“this.logger”为空
- python - 在某个计时器没有动作后,安全凸轮停止记录
- python - SessionNotCreatedException:消息:会话未创建:此版本的 ChromeDriver 仅支持使用 ChromeDriver 和 Chrome 的 Chrome 版本 87
- json - React 显示 json 键和值
- json - 如何查询数组字段(AWS Glue)?