首页 > 解决方案 > 如何在 KSQLDB 查询的行中使用 protobuf 反序列化?

问题描述

我试图在我的 KSQLDB 应用程序中用 Java 标准化反序列化,但我很难理解如何处理RowKSQLDB 类型返回的Client类型。Ex(删除尝试/捕获):

    import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.BatchedQueryResult;

    Client ksqldbClient = kafkaService.getKSQLDBClient();
    String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
    BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
    List<Row> rows = query.get();

我的 KSQLDB 表配置为使用 protobuf 序列化,但似乎Row类型是 JSON?我只能通过以下方式获取其数据:

    for (Row row : rows) {
        String json = row.asObject().toJsonString();
        // Deserialize json string
        ...
    }

KSQLDB 客户端是否仅自己处理 protobuf 反序列化?有没有办法只获取 protobuf 字节,这样我就可以将它传递到我已经定义的 Protobuf 反序列化器中,这样我就不需要编写 JSON 反序列化器了?

标签: javaapache-kafkaprotocol-buffersksqldb

解决方案


row.asObject()返回一个已经反序列化的 KsqlObject,其操作与 JDBC ResultSet 类似,因为您可以在其上为行中的类型调用各种 get 方法。

如果您想映射到从 Protobuf 生成的特定域对象,似乎没有直接的方法,如果您需要该功能,最好直接使用 Kafka Streams 而不是 KSQL


推荐阅读