首页 > 解决方案 > Spark Streaming Kafka 问题

问题描述

我是 Spark 和 Kafka 集成的新手,我遇到了奇怪的问题。

代码在本地机器上运行良好,当我在生产环境中运行时,数据没有写入 elastisearch。

我不确定数据是否正在从 kafka 读取并解析以将其写入 elasticsearch。我无法使用 println 语句验证数据,因为我在集群模式下运行并且看不到 DStream [String]。这是我如何定义并将变量传递给它的 DStream。

val kafkaParams = Map(
      "bootstrap.servers" -> config.getString("bootstrap.servers"),
      "zookeeper.connect" -> config.getString("zookeeper.connect"),
      "zookeeper.connection.timeout.ms" -> config.getString("zookeeper.connection.timeout.ms"),
      "security.protocol" -> config.getString("security.protocol"),
      "auto.offset.reset" -> config.getString("auto.offset.reset"),
      "group.id" -> config.getString("consumer.group.id")

    )
var message_raw = KafkaUtils.createStream(ssc,kafkaParams,topicConfiguration).map(row=> row._2)

由于它在测试环境中工作,我不确定我在生产集群模式下缺少什么。

请提供任何可能对我的调查有帮助的想法。

谢谢,巴布

标签: apache-sparkelasticsearchapache-kafka

解决方案


推荐阅读