apache-kafka - kafka 从 Snowflake 将部分数据加载到主题中
问题描述
我们已经部署了 Kafka JDBC 源连接器来从雪花表/视图中提取数据。我们的连接器属性如下所示:
{
"name":"source-allocation-unit-snowflake",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"poll.interval.ms": "180000",
"connection.url":"jdbc:snowflake://aaaaaaaa.snowflakecomputing.com/?warehouse=MY_WH_XL&db=MY_DB&schema=STARSP_SRC_VIEWS&role=MY_ROLE&useProxy=true&proxyHost=aaa-aaa&proxyPort=99992&tracing=ALL",
"connection.user":"MYUSER",
"connection.password":"MYPASSWORDS",
"schema.pattern":"STARSP_SRC_VIEWS",
"catalog.pattern":"STARSP_SRC_VIEWS",
"topic.prefix":"sf-ALLOCATION_UNIT",
"query":"SELECT * from STARSP_SRC_VIEWS.V_SRC_COBO_AE_ALLOCATION_UNIT",
"mode":"timestamp",
"timestamp.column.name":"RCREATEDDATE",
"timestamp.initial":"1618869600000",
"validate.non.null":false,
"db.timezone":"Europe/Copenhagen",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"true",
"key.converter.schema.registry.url":"https://aaaqqqqqq.net:99999,https://aaaaaaeewwwaaaaa.net:9444444,https://aaaaasswwww.net:92222222",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable":"true",
"value.converter.schema.registry.url":"https://aaaqqqqqq.net:99999,https://aaaaaaeewwwaaaaa.net:9444444,https://aaaaasswwww.net:92222222",
"transforms":"Cast,ValueToKey,SetSchemaMetadataValue,SetSchemaMetadataKey",
"transforms.Cast.type":"org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec":"CATOR:boolean",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields":"PDATE,SCOUNTRY,AID",
"transforms.SetSchemaMetadataValue.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadataValue.schema.name":"com.my.cobo.avro.AllocationUnit",
"transforms.SetSchemaMetadataKey.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
"transforms.SetSchemaMetadataKey.schema.name":"com.my.cobo.avro.AssetKey"
}
这里的一切看起来都很好。但是我们面临以下两种意想不到的行为:
即使我们在源表上执行“SELECT *”,每次它只将一个国家数据加载到表中。在 Source Table 中,每天 3 个国家/地区的数据会以 5 -10 分钟的间隔一一加载。所有数据都应由连接器选择并加载到主题中。但是Kafka 连接器仅将第一个国家/地区数据加载到主题中,而不是获取接下来的两个国家/地区数据。 我们检查了 Kafka 在 Snowflake 之上运行的准备好的语句,如下所示:
“SELECT * from STARSP_SRC_VIEWS.V_SRC_COBO_AE_ALLOCATION_UNIT WHERE "RECORD_CREATED_DATE" > ? AND "RECORD_CREATED_DATE" < ? ORDER BY "RECORD_CREATED_DATE" ASC "
注意:我们在连接器中提供的初始时间戳是 4 月的日期。所以每次连接器都应该拉取最近加载的表中的所有数据。
- Snowflake 和 Kafka 之间的数据移动非常缓慢。拉动大约 1.8lL 记录大约需要 30 分钟。通过 Teradata 源连接器在 teradata 中提取相同的数据几乎需要 1 分钟。在 teradata souce 连接器中,我们有完全相同的参数,但在 url、用户/密码、连接器类方面略有变化,有什么可以提高性能的吗?
有人可以提出以上两种解决方案的任何解决方案吗?
解决方案
推荐阅读
- mongodb - 如何在 Stitch 注册时将用户数据链接到他未来的帐户
- javascript - 用于登录 React 的中间件,类似于 Multer for Express JS
- python - 在板上搜索坐标更好的矩形
- google-apps-script - 即使在添加更多行后仍保持单元格之间的关系
- regex - 机器人框架中的多行文本提取
- python - 来自 CSV 的 X 轴标签?
- python - 如何创建一个充当 Python 脚本 GUI 的网站
- android - Xamarin Android LinearLayout - 一页上有 2 个 RecyclerViews 的问题
- sql - 历史表上的 SQL 语句
- javascript - 在 React 中传递 refs