Flink: Kinesis producer not working

Problem

I'm trying to run a simple program that reads from one Kinesis stream, does a trivial transformation, and writes the result to another Kinesis stream.

Running locally on Flink 1.4.0 (this is the version currently supported on EMR, so upgrading is not an option).

Here is the code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kinesis.{FlinkKinesisConsumer, FlinkKinesisProducer}
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

import scala.util.parsing.json.JSON

def main(args: Array[String]) {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // Consume from the source stream, starting at the latest record
  val consumerConfig = new Properties()
  consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
  consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

  val kinesisMaps = env.addSource(new FlinkKinesisConsumer[String](
    "source-stream", new SimpleStringSchema, consumerConfig))

  // Parse each record as a JSON object and extract a single field
  val jsonMaps = kinesisMaps.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, String]] }
  val values = jsonMaps.map(jsonMap => jsonMap("field_name"))

  values.print()

  // Produce the extracted values to the target stream
  val producerConfig = new Properties()
  producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")

  val kinesisProducer = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
  kinesisProducer.setFailOnError(true)
  kinesisProducer.setDefaultStream("target-stream")
  kinesisProducer.setDefaultPartition("0")

  values.addSink(kinesisProducer)

  // execute program
  env.execute("Flink Kinesis")
}

If I comment out the producer code, the program runs as expected and prints the correct values.

As soon as I add the producer code, I get the following exception:

org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
    at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:477)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:248)
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:608)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:569)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:264)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:210)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Any idea what's causing this?

Tags: scala, apache-flink, amazon-kinesis

Solution


Apparently, this is an issue with the older version of the Amazon Kinesis Producer Library (KPL) that Flink 1.4 uses.

There are at least two possible solutions:

  1. Upgrade to Flink 1.5. You can still use it on EMR if you install it as described under the "Custom EMR Installation" section here: https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html

  2. Build the Kinesis connector for Flink 1.4 with newer AWS dependencies: I cherry-picked the AWS dependency changes to the connector's pom.xml from 1.5 and built the connector with them. It looks like it works as expected.
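For reference, the cherry-picked change amounts to bumping the AWS version properties in the flink-connector-kinesis pom.xml. A sketch of the kind of edit involved (the version numbers shown are the ones I believe the Flink 1.5 branch used at the time; verify against the actual 1.5 sources before building):

```
<!-- flink-connector-kinesis/pom.xml: bump the AWS dependency versions.
     Values below are assumed from the Flink 1.5 branch; check before use. -->
<properties>
    <aws.sdk.version>1.11.319</aws.sdk.version>
    <aws.kinesis-kcl.version>1.9.0</aws.kinesis-kcl.version>
    <aws.kinesis-kpl.version>0.12.9</aws.kinesis-kpl.version>
</properties>
```

After applying the change, rebuild the connector module (e.g. with `mvn clean install -DskipTests` from the flink-connector-kinesis directory) and use the resulting jar in place of the stock 1.4 connector.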

