java - Transformer Kafka 中的 ManagedChannel 线程安全吗
问题描述
这是我的变压器:
public class DataEnricher implements Transformer < byte[], EnrichedData, KeyValue < byte[], EnrichedData >> {
private ManagedChannel channel;
private InfoClient infoclient;
private LRUCacheCollector < String,
InfoResponse > cache;
public DataEnricher() {}
@Override
public void init(ProcessorContext context) {
channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
infoclient = new InfoClient(channel);
}
@Override
public KeyValue < byte[],
EnrichedData > transform(byte[] key, EnrichedData request) {
InfoResponse infoResponse = null;
String someInfo = request.getSomeInfo();
try {
infoResponse = infoclient.getMoreInfo(someInfo);
} catch (Exception e) {
logger.warn("An exception has occurred during retrieval.", e.getMessage());
}
EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
return new KeyValue < > (key, enrichedData);
}
@Override
public KeyValue < byte[],
DataEnricher > punctuate(long timestamp) {
return null;
}
@Override
public void close() {
client.shutdown();
}
}
在 Kafka Streams 中,每个流线程初始化它自己的流拓扑副本,然后根据 ProcessorContext(即每个任务,即每个分区)实例化该拓扑。所以不会init()
被调用并覆盖/泄漏每个分区的通道,并且由于我们有多个线程,甚至可以竞争创建channel/client
? 有没有办法防止这种情况?
这在方法中被调用run()
:
public KafkaStreams createStreams() {
final Properties streamsConfiguration = new Properties();
//other configuration is setup here
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
streamsConfiguration.put(
StreamsConfig.NUM_STREAM_THREADS_CONFIG,
3);
StreamsBuilder streamsBuilder = new StreamsBuilder();
RequestJsonSerde requestSerde = new RequestJsonSerde();
DataEnricher dataEnricher = new DataEnricher();
// Get the stream of requests
final KStream < byte[], EnrichedData > requestsStream = streamsBuilder
.stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
final KStream < byte[], EnrichedData > enrichedRequestsStream = requestsStream
.filter((key, request) - > {
return Objects.nonNull(request);
}
.transform(() - > dataEnricher);
enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray()));
return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}
解决方案
我假设每个拓扑(或)TransformerSupplier
创建一个Transformer
实例,因此每个拓扑创建一个。在这种情况下,没有被覆盖的危险。另外我假设你也关闭了它的频道。ProcessorContext
channel
channel
client.shutdown()
推荐阅读
- gradle - Gradle:让依赖的版本依赖于传递依赖的版本
- ios - 正则表达式以@开头,以空格结尾
- java - JUnit 5 模拟 YearMonth.now 但为 YearMonth.from 调用真实方法
- envoyproxy - Envoy 如何支持随机采样但稳定的跟踪?
- colors - 创建带有彩色字形(表情符号)的字体
- wpf - 为什么 UpdateSourceTrigger=Propertychanged 在 IntegerUpdown 中无法正常工作?
- laravel - Laravel+Vue webpack 从特定路径加载块
- python - 如何让我的 pygame konami 代码更简单?
- javascript - 如果将新值添加到 Angular 的输入中,如何刷新下拉列表?
- c# - 命中 API -RestSharp 时无法连接到远程服务器