jdbc - 使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题
问题描述
我已经为此苦苦挣扎了大约一个星期,现在试图获得一个简单的(3 个字段)AVRO 格式的 KSQL 表作为 JDBC 连接器接收器(mysql)的源
我收到以下错误(在 INFO 行之后):
[2018-12-11 18:58:50,678] INFO Setting metadata for table "DSB_ERROR_TABLE_WINDOWED" to Table{name='"DSB_ERROR_TABLE_WINDOWED"', columns=[Column{'MOD_CLASS', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'METHOD', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'COUNT', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2018-12-11 18:58:50,679] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DSB_ERROR_TABLE_WINDOWED
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:79)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:124)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:63)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我可以说接收器在提取模式时正在正确执行某些操作(请参见上面的错误之前的内容),并且在数据库中使用正确的模式成功创建了表:
MariaDB [dsb_errors_ksql]> describe DSB_ERROR_TABLE_WINDOWED;
+-----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-----------+--------------+------+-----+---------+-------+
| MOD_CLASS | varchar(256) | YES | | NULL | |
| METHOD | varchar(256) | YES | | NULL | |
| COUNT | bigint(20) | YES | | NULL | |
+-----------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)
这是 KTABLE 的定义:
ksql> describe extended DSB_ERROR_TABLE_windowed;
Name : DSB_ERROR_TABLE_WINDOWED
Type : TABLE
Key field : KSQL_INTERNAL_COL_0|+|KSQL_INTERNAL_COL_1
Key format : STRING
Timestamp field : Not set - using <ROWTIME>
Value format : AVRO
Kafka topic : DSB_ERROR_TABLE_WINDOWED (partitions: 4, replication: 1)
Field | Type
---------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
MOD_CLASS | VARCHAR(STRING)
METHOD | VARCHAR(STRING)
COUNT | BIGINT
---------------------------------------
Queries that write into this TABLE
-----------------------------------
CTAS_DSB_ERROR_TABLE_WINDOWED_37 : create table DSB_ERROR_TABLE_windowed with (value_format='avro') as select mod_class, method, count(*) as count from DSB_ERROR_STREAM window session ( 60 seconds) group by mod_class, method having count(*) > 0;
在此表的模式注册表中自动生成了一个条目(但没有键条目):
{
"subject": "DSB_ERROR_TABLE_WINDOWED-value",
"version": 7,
"id": 143,
"schema": "{\"type\":\"record\",\"name\":\"KsqlDataSourceSchema\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"MOD_CLASS\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"METHOD\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"COUNT\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}
这是 Connect Worker 的定义:
{ "name": "dev-dsb-errors-mysql-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "DSB_ERROR_TABLE_WINDOWED",
"connection.url": "jdbc:mysql://os-compute-d01.maeagle.corp:32692/dsb_errors_ksql?user=xxxxxx&password=xxxxxx",
"auto.create": "true",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-d01.maeagle.corp:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
我的理解(可能是错误的)是 KSQL 应该在模式注册表中创建适当的 AVRO 模式,而 Kafka Connect 应该能够正确读取这些模式。正如我上面提到的,当在 Mysql 中生成适当的表时,有些东西正在工作,尽管我很惊讶没有创建一个关键字段......
大多数帖子和示例都使用 JSON 而不是 AVRO,因此它们并不是特别有用。
它似乎在阅读和插入主题记录的反序列化部分......
我在这一点上不知所措,可以使用一些指导。
我还通过 github 开了一张类似的票:
https://github.com/confluentinc/ksql/issues/2250
问候,
- 约翰
解决方案
正如上面 John 所说,主题记录中的 key 不是字符串,而是一个字符串,后缀为一个 Java 序列化的 64 位整数,表示窗口开始时间。
Connect 不附带可以处理窗口密钥格式的 SMT。但是,可以写一个来去除整数并返回自然键。然后,您可以将其包含在类路径中并更新您的连接配置。
如果您需要数据库中的窗口开始时间,则可以更新 ksqlDB 查询以将窗口开始时间作为字段包含在值中。
推荐阅读
- javascript - 如何将员工验证与 javascript 集成
- reactjs - × TypeError: Cannot read property 'props' of undefined 但我正在使用箭头函数,我尝试使用 bind(this) 但它并没有消失
- javascript - 这个地方的+是什么意思?
- ios - 一行上两个标签的字体大小
- flutter - Flutter:如何将数据添加到列表中?
- c# - 我正在使用带有复选框的隐藏 for 将字符串发送到属性,但即使未选中复选框也会发送它们
- javascript - 在这种情况下如何在 TypeScript 中的类型、类、函数类型之间进行选择?
- javascript - 使用 jQuery.each() 方法显示隐藏元素
- selenium - 如何使用 Selenium 和 Python 在 iframe 中单击元素
- flutter - Flutter 如何在使用 sqflite 的应用程序启动时填充 ListView?