首页 > 解决方案 > 流启动后 Spark Stream 新作业

问题描述

我有一种情况,我试图使用来自 kafka 的火花流进行流式传输。流是直接流。我能够创建一个流然后开始流式传输,还能够通过流式传输获取 kafka 上的任何更新(如果有)。

当我有新请求流式传输新主题时,问题就出现了。由于每个 jvm 只能有 1 个 SparkStreaming 上下文,因此我无法为每个新请求创建一个新流。

我想出的方法是

  1. 一旦创建了 DStream 并且 spark 流已经在进行中,只需将一个新流附加到它。这似乎不起作用,createDStream(用于新主题2)不返回流并且停止进一步处理。流继续在第一个请求(比如 topic1)上继续。

  2. 其次,我想停止流,创建 DStream,然后再次开始流。我不能使用相同的流上下文(它会抛出一个在流停止后无法添加作业的异常),如果我为新主题(topic2)创建一个新流,旧的流主题(topic1)会丢失并且它会流式传输只有新的。

这是代码,看看

 JavaStreamingContext javaStreamingContext;
        if(null == javaStreamingContext) {
            javaStreamingContext =  JavaStreamingContext(sparkContext, Durations.seconds(duration));
        } else {
            StreamingContextState streamingContextState = javaStreamingContext.getState();
            if(streamingContextState == StreamingContextState.STOPPED) {
                javaStreamingContext =  JavaStreamingContext(sparkContext, Durations.seconds(duration));
            }


        }
Collection<String> topics = Arrays.asList(getTopicName(schemaName));
        SparkVoidFunctionImpl impl = new SparkVoidFunctionImpl(getSparkSession());

        KafkaUtils.createDirectStream(javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))
                .map((stringStringConsumerRecord) -> stringStringConsumerRecord.value())
                .foreachRDD(impl);
if (javaStreamingContext.getState() == StreamingContextState.ACTIVE) {

            javaStreamingContext.start();
            javaStreamingContext.awaitTermination();
        }

不用担心 SparkVoidFunctionImpl,这是一个自定义类,是 VoidFunction 的实现。

以上是方法1,我不停止现有的流媒体。当一个新的请求进入这个方法时,它没有得到一个新的流对象,它试图创建一个 dstream。问题是 DStream 对象永远不会返回。

KafkaUtils.createDirectStream(javaStreamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, getKafkaParamMap()))

这不会返回 dstream,控制只是终止而没有错误。进一步的步骤不会执行。

我尝试了很多事情并阅读了多篇文章,但我相信这是一个非常常见的生产级别问题。完成的任何流式传输都将针对多个不同的主题进行,并且每个主题的处理方式都不同。

请帮忙

标签: javaapache-sparkapache-kafkastreamingspark-streaming

解决方案


问题是 spark master 向工作人员发送代码,尽管数据是流式传输的,但除非重新启动作业,否则底层代码和变量值保持静态。

我能想到的选项很少:

  1. Spark 作业服务器:每次您想从不同的主题订阅/流式传输而不是触及已经运行的作业时,开始一个新作业。从您的 API 正文中,您可以提供参数或主题名称。如果您想停止来自特定主题的流式传输,只需停止相应的工作。它将为您提供很大的灵活性和对资源的控制。

  2. [理论] 主题过滤器:订阅您认为需要的所有主题,当记录被拉一段持续时间时,根据主题列表过滤掉记录。通过 API 操作此主题列表以增加或减少您的主题范围,它也可以是广播变量。这只是一个想法,我根本没有尝试过这个选项。

  3. 另一种解决方法是在您需要时使用微服务将您的 Topic-2 数据中继到 Topic-1 并在您不想时停止。


推荐阅读