postgresql - ksqlDB Postgres 接收器主键
问题描述
我有一个使用 JDBC 连接器复制到 PostgreSQL Sink 的 ksqlDB 表:
CREATE TABLE IF NOT EXISTS engagements WITH (FORMAT='AVRO', PARTITIONS=1) AS
SELECT
LATEST_BY_OFFSET(people.FirstName) AS FirstName,
LATEST_BY_OFFSET(people.LastName) AS LastName,
COUNT(interaction.InteractionId) AS TotalInteractions,
LATEST_BY_OFFSET(interaction.Kind) AS EngagementType,
LATEST_BY_OFFSET(content.Name) AS ContentName,
interaction.ContentId AS ContentId,
people.PersonId AS PersonId
FROM interaction
JOIN people
ON interaction.PersonId = people.PersonId
JOIN content
ON interaction.ContentId = content.ContentId
GROUP BY interaction.ContentId,
people.PersonId
EMIT CHANGES;
ksql> DESCRIBE ENGAGEMENTS;
Name : ENGAGEMENTS
Field | Type
----------------------------------------------------
CONTENTID | VARCHAR(STRING) (primary key)
PERSONID | VARCHAR(STRING) (primary key)
FIRSTNAME | VARCHAR(STRING)
LASTNAME | VARCHAR(STRING)
TOTALINTERACTIONS | BIGINT
ENGAGEMENTTYPE | VARCHAR(STRING)
CONTENTNAME | VARCHAR(STRING)
----------------------------------------------------
以下是连接器设置:
DROP CONNECTOR JDBC_SINK_POSTGRES_01;
CREATE SINK CONNECTOR JDBC_SINK_POSTGRES_01 WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'connection.url' = 'jdbc:postgresql://postgres:5432/',
'connection.user' = 'postgres',
'connection.password' = 'postgres',
'topics' = 'ENGAGEMENTS',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'key.converter.schema.registry.url' = 'http://schema-registry:8081',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url' = 'http://schema-registry:8081',
'auto.create' = 'true',
'auto.evolve' = 'true',
'insert.mode' = 'upsert'
);
此设置一切正常,创建表并将数据复制到其中,除了复制的表没有主键。我尝试将以下行添加到该连接器设置但没有成功(没有创建表,没有复制):
'pk.mode' = 'record_key',
'pk.fields' = 'PERSONID,CONTENTID'
任何想法这里有什么不正确的?谢谢!
解决方案
推荐阅读
- c++ - 如果基类具有在成员函数中修改的私有数据成员,派生类如何使用该函数?
- java - Integer.Parse 在 excel 获取的字符串上抛出 NumberFormatException
- php - 从 PHP5 更改 PHP7
- java - 修补 java.base 会导致 java.lang.LinkageError
- javascript - 使用用户在脚本中选择的文件的结果
- ios - 如何在 Swift 中使用 Alamofire 拍照并通过网络发送?
- java - 设计适用于所有租户的 camunda 通用流程
- reactjs - 如何使用从 React 中的 API 获取的数据打开 Bootstrap 模式?
- css - 如何在上按钮和下面板接触的空间去除边框以使其看起来像合并?
- ruby-on-rails - 整数模型属性在 assert_equal 调用中转换为字符串