ksqlDB Postgres sink primary key

Problem description

I have a ksqlDB table that is replicated to a PostgreSQL sink using the JDBC connector:

CREATE TABLE IF NOT EXISTS engagements WITH (FORMAT='AVRO', PARTITIONS=1) AS
SELECT
    LATEST_BY_OFFSET(people.FirstName) AS FirstName,
    LATEST_BY_OFFSET(people.LastName) AS LastName,
    COUNT(interaction.InteractionId) AS TotalInteractions,
    LATEST_BY_OFFSET(interaction.Kind) AS EngagementType,
    LATEST_BY_OFFSET(content.Name) AS ContentName,
    interaction.ContentId AS ContentId,
    people.PersonId AS PersonId
FROM interaction
JOIN people
ON interaction.PersonId = people.PersonId
JOIN content
ON interaction.ContentId = content.ContentId
GROUP BY interaction.ContentId,
    people.PersonId
EMIT CHANGES;

ksql> DESCRIBE ENGAGEMENTS;

Name                 : ENGAGEMENTS
 Field             | Type                           
----------------------------------------------------
 CONTENTID         | VARCHAR(STRING)  (primary key) 
 PERSONID          | VARCHAR(STRING)  (primary key) 
 FIRSTNAME         | VARCHAR(STRING)                
 LASTNAME          | VARCHAR(STRING)                
 TOTALINTERACTIONS | BIGINT                         
 ENGAGEMENTTYPE    | VARCHAR(STRING)                
 CONTENTNAME       | VARCHAR(STRING)                
----------------------------------------------------

Here are the connector settings:

DROP CONNECTOR JDBC_SINK_POSTGRES_01;
CREATE SINK CONNECTOR JDBC_SINK_POSTGRES_01 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'ENGAGEMENTS',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'auto.evolve'                         = 'true',
    'insert.mode'                         = 'upsert'
);

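With this setup everything works: the table is created and the data is replicated into it, except that the replicated table has no primary key. I tried adding the following lines to the connector settings, without success (no table is created and nothing is replicated):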

    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'PERSONID,CONTENTID'
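
For reference, here is a sketch of what the full statement might look like with those two lines merged in. It is not a confirmed fix. It assumes the record key is written as an Avro record whose fields are named CONTENTID and PERSONID, as the DESCRIBE output suggests. The trailing DESCRIBE CONNECTOR statement is an addition for troubleshooting and is not part of the original setup. It should report the connector and task state, which may help explain why nothing is created.

```sql
-- Sketch only: the original connector definition plus the two pk.* properties.
DROP CONNECTOR JDBC_SINK_POSTGRES_01;
CREATE SINK CONNECTOR JDBC_SINK_POSTGRES_01 WITH (
    'connector.class'                     = 'io.confluent.connect.jdbc.JdbcSinkConnector',
    'connection.url'                      = 'jdbc:postgresql://postgres:5432/',
    'connection.user'                     = 'postgres',
    'connection.password'                 = 'postgres',
    'topics'                              = 'ENGAGEMENTS',
    'key.converter'                       = 'io.confluent.connect.avro.AvroConverter',
    'key.converter.schema.registry.url'   = 'http://schema-registry:8081',
    'value.converter'                     = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url' = 'http://schema-registry:8081',
    'auto.create'                         = 'true',
    'auto.evolve'                         = 'true',
    -- A comma is needed here now that more properties follow.
    'insert.mode'                         = 'upsert',
    -- Take the primary key columns from the Kafka record key
    -- (assumed to be an Avro record with these two field names).
    'pk.mode'                             = 'record_key',
    'pk.fields'                           = 'PERSONID,CONTENTID'
);

-- Added for troubleshooting: inspect the connector and its task status.
DESCRIBE CONNECTOR JDBC_SINK_POSTGRES_01;
```

Note that `auto.create` only creates a table that does not exist yet, so a table left over from an earlier run without `pk.mode` would presumably keep its keyless definition until it is dropped.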

Any idea what is wrong here? Thanks!

Tags: postgresql, ksqldb

Solution

