首页 > 解决方案 > Flink 使用 Java 中的自定义类设置基本的 Kafka 生产者消费者

问题描述

我想在 Kafka 上使用Flink设置一个基本的生产者-消费者,但我无法通过 Java 向现有消费者生成数据

命令行解决方案

  1. 我使用命令设置了一个Kafka broker使用kafka_2.11-2.4.0ziphttps://kafka.apache.org/downloads

    bin/zookeeper-server-start.sh config/zookeeper.properties

    bin/kafka-server-start.sh config/server.properties

  2. 我使用创建了一个名为 transactions1 的主题

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1

    现在我可以在命令行上使用生产者和消费者来查看主题是否已创建并正常工作。

  3. 要设置消费者我运行

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning

    现在,如果任何生产者向主题发送数据,transactions1我将在消费者控制台中看到它。

    我通过运行测试消费者是否正在工作

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1

    并在 cli 中的生产者中输入以下数据行,这些数据行也显示在消费者 cli 中。

{"txnID":1,"amt":100.0,"account":"AC1"}

{"txnID":2,"amt":10.0,"account":"AC2"}

{"txnID":3,"amt":20.0,"account":"AC3"}

现在我想在 Java 代码中复制第 3 步,即生产者和消费者,这是这个问题的核心问题。

  1. 所以我用 build.gradle 设置了一个 gradle java8 项目
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
    compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
    compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
    compile group: 'org.apache.thrift', name: 'protobuf-java', version: '3.7.0'
}
...
  1. 我设置了一个自定义类,您可以通过扩展与FlinkTransactions.class相关的类来建议使用 Kryo、Protobuf 或 TbaseSerializer 对序列化逻辑进行更改。
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;

public class Transaction {
    public final int txnID;
    public final float amt;
    public final String account;

    public Transaction(int txnID, float amt, String account) {
        this.txnID = txnID;
        this.amt = amt;
        this.account = account;
    }


    public String toJSONString() {
        Gson gson = new Gson();
        return gson.toJson(this);
    }

    public static Transaction fromJSONString(String some) {
        Gson gson = new Gson();
        return gson.fromJson(some, Transaction.class);
    }

    public static MapFunction<String, String> mapTransactions() {
        MapFunction<String, String> map = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null || value.trim().length() > 0) {
                    try {
                        return fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        return map;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "txnID=" + txnID +
                ", amt=" + amt +
                ", account='" + account + '\'' +
                '}';
    }
}
  1. 现在是时候使用 Flink 在 topic 上生产和消费流​​了transactions1
public class SetupSpike {
    public static void main(String[] args) throws Exception {
        System.out.println("begin");
        List<Transaction> txns = new ArrayList<Transaction>(){{
            add(new Transaction(1, 100, "AC1"));
            add(new Transaction(2, 10, "AC2"));
            add(new Transaction(3, 20, "AC3"));
        }};
        // This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray 
        //via producer and then to the topic in Kafka broker 
        //and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.
        
        String topic = "transactions1";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", topic);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);

        // working Consumer logic below which needs edit if you change serialization
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest();     // start from the earliest record possible
        DataStream<String> stream = env.addSource(myConsumer).map(Transaction::toJSONString);
       
        //working Producer logic below which works if you are sinking a pre-existing DataStream
        //but needs editing to work with Java List<Transaction> datatype.
        System.out.println("sinking expanded stream");
        MapFunction<String, String> etl = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null || value.trim().length() > 0) {
                    try {
                        return fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                        try {
                            System.out.println(element);
                            return new ProducerRecord<byte[], byte[]>(topic, stringToBytes(etl.map(element)));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                }, properties, Semantic.EXACTLY_ONCE);
//        stream.timeWindowAll(Time.minutes(1));
        stream.addSink(myProducer);
        JobExecutionResult execute = env.execute();

    }
}

如您所见,我无法使用提供的列表执行此操作txns。以上是我可以从 Flink 文档中收集的工作代码,用于重定向主题流数据并通过 Cli 生产者手动发送数据。问题是用 java 编写 KafkaProducer 代码,将数据发送到主题,这进一步加剧了诸如

  1. 添加时间戳、水印
  2. KeyBy 操作
  3. GroupBy/WindowBy 操作
  4. 在 Sinking 之前添加自定义 ETL 逻辑。
  5. Flink 中的序列化/反序列化逻辑

使用过 Flink 的人可以帮助我如何在 Flink 中生成主题txns列表transactions1,然后验证它是否适用于消费者?此外,在下沉之前添加时间戳或一些处理的问题上的任何帮助都会有很大帮助。您可以在https://github.com/devssh/kafkaFlinkSpike上找到源代码,其目的是生成 Flink 样板以从内存存储中添加“AC1”的详细信息,并将其与实时发送的 Transaction 事件一起加入扩展输出给用户。

标签: serializationapache-kafkadeserializationapache-flinkflink-streaming

解决方案


几点,不分先后:

最好不要像您在此处所做的那样将 Flink 版本 1.9.2 与版本 1.9.0 混合在一起:

compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'

有关如何使用时间戳、水印、keyBy、windows 等的教程,请参阅Ververica 的在线培训材料

List<Transaction> txns用作输入流,您可以这样做(文档):

DataStream<Transaction> transactions = env.fromCollection(txns);

有关在使用 Flink 和 Kafka 时如何处理序列化/反序列化的示例,请参阅Flink Operations Playground,特别是查看ClickEventDeserializationSchemaand ClickEventStatisticsSerializationSchema,它们在ClickEventCount.java中使用并在此处定义。(注意:这个 Playground 还没有针对 Flink 1.10 进行更新。)


推荐阅读