首页 > 解决方案 > Flink 有异步源码功能吗?

问题描述

我的源函数具有频率控制,而我可以调整将数据刷新到下一个操作员的数据速率。我正在使用 Prometheus+Grafana 测量每个操作员的数据速率。然后我开始以 100 rec/sec 的速度生成数据。在 grafana 仪表板上显示大约 90 记录/秒。然后我将数据速率提高到 200 记录/秒。但是,Grafana 仪表板实际上显示 12 rec/sec。我想象背压正在保存数据。但是 Flink 仪表板没有显示我有背压。

因此,当检查 Flink 代码时,StreamSourceContexts.collect(T element)那里有一个同步块。我想它是为了确保事件的有序性。但是,如果我StreamSourceContexts.collect(T element)使用 Future 调用我的 SourceFunction 内部怎么办?我会在事件中体验到乱序吗?是否有允许我以异步方式推送事件的源函数?

    @Override
    public void collect(T element) {
        synchronized (lock) {
            output.collect(reuse.replace(element));
        }
    }

我的源函数:

public class OrdersSource extends RichSourceFunction<Order> {
    @Override
    public void run(SourceContext<Order> sourceContext) {
        try {
            while (running) {
                generateOrderItem(sourceContext);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void generateOrderItem(SourceContext<Order> sourceContext) {
        try {
            InputStream stream = new FileInputStream(dataFilePath);
            BufferedReader reader = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));

            long startTime = System.nanoTime();
            String line = reader.readLine();
            while (line != null) {
                // I would like to put an async thread here
                // Thread newThread = new Thread(() -> {
                //     sourceContext.collect(getOrderItem(line));
                // });
                // newThread.start();
                sourceContext.collect(getOrderItem(line));

                // sleep in nanoseconds to have a reproducible data rate for the data source
                this.dataRateListener.busySleep(startTime);

                // get start time and line for the next iteration
                startTime = System.nanoTime();
                line = reader.readLine();
            }
            reader.close();
            reader = null;
            stream.close();
            stream = null;
        } catch (FileNotFoundException e) {
            System.err.println("Please make sure they are available at [" + dataFilePath + "].");
            System.err.println(
                    " Follow the instructions at [https://docs.deistercloud.com/content/Databases.30/TPCH%20Benchmark.90/Data%20generation%20tool.30.xml?embedded=true] in order to download and create them.");
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

在此处输入图像描述

标签: javaasynchronousapache-flink

解决方案


推荐阅读