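debezium - Debezium Server stops the connector after an error in the application's handler method: Unexpected data type 'null', error = '{}'
Problem description
I am using Debezium Server to stream CDC data from MySQL to Kinesis. I created a systemd service that runs the following sh file: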
cd /home/ubuntu/Debezium_new/target
if [ -z "$JAVA_HOME" ]; then
JAVA_BINARY="java"
else
JAVA_BINARY="$JAVA_HOME/bin/java"
fi
RUNNER=$(ls debezium-server-*runner.jar)
exec $JAVA_BINARY -Dlogging.level=TRACE $DEBEZIUM_OPTS $JAVA_OPTS -cp "$RUNNER:conf:lib/*" io.debezium.server.Main -e LOG_LEVE_TRACE
The systemd journal shows the error below, yet systemd still reports the service as running.
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO [io.deb.con.com.BaseSourceTask] (pool-3-thread-1) Stopping down connector
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Stopping MySQL connector task
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO [io.deb.con.mys.ChainedReader] (pool-3-thread-1) ChainedReader: Stopping the binlog reader
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,586 INFO [io.deb.con.mys.BinlogReader] (pool-3-thread-1) Discarding 0 unsent record(s) due to the connector shutting down
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,590 INFO [io.deb.con.mys.BinlogReader] (xxx-xx-xx.xxxxxxxx.us-east-1.rds.amazonaws.com:3306) Stopped reading binlog after 158 events, last recorded offset: {ts_sec=1607783984, file=mysql-bin-changelog.104110, pos=52676, row=9, server_id=1432154115, event=2}
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,590 INFO [io.deb.con.mys.BinlogReader] (pool-3-thread-1) Discarding 0 unsent record(s) due to the connector shutting down
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,600 INFO [io.deb.con.mys.MySqlConnectorTask] (pool-3-thread-1) Connector task finished all work and is now shutdown
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,603 INFO [org.apa.kaf.con.sto.FileOffsetBackingStore] (pool-3-thread-1) Stopped FileOffsetBackingStore
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: 2020-12-12 14:39:44,604 INFO [io.deb.ser.ConnectorLifecycle] (pool-3-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: Unexpected data type 'null'', error = '{}': io.debezium.DebeziumException: Unexpected data type 'null'
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.server.BaseChangeConsumer.getBytes(BaseChangeConsumer.java:64)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.server.kinesis.KinesisChangeConsumer.handleBatch(KinesisChangeConsumer.java:99)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:82)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:171)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at io.debezium.server.DebeziumServer.lambda$start$1(DebeziumServer.java:139)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Dec 12 14:39:44 ip-172-31-71-33 bash[2929]: #011at java.lang.Thread.run(Thread.java:748)
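This happens frequently. I went through the Debezium source code looking for clues, and this is what I looked at: https://github.com/debezium/debezium/blob/5aaf4d420d983ada746d3f18c49775f318febae4/debezium-server/debezium-server-kinesis/src/main/java/io/debezium/server/kinesis/KinesisChangeConsumer.java
Also, the event count and offset in the following message line differ on every run. Some data is streamed successfully, and then the error is thrown right away.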
Stopped reading binlog after 69 events, last recorded offset: {ts_sec=1607785979, file=mysql-bin-changelog.104117, pos=50978, row=3, server_id=1432154115, event=2}
I can tell that the getBytes method is actually receiving a null value, but I am not sure how that is even possible.
@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, RecordCommitter<ChangeEvent<Object, Object>> committer)
        throws InterruptedException {
    for (ChangeEvent<Object, Object> record : records) {
        LOGGER.trace("Received event '{}'", record);
        final PutRecordRequest putRecord = PutRecordRequest.builder()
                .partitionKey((record.key() != null) ? getString(record.key()) : nullKey)
                .streamName(streamNameMapper.map(record.destination()))
                .data(SdkBytes.fromByteArray(getBytes(record.value())))
                .build();
        client.putRecord(putRecord);
        committer.markProcessed(record);
    }
    committer.markBatchFinished();
}

protected byte[] getBytes(Object object) {
    if (object instanceof byte[]) {
        return (byte[]) object;
    }
    else if (object instanceof String) {
        return ((String) object).getBytes();
    }
    throw new DebeziumException(unsupportedTypeMessage(object));
}

protected String unsupportedTypeMessage(Object object) {
    final String type = (object == null) ? "null" : object.getClass().getName();
    return "Unexpected data type '" + type + "'";
}
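To check the observation about getBytes, the branching can be reproduced outside Debezium. The following is only a sketch under stated assumptions: the class name NullBytesDemo is made up for illustration, IllegalStateException stands in for DebeziumException so the code compiles without Debezium on the classpath, and UTF-8 is used explicitly where the quoted code relies on the platform default charset.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Debezium.
class NullBytesDemo {

    // Same branching as the getBytes method quoted in the question.
    // IllegalStateException replaces DebeziumException (assumption).
    static byte[] getBytes(Object object) {
        if (object instanceof byte[]) {
            return (byte[]) object;
        }
        else if (object instanceof String) {
            return ((String) object).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalStateException(unsupportedTypeMessage(object));
    }

    static String unsupportedTypeMessage(Object object) {
        final String type = (object == null) ? "null" : object.getClass().getName();
        return "Unexpected data type '" + type + "'";
    }

    public static void main(String[] args) {
        // A String value is converted normally.
        System.out.println(getBytes("{\"id\":34390}").length);
        try {
            // instanceof is always false for null, so both branches are skipped
            // and the method falls through to the throw.
            getBytes(null);
        }
        catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unexpected data type 'null'
        }
    }
}
```

So any record whose value() is null reaches the throw, which matches the message in the log above.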
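So how can the connector be restarted without manual intervention? And what is the reason behind the null value?
Edit 1: This is what the ChangeEvent<Object, Object> record looks like. These records are related to delete events read from the binlog.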
[key = {
"schema": {
"type": "struct",
"fields": [{
"type": "int64",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "XXX.XXX_XXXX.customer.Key"
},
"payload": {
"id": 34390
}
}, value = null, sourceRecord = SourceRecord {
sourcePartition = {
server = XXXX
},
sourceOffset = {
ts_sec = 1607860000,
file = mysql-bin-changelog.104365,
pos = 10732,
row = 1,
server_id = 1432154115,
event = 2
}
}
ConnectRecord {
topic = 'XXX.XXX_XXXX.customer',
kafkaPartition = null,
key = Struct {
id = 34390
},
keySchema = Schema {
XXX.XXX_XXXX.customer.Key: STRUCT
},
value = null,
valueSchema = null,
timestamp = null,
headers = ConnectHeaders(headers = )
}
]
Solution
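The record in Edit 1 has a key but value = null and valueSchema = null, which is the shape of the tombstone event that the Debezium MySQL connector emits after a delete while its tombstones.on.delete option is left at the default of true. Assuming that is the cause here, one option is to turn tombstones off on the source side (in Debezium Server, connector options are passed with the debezium.source. prefix, so debezium.source.tombstones.on.delete=false). Another is to skip null values before converting them. The sketch below illustrates only the second idea; the Event class and the toPayloads helper are hypothetical stand-ins for ChangeEvent and the body of handleBatch, not Debezium API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; not part of Debezium.
class TombstoneSkipDemo {

    // Minimal stand-in for ChangeEvent<Object, Object> (assumption).
    static final class Event {
        final Object key;
        final Object value;

        Event(Object key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    // Converts a batch to payloads, skipping events whose value is null
    // (tombstones) instead of letting the conversion throw.
    static List<byte[]> toPayloads(List<Event> batch) {
        List<byte[]> payloads = new ArrayList<>();
        for (Event event : batch) {
            if (event.value == null) {
                continue; // tombstone: nothing to send
            }
            payloads.add(toBytes(event.value));
        }
        return payloads;
    }

    static byte[] toBytes(Object object) {
        if (object instanceof byte[]) {
            return (byte[]) object;
        }
        if (object instanceof String) {
            return ((String) object).getBytes(StandardCharsets.UTF_8);
        }
        throw new IllegalArgumentException("Unexpected data type '" + object.getClass().getName() + "'");
    }

    public static void main(String[] args) {
        List<Event> batch = Arrays.asList(
                new Event("{\"id\":34390}", "{\"op\":\"d\"}"), // delete event
                new Event("{\"id\":34390}", null));            // tombstone that follows it
        System.out.println(toPayloads(batch).size()); // only the delete event is kept
    }
}
```

In a real consumer the skipped record would presumably still need to be marked as processed so that its offset is committed. That detail is left out of the sketch.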