首页 > 解决方案 > Spark Streaming:在大量数据的情况下不启动 Foreach 写入

问题描述

我有一个火花流作业,它从 kafka 主题中读取数据。试图将传入的数据写入数据库。从而实现mysql sink。在做之前,我写了一个简单的接收器来测试流程。

它在我的本地运行良好,但是当我在开发环境中运行它时,它会抛出SparkException: Exception thrown in awaitResult.

我相信原因可能是开发环境有更多来自 kafka 主题的数据。

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", topic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
//          .option("kafkaConsumer.pollTimeoutMs", "1000")
          .option("maxOffsetsPerTrigger", 1000)
          .load();

 stream.writeStream().foreach(sink).start();

在开发环境中,流在控制台上打印得很好。console.stream.writeStream().format("console").start();

简单接收器:

public class SimpleSink extends ForeachWriter<Row> {

    public SimpleSink() {

    }

    @Override
    public boolean open(long partitionId, long version) {
        System.out.println(">>>");
        return true;

    }

    @Override
    public void process(Row row) {
        System.out.println("comes here");
        //System.out.println(row.mkString(","));
    }

    @Override
    public void close(Throwable errorOrNull) {
    }

}

因为这段代码在我的笔记本电脑上运行良好,这意味着代码很好。但我不知道为什么在更高的 env Sink 中没有被调用。

有没有办法控制进入接收器的数据量?不确定这是否是问题,但这是我的理论。

标签: javaapache-sparkspark-streaming

解决方案


推荐阅读