首页 > 解决方案 > Spark Structured Streaming:在单独的工作线程上运行 Kafka 消费者

问题描述

所以我有一个 Spark 应用程序,它需要使用结构化流从两个 kafka 集群(Kafka A 和 B)读取两个流,并对这两个流进行一些连接和过滤。那么是否有可能有一个从 A 读取流的 Spark 作业,并consumer在每个读取 Kafka B 并将数据放入地图的工作人员上运行一个线程(称为)。所以稍后当我们过滤时,我们可以做类似的事情stream.filter(row => consumer.idNotInMap(row.id))

我对这种方法有一些疑问:

  1. 如果这种方法可行,当应用程序在集群上运行时会不会导致任何问题?

  2. 每个工作人员上的所有consumer实例都会在集群模式下收到相同的数据吗?或者我们甚至可以让每个consumer工作节点(可能由 Spark 控制)只在 Kafka 分区上监听?

  3. 实例将如何consumer被序列化并传递给工作人员?目前它们是在 Driver 节点上初始化的,但是有什么方法可以为每个工作节点初始化一次吗?

我觉得就我而言,我应该改用流加入。我已经尝试过了,但没有成功,这就是我采用这种方法的原因。它不起作用,因为来自 Kafka A 的流仅是附加的,而流 B 需要具有可以更新的状态,这使得它只能更新。然后 Spark 不支持加入追加和更新模式的流。

下面是一些伪代码:

// SparkJob.scala
val consumer = Consumer()
val getMetadata = udf(id => consumer.get(id))
val enrichedDataSet = stream.withColumn("metadata", getMetadata(stream("id"))
// Consumer.java
class Consumer implements Serializable {
  private final ConcurrentHashMap<Integer, String> metadata;

  public MetadataConsumer() {
    metadata = new ConcurrentHashMap<>();
    // read stream
    listen();
  }

  // process kafka data inside this loop 
  private void listen() {
    Thread t = new Thread(() -> {
      KafkaConsumer consumer = ...;
      while (consumer.hasNext()) {
        var message = consumer.next();
        // update metadata or put in new metadata
        metadata.put(message.id, message);
      }
    });
    t.start();
  }

  public String get(Integer key) {
    return metadata.get(key);
  }
}

标签: scalaapache-sparkapache-kafkaspark-structured-streaming

解决方案


推荐阅读