java - Flink 有异步源码功能吗?
问题描述
我的源函数具有频率控制,而我可以调整将数据刷新到下一个操作员的数据速率。我正在使用 Prometheus+Grafana 测量每个操作员的数据速率。然后我开始以 100 rec/sec 的速度生成数据。在 grafana 仪表板上显示大约 90 记录/秒。然后我将数据速率提高到 200 记录/秒。但是,Grafana 仪表板实际上显示 12 rec/sec。我想象背压正在保存数据。但是 Flink 仪表板没有显示我有背压。
因此,当检查 Flink 代码时,StreamSourceContexts.collect(T element)
那里有一个同步块。我想它是为了确保事件的有序性。但是,如果我StreamSourceContexts.collect(T element)
使用 Future 调用我的 SourceFunction 内部怎么办?我会在事件中体验到乱序吗?是否有允许我以异步方式推送事件的源函数?
@Override
public void collect(T element) {
synchronized (lock) {
output.collect(reuse.replace(element));
}
}
我的源函数:
public class OrdersSource extends RichSourceFunction<Order> {
@Override
public void run(SourceContext<Order> sourceContext) {
try {
while (running) {
generateOrderItem(sourceContext);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void generateOrderItem(SourceContext<Order> sourceContext) {
try {
InputStream stream = new FileInputStream(dataFilePath);
BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
long startTime = System.nanoTime();
String line = reader.readLine();
while (line != null) {
// I would like to put an async thread here
// Thread newThread = new Thread(() -> {
// sourceContext.collect(getOrderItem(line));
// });
// newThread.start();
sourceContext.collect(getOrderItem(line));
// sleep in nanoseconds to have a reproducible data rate for the data source
this.dataRateListener.busySleep(startTime);
// get start time and line for the next iteration
startTime = System.nanoTime();
line = reader.readLine();
}
reader.close();
reader = null;
stream.close();
stream = null;
} catch (FileNotFoundException e) {
System.err.println("Please make sure they are available at [" + dataFilePath + "].");
System.err.println(
" Follow the instructions at [https://docs.deistercloud.com/content/Databases.30/TPCH%20Benchmark.90/Data%20generation%20tool.30.xml?embedded=true] in order to download and create them.");
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
解决方案
推荐阅读
- python - 如何在 django manytomany m2m_changed 中删除多个实例
- audiokit - AK麦克风追踪器。频率不准确(AudioKit 4.10.1)
- swiftui - 在 UITextField 与 UIViewRepresentable 和 SwifUI 之间绑定文本时出现问题
- flutter - fromJson 返回空值
- rest - Tomcat 阻止 PUT 请求
- python - 如何使用 mojang api 在 python 中获取 uuid?
- asp.net - 从 .aspx 页面抓取数据
- github - Elixir/mix: ** (停止) :eacces on Windows
- python - 无法从 XML 解析 BS 标签
- android - 非活动 InputConnection 上的 getTextBeforeCursor/getSelectedText/getTextAfterCursor