首页 > 解决方案 > DataStream Flink中从JDBC源读取数据的问题

问题描述

我正在启动一个新的 Flink 应用程序,以允许我的公司执行大量报告。我们有一个现有的遗留系统,其中大部分数据都保存在 SQL Server 数据库中。在开始使用新部署的 Kafka 流中的更多数据之前,我们首先需要使用这些数据库中的数据。

我花了很多时间阅读 Flink 书籍和网页,但我有一些简单的问题和假设希望你能帮助我进步。

首先,我想使用 DataStream API,这样我们既可以使用历史数据,也可以使用实时数据。我不认为我想使用 DataSet API,但我也没有看到使用 SQL/Table api 的意义,因为我更愿意在 Java 类中编写我的函数。我需要维护自己的状态,似乎 DataStream 键控功能是要走的路。

现在我正在尝试针对我们的生产数据库实际编写代码,我需要能够通过 SQL 查询读取数据的“流”——似乎没有 JDBC 源连接器,所以我认为我必须进行 JDBC 调用我自己,然后可能使用 env.fromElements() 创建一个数据源。显然这是一个“有界”数据集,但我还有什么意思要加载历史数据呢?将来我还想包含一个 Kafka 流,它只有几周的数据,所以我想我有时需要将来自 SQL Server/Snowflake 数据库的数据与来自 Kafka 流的实时流合并。什么是最好的做法,因为我没有看到讨论这个的例子。

通过从 JDBC 源检索数据,我还看到了一些使用 StreamingTableEnvironment 的示例 - 我是否打算以某种方式使用它而不是将数据从 JDBC 连接查询到我的 DataStream 函数等?同样,我想用 Java 而不是 Flink SQL 编写我的函数。如果我只使用 DataStream API,使用 StreamingTableEnvironment 查询 JDBC 数据是最佳做法吗?

标签: javajdbcapache-kafkaapache-flinkflink-streaming

解决方案


以下方法可用于从数据库中读取并创建数据流:

  1. 您可以使用 RichParallelSourceFunction 对数据库进行自定义查询并从中获取数据流。可以在 RichParallelSourceFunction 类的扩展中触发带有 JDBC 驱动程序的 SQL。

  2. 使用Table DataStream API - 可以通过创建 JDBC 目录来查询数据库,然后将其转换为流

  3. 对此的替代方案,可能是更昂贵的解决方案 - 您可以使用 Flink CDC 连接器,它为 Apache Flink 提供源连接器,使用变更数据捕获 (CDC) 从不同数据库获取变更

然后您可以将 Kafka 添加为源并获取数据流。

因此,简要来说,您的管道可能如下所示:您将两个源都转换为数据流,您可以使用例如协同处理功能加入这些流,这也将使您有可能维护状态并在业务逻辑中使用它。最后,使用 Sink 函数将您的最终输出发送到数据库、Kafka 甚至 AWS S3 存储桶。


推荐阅读