Debezium Server stops the connector after an error in the application's handler method: Unexpected data type 'null', error = '{}'

Problem description

I am using Debezium Server to stream CDC data from MySQL to Kinesis. I created a systemd service that executes the following sh file:

cd /home/ubuntu/Debezium_new/target

if [ -z "$JAVA_HOME" ]; then
  JAVA_BINARY="java"
else
  JAVA_BINARY="$JAVA_HOME/bin/java"
fi

RUNNER=$(ls debezium-server-*runner.jar)

exec $JAVA_BINARY -Dlogging.level=TRACE $DEBEZIUM_OPTS $JAVA_OPTS -cp "$RUNNER:conf:lib/*" io.debezium.server.Main -e LOG_LEVE_TRACE

The systemd logs show the following error, yet the systemd service is still reported as running.

Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO  [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Stopping down connector
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO  [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Stopping MySQL connector task
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO  [io.deb.con.mys.ChainedReader] (pool-3-thread-1) ChainedReader: Stopping the binlog reader
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO  [io.deb.con.mys.BinlogReader] (pool-3-thread-1) Discarding 0 unsent record(s) due to the connector shutting down
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,590 INFO  [io.deb.con.mys.BinlogReader] (xxx-xx-xx.xxxxxxxx.us-east-1.rds.amazonaws.com:3306) Stopped reading binlog after 158 events, last recorded offset: {ts_sec=1607783984, file=mysql-bin-changelog.104110, pos=52676, row=9, server_id=1432154115, event=2}
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,590 INFO  [io.deb.con.mys.BinlogReader] (pool-3-thread-1) Discarding 0 unsent record(s) due to the connector shutting down
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,600 INFO  [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Connector task finished all work and is now shutdown
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,603 INFO  [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Stopped FileOffsetBackingStore
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,604 INFO  [io.deb.ser.ConnectorLifecycle] (pool-3-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Unexpected data type 'null'', error = '{}': io.debezium.DebeziumException: Unexpected data type 'null'
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.server.BaseChangeConsumer.getBytes(BaseChangeConsumer.java:64)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.server.kinesis.KinesisChangeConsumer.handleBatch(KinesisChangeConsumer.java:99)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:139)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at java.lang.Thread.run(Thread.java:748)

This happens frequently. I went through the Debezium source code looking for clues, and this is what I found: https://github.com/debezium/debezium/blob/5aaf4d420d983ada746d3f18c49775f318febae4/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java

Also, the message line below shows different values on every run. Some data does get streamed, and then the error is thrown right away.

Stopped reading binlog after 69 events, last recorded offset: {ts_sec=1607785979, file=mysql-bin-changelog.104117, pos=50978, row=3, server_id=1432154115, event=2}

I can tell that the getBytes method is actually receiving a null value, but I am not sure how that is even possible.

    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
            throws InterruptedException {
        for (ChangeEvent<Object, Object> record : records) {
            LOGGER.trace("Received event '{}'", record);
            final PutRecordRequest putRecord = PutRecordRequest.builder()
                    .partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
                    .streamName(streamNameMapper.map(record.destination()))
                    .data(SdkBytes.fromByteArray(getBytes(record.value())))
                    .build();
            client.putRecord(putRecord);
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

https://github.com/debezium/debezium/blob/5aaf4d420d983ada746d3f18c49775f318febae4/debezium-server/debezium-server-core/src/main/java/io/debezium/server/BaseChangeConsumer.java

    protected byte[] getBytes(Object object) {
        if (object instanceof byte[]) {
            return (byte[]) object;
        }
        else if (object instanceof String) {
            return ((String) object).getBytes();
        }
        throw new DebeziumException(unsupportedTypeMessage(object));
    }

    protected String unsupportedTypeMessage(Object object) {
        final String type = (object == null) ? "null" : object.getClass().getName();
        return "Unexpected data type '" + type + "'";
    }
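
The fall-through to the exception can be reproduced outside Debezium. The following is a minimal sketch, not Debezium code: the class name GetBytesNullDemo is hypothetical, and IllegalStateException stands in for DebeziumException so the example compiles without any dependency. It shows that `instanceof` is false for null in both branches, so a null value always reaches the throw with the message seen in the log.

```java
// Hypothetical demo class; mirrors the logic of the quoted getBytes method.
class GetBytesNullDemo {

    // Same branching as BaseChangeConsumer.getBytes in the quoted source.
    // IllegalStateException is a stand-in for DebeziumException (assumption).
    static byte[] getBytes(Object object) {
        if (object instanceof byte[]) {
            return (byte[]) object;
        }
        else if (object instanceof String) {
            return ((String) object).getBytes();
        }
        // null is never an instance of any type, so it always ends up here.
        final String type = (object == null) ? "null" : object.getClass().getName();
        throw new IllegalStateException("Unexpected data type '" + type + "'");
    }

    public static void main(String[] args) {
        // A String value is converted without problems.
        System.out.println(getBytes("{\"id\": 34390}").length > 0);
        try {
            // A record whose value is null triggers the error from the log.
            getBytes(null);
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unexpected data type 'null'
        }
    }
}
```

So the error does not require a malformed payload; any record whose value() is null is enough to stop the connector.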

So how can I restart the connector without manual intervention? And what is the reason behind the null value?

Edit 1: This is what the ChangeEvent<Object, Object> record looks like. These records are related to delete events in the binlog data I am trying to read.

[key = {
        "schema": {
            "type": "struct",
            "fields": [{
                    "type": "int64",
                    "optional": false,
                    "field": "id"
                }
            ],
            "optional": false,
            "name": "XXX.XXX_XXXX.customer.Key"
        },
        "payload": {
            "id": 34390
        }
    }, value = null, sourceRecord = SourceRecord {
        sourcePartition = {
            server = XXXX
        },
        sourceOffset = {
            ts_sec = 1607860000,
            file = mysql-bin-changelog.104365,
            pos = 10732,
            row = 1,
            server_id = 1432154115,
            event = 2
        }
    }
    ConnectRecord {
        topic = 'XXX.XXX_XXXX.customer',
        kafkaPartition = null,
        key = Struct {
            id = 34390
        },
        keySchema = Schema {
            XXX.XXX_XXXX.customer.Key: STRUCT
        },
        value = null,
        valueSchema = null,
        timestamp = null,
        headers = ConnectHeaders(headers = )
    }
]

Tags: debezium, cdc

Solution
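
The record in Edit 1 has a key but value = null and follows a delete, which matches what Debezium calls a tombstone event. By default the MySQL connector emits one after every delete event (connector property tombstones.on.delete, default true) so that Kafka log compaction can drop the key. The quoted getBytes method has no branch for null, so the first tombstone in a batch stops the connector. One option is to turn tombstones off in the Debezium Server configuration, where connector properties carry the debezium.source. prefix:

debezium.source.tombstones.on.delete=false

Since the service is reported as still running after the connector stops, a systemd Restart= policy, which reacts to the process exiting, would presumably not fire here, so preventing the failure is the more direct route. If a custom sink is used instead, the same effect can be had by skipping null-valued records before conversion. The sketch below illustrates only that filtering idea: TombstoneFilterSketch and its nested Event class are hypothetical stand-ins for a consumer and for ChangeEvent<Object, Object>, not Debezium classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; none of these types come from Debezium.
class TombstoneFilterSketch {

    // Stand-in for ChangeEvent<Object, Object>, reduced to key and value.
    static class Event {
        final String key;
        final String value;

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // A tombstone carries a key but no value.
    static boolean isTombstone(Event event) {
        return event.value == null;
    }

    // Returns the payloads that would be sent to the sink, skipping tombstones.
    static List<byte[]> payloads(List<Event> batch) {
        List<byte[]> out = new ArrayList<>();
        for (Event event : batch) {
            if (isTombstone(event)) {
                // A real consumer would still mark the record as processed here.
                continue;
            }
            out.add(event.value.getBytes(StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event("{\"id\": 34390}", "{\"op\": \"d\"}"), // delete event
                new Event("{\"id\": 34390}", null));             // tombstone
        System.out.println(payloads(batch).size()); // only the delete event remains
    }
}
```

Disabling tombstones in the configuration is the smaller change; filtering in code only makes sense when the sink is already custom.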
