scala - Spark Structured Streaming:在单独的工作线程上运行 Kafka 消费者
问题描述
所以我有一个 Spark 应用程序,它需要使用结构化流从两个 kafka 集群(Kafka A 和 B)读取两个流,并对这两个流进行一些连接和过滤。那么是否有可能有一个从 A 读取流的 Spark 作业,并consumer
在每个读取 Kafka B 并将数据放入地图的工作人员上运行一个线程(称为)。所以稍后当我们过滤时,我们可以做类似的事情stream.filter(row => consumer.idNotInMap(row.id))
?
我对这种方法有一些疑问:
如果这种方法可行,当应用程序在集群上运行时会不会导致任何问题?
每个工作人员上的所有
consumer
实例都会在集群模式下收到相同的数据吗?或者我们甚至可以让每个consumer
工作节点(可能由 Spark 控制)只在 Kafka 分区上监听?实例将如何
consumer
被序列化并传递给工作人员?目前它们是在 Driver 节点上初始化的,但是有什么方法可以为每个工作节点初始化一次吗?
我觉得就我而言,我应该改用流加入。我已经尝试过了,但没有成功,这就是我采用这种方法的原因。它不起作用,因为来自 Kafka A 的流仅是附加的,而流 B 需要具有可以更新的状态,这使得它只能更新。然后 Spark 不支持加入追加和更新模式的流。
下面是一些伪代码:
// SparkJob.scala
val consumer = Consumer()
val getMetadata = udf(id => consumer.get(id))
val enrichedDataSet = stream.withColumn("metadata", getMetadata(stream("id"))
// Consumer.java
class Consumer implements Serializable {
private final ConcurrentHashMap<Integer, String> metadata;
public MetadataConsumer() {
metadata = new ConcurrentHashMap<>();
// read stream
listen();
}
// process kafka data inside this loop
private void listen() {
Thread t = new Thread(() -> {
KafkaConsumer consumer = ...;
while (consumer.hasNext()) {
var message = consumer.next();
// update metadata or put in new metadata
metadata.put(message.id, message);
}
});
t.start();
}
public String get(Integer key) {
return metadata.get(key);
}
}
解决方案
推荐阅读
- angular - 如何使用 ToastrService 在 Angular 中显示服务器错误的 Toastr 通知
- python - 如何让机器人响应特定反应?不和谐.py
- jenkins - 如何在 jenkins 管道中使用 emailext-template 来获取模板?
- azure-devops - Azure Devops / Loop 并在单个 bash 脚本中显示所有模板参数
- r - 当这些行值出现在 R 中的 df2 中时,返回 df1 的行索引
- excel - 数据透视表度量不一致
- python - 在 Python 中查找缺少 .csv 文件的目录
- python - 无法解决“不允许的方法 - 请求的 URL 不允许该方法。” 错误
- c# - 单线程枚举有效,但多线程无效,不知道为什么
- android - 为什么要在安卓手机上发布Apk显示灰屏?