首页 > 解决方案 > 嵌入式 Debezium:尝试查看偏移量文件的格式,但未生成该文件

问题描述

我有这段代码用于将 debezium 连接到本地 mysql 容器。我正在尝试查看偏移文件的格式。这是我的代码:

package io.debezium.examples.kinesis;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.relational.history.MemoryDatabaseHistory;
import io.debezium.util.Clock;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.core.JsonProcessingException;

/**
 * Demo for using the Debezium Embedded API to capture MySQL change events and
 * prepare them for Amazon Kinesis.
 *
 * <p>Each change event is wrapped in a single-field struct ({@code value}),
 * serialized to JSON, attached to a {@link PutRecordRequest} and printed to
 * standard output. NOTE: the request is only built, it is never handed to the
 * Kinesis client, so nothing is actually written to a stream.
 *
 * <p>Connector offsets and the database schema history are stored in files
 * (see the paths configured in the constructor); the process needs write
 * access to those locations.
 */
public class ChangeDataSender implements Runnable {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeDataSender.class);

    private static final String APP_NAME = "kinesis";
    private static final String KINESIS_REGION_CONF_NAME = "kinesis.region";

    /** AWS region the Kinesis client is built for. */
    private static final String KINESIS_REGION = "us-west-2";

    /** Stream name set on every request, regardless of the record's topic. */
    private static final String STREAM_NAME = "kinesis.cscetbon.psql";

    private final Configuration config;
    private final JsonConverter valueConverter;
    private final AmazonKinesis kinesisClient;

    /**
     * Builds the engine configuration (system properties first, then the
     * explicit settings below), the JSON value converter and the Kinesis client.
     */
    public ChangeDataSender() {
        config = Configuration.empty().withSystemProperties(Function.identity()).edit()
                .with(EmbeddedEngine.CONNECTOR_CLASS, "io.debezium.connector.mysql.MySqlConnector")
                .with(EmbeddedEngine.ENGINE_NAME, APP_NAME)
                .with(MySqlConnectorConfig.SERVER_NAME, APP_NAME)
                .with(MySqlConnectorConfig.SERVER_ID, 8192)

                // Offsets and schema history are persisted to files so they can be inspected.
                // NOTE(review): offsets are presumably flushed at most once per flush interval
                // (60 s here), so the offset file may not appear immediately - confirm against
                // the Debezium embedded engine documentation.
                .with(EmbeddedEngine.OFFSET_STORAGE, "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename",
                        "/offsetStoragePath/storage/offset.dat")
                .with("offset.flush.interval.ms", 60000)
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename", "/debezium/dbhistory.dat")

                // Send JSON without schema
                .with("schemas.enable", false)
                // NOTE(review): "database.dbname" and "plugin.name" (wal2json) look like
                // PostgreSQL connector options left over from another setup - confirm
                // whether the MySQL connector needs them.
                .with("database.dbname", "poc")
                .with("plugin.name", "wal2json")
                .build();

        valueConverter = new JsonConverter();
        valueConverter.configure(config.asMap(), false);

        final AWSCredentialsProvider credentialsProvider = new ProfileCredentialsProvider("something");

        kinesisClient = AmazonKinesisClientBuilder.standard()
                .withCredentials(credentialsProvider)
                .withRegion(KINESIS_REGION)
                .build();
    }

    /**
     * Starts the embedded engine on a single worker thread and blocks until
     * that thread has finished. A JVM shutdown hook asks the engine to stop.
     */
    @Override
    public void run() {
        final EmbeddedEngine engine = EmbeddedEngine.create()
                .using(config)
                .using(this.getClass().getClassLoader())
                .using(Clock.SYSTEM)
                .notifying(this::sendRecord)
                .build();

        final ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOGGER.info("Requesting embedded engine to shut down");
            engine.stop();
        }));

        // The already submitted engine task keeps running; shutdown() only rejects new
        // tasks. Without it awaitTermination() can never report termination and the
        // wait loop below would spin forever, even after the engine has stopped.
        executor.shutdown();
        awaitTermination(executor);
    }

    /**
     * Blocks until the executor has terminated, logging every 10 seconds.
     * If the waiting thread is interrupted, the wait is abandoned and the
     * interrupt status is restored for the caller.
     *
     * @param executor the executor running the embedded engine
     */
    private void awaitTermination(ExecutorService executor) {
        try {
            while (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                LOGGER.info("Waiting another 10 seconds for the embedded engine to shut down");
            }
        }
        catch (InterruptedException e) {
            // Thread.interrupted() would clear the flag; re-assert it instead.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Engine callback: converts one change event to JSON, builds the Kinesis
     * request for it and prints the JSON to standard output.
     *
     * <p>Records without a value schema are skipped, because there is nothing
     * to wrap and constructing a {@link Struct} from a {@code null} schema fails.
     *
     * @param record the change event delivered by the embedded engine
     */
    private void sendRecord(SourceRecord record) {
        LOGGER.info(record.topic());

        final Schema valueSchema = record.valueSchema();
        if (valueSchema == null) {
            // The original author notes the value is null for deletes; such records
            // previously crashed the callback with a NullPointerException.
            LOGGER.info("Skipping record without value schema on topic {}", record.topic());
            return;
        }

        final Schema schema = SchemaBuilder.struct()
                .field("value", valueSchema)
                .build();

        final Struct message = new Struct(schema);
        if (record.value() != null) {
            message.put("value", record.value());
        }

        // Records without a key all share the partition key "-1".
        final String partitionKey = String.valueOf(record.key() != null ? record.key().hashCode() : -1);
        LOGGER.info("topic : {}", record.topic());
        final byte[] payload = valueConverter.fromConnectData("dummy", schema, message);

        // The request is assembled but intentionally not sent; see the class comment.
        final PutRecordRequest putRecord = new PutRecordRequest();
        putRecord.setStreamName(STREAM_NAME);
        putRecord.setPartitionKey(partitionKey);
        putRecord.setData(ByteBuffer.wrap(payload));

        // Print the JSON text; printing the array itself only shows its identity ("[B@...").
        System.out.println(new String(payload, java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Maps a topic name to a stream name. Currently the identity mapping and
     * not used by {@link #sendRecord(SourceRecord)}.
     *
     * @param topic the topic of a change event
     * @return the stream name for that topic
     */
    private String streamNameMapper(String topic) {
        return topic;
    }

    /**
     * Entry point: creates the sender and runs it on the calling thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new ChangeDataSender().run();
    }
}

重要的是:

.with("offset.storage.file.filename",
                            "/offsetStoragePath/storage/offset.dat")
                    .with("offset.flush.interval.ms", 60000)
                    .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")//
                    .with("database.history.file.filename", "/debezium/dbhistory.dat")

除了将其打印到标准输出之外,我目前没有对日志做任何事情。

所以我用这个命令为 mysql 运行一个 docker 容器:

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9
Initializing database

我正在mysql中运行命令:

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

我无法生成偏移文件。知道我做错了什么吗?

标签: mysql, debezium

解决方案


是否已经有任何变更被处理过?如果是,请尝试使用 io.debezium.embedded.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy 作为偏移量提交策略。

您还确定运行 Debezium Embedded 的用户可以写入给定的路径吗?


推荐阅读