首页 > 解决方案 > CDC debezium(postgres)未发布某些表的事件

问题描述

我已经使用以下脚本在 docker 网络中设置了 CDC 管道

  1. 动物园管理员

    --restart always \
    -p 2181:2181 \
    -p 2888:2888 \
    -p 3888:3888 \
    --network=test-net \
    -v /var/ibank/msdata/test/zookeper/config/zoo.cfg:/zookeper/conf/zoo.cfg \
    -v /var/ibank/msdata/test/zookeper/data/main-data:/zookeper/data \
    -v /var/ibank/msdata/test/zookeper/data/txns:/zookeper/txns \
    -v /var/ibank/msdata/test/zookeper/data/logs:/zookeper/logs \
    debezium/zookeeper:1.6 
    
    
  2. 卡夫卡

--name kafka \
--restart=always \
--network=test-net \
-p 9092:9092 \
-p 29092:29092 \
-e ADVERTISED_HOST_NAME=kafka \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT \
-e ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_LISTENERS=PLAINTEXT_HOST://0.0.0.0:29092,PLAINTEXT://kafka:9092 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 \
-v /var/ibank/msdata/test/kafka/data/kafka-data:/var/lib/kafka/data \
debezium/kafka:1.6
  1. 连接
--restart=always \
--net=test-net \
-p 8083:8083 \
-e BOOTSTRAP_SERVERS=kafka:9092 \
-e GROUP_ID=afr-group-test \
-e CONFIG_STORAGE_TOPIC=afr_storage_topic \
-e OFFSET_STORAGE_TOPIC=afr_offset_topic \
debezium/connect:1.6
  1. 使用以下配置添加连接器
{
    "name": "afr-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "test_user",
      "database.password": "test_user",
      "database.dbname": "ibank",
      "database.server.name": "ibankserver1",
      "table.include.list": "private.sessions, private.transactions, private.failed_auth_attempts",
      "plugin.name": "pgoutput",
      "publication.autocreate.mode": "filtered",
      "snapshot.mode": "never",
      "heartbeat.interval.ms": 1000,
      "heartbeat.action.query": "update debezium_heartbeat set last_heartbeat_ts = now();"  
    }
  }  

Postgres 在同一个网络中,debezium 连接器成功连接到数据库。PG 用户拥有所有需要的权限(更新、选择、复制、登录)。

问题是我收到了事务和 failed_auth_attempts 表的惰性(这就是我所需要的),但会话表不起作用。

可能是什么问题?

这是表的架构

create table private.sessions
(
    id                        serial not null
        constraint sessions_pkey
            primary key,
    session_id                varchar(30),
    session_key               varchar(30),
    user_id                   integer
        constraint sessions_user_id_fkey
            references private.users,
    session_key_type          varchar(80),
    last_update_time          timestamp with time zone,
    ip_address                varchar,
    extradata                 jsonb,
    ip_address_location       varchar(30),
    auth_type                 varchar(30),
    user_agent_session_init   jsonb,
    last_used_ip_address      varchar(30),
    is_valid                  boolean,
    created_at                timestamp with time zone default now(),
    user_agent_session_actual jsonb,
    login_otp_sent_counter    integer                  default 0,
    otp_value                 varchar(8),
    restore_otp_sent_counter  integer                  default 0,
    firebase_token            varchar
);

alter table private.sessions
    owner to test_user;

grant select on private.sessions to test_user;

grant delete, insert, references, select, trigger, truncate, update on private.sessions to test_user;

标签: postgresqldockerapache-kafkadebezium

解决方案


事实证明,由于某些未知原因,debezium 并没有仅为“会话”表创建发布。删除连接器并重新创建它没有帮助,然后我手动删除了由 debezium 创建的所有发布,并为会话表重新创建它们。


推荐阅读