首页 > 解决方案 > kafka 从 Snowflake 将部分数据加载到主题中

问题描述

我们已经部署了 Kafka JDBC 源连接器来从雪花表/视图中提取数据。我们的连接器属性如下所示:

{

"name":"source-allocation-unit-snowflake",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"poll.interval.ms": "180000",

"connection.url":"jdbc:snowflake://aaaaaaaa.snowflakecomputing.com/?warehouse=MY_WH_XL&db=MY_DB&schema=STARSP_SRC_VIEWS&role=MY_ROLE&useProxy=true&proxyHost=aaa-aaa&proxyPort=99992&tracing=ALL",
"connection.user":"MYUSER",
"connection.password":"MYPASSWORDS",

"schema.pattern":"STARSP_SRC_VIEWS",
"catalog.pattern":"STARSP_SRC_VIEWS",

"topic.prefix":"sf-ALLOCATION_UNIT",
"query":"SELECT * from STARSP_SRC_VIEWS.V_SRC_COBO_AE_ALLOCATION_UNIT",

"mode":"timestamp",
"timestamp.column.name":"RCREATEDDATE",
"timestamp.initial":"1618869600000",
"validate.non.null":false,
"db.timezone":"Europe/Copenhagen",


"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable":"true",
"key.converter.schema.registry.url":"https://aaaqqqqqq.net:99999,https://aaaaaaeewwwaaaaa.net:9444444,https://aaaaasswwww.net:92222222",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schemas.enable":"true",
"value.converter.schema.registry.url":"https://aaaqqqqqq.net:99999,https://aaaaaaeewwwaaaaa.net:9444444,https://aaaaasswwww.net:92222222",


"transforms":"Cast,ValueToKey,SetSchemaMetadataValue,SetSchemaMetadataKey",
"transforms.Cast.type":"org.apache.kafka.connect.transforms.Cast$Value",
"transforms.Cast.spec":"CATOR:boolean",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields":"PDATE,SCOUNTRY,AID",
"transforms.SetSchemaMetadataValue.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
"transforms.SetSchemaMetadataValue.schema.name":"com.my.cobo.avro.AllocationUnit",
"transforms.SetSchemaMetadataKey.type":"org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
"transforms.SetSchemaMetadataKey.schema.name":"com.my.cobo.avro.AssetKey"

}

这里的一切看起来都很好。但是我们面临以下两种意想不到的行为:

  1. 即使我们在源表上执行“SELECT *”,每次它只将一个国家数据加载到表中。在 Source Table 中,每天 3 个国家/地区的数据会以 5 -10 分钟的间隔一一加载。所有数据都应由连接器选择并加载到主题中。但是Kafka 连接器仅将第一个国家/地区数据加载到主题中,而不是获取接下来的两个国家/地区数据。 我们检查了 Kafka 在 Snowflake 之上运行的准备好的语句,如下所示:

    “SELECT * from STARSP_SRC_VIEWS.V_SRC_COBO_AE_ALLOCATION_UNIT WHERE "RECORD_CREATED_DATE" > ? AND "RECORD_CREATED_DATE" < ? ORDER BY "RECORD_CREATED_DATE" ASC "

注意:我们在连接器中提供的初始时间戳是 4 月的日期。所以每次连接器都应该拉取最近加载的表中的所有数据。

  1. Snowflake 和 Kafka 之间的数据移动非常缓慢。拉动大约 1.8lL 记录大约需要 30 分钟。通过 Teradata 源连接器在 teradata 中提取相同的数据几乎需要 1 分钟。在 teradata souce 连接器中,我们有完全相同的参数,但在 url、用户/密码、连接器类方面略有变化,有什么可以提高性能的吗?

有人可以提出以上两种解决方案的任何解决方案吗?

标签: apache-kafkasnowflake-cloud-data-platformapache-kafka-connectconfluent-platform

解决方案


推荐阅读