首页 > 解决方案 > Camel:如何使用 StreamList 从 SQL 组件流式传输

问题描述

我正在尝试使用 Camels SQL 组件从数据库中使用 outputType=StreamList 进行流式传输。我使用 ConsumerTemplate 从 Java 类中获取 ResultIterator :

public Flux<String> CreateFlux () {
ConsumerTemplate consumer = camelContext.createConsumerTemplate();

    ResultSetIterator resultSetIterator = consumer.receiveBody(
            "sql:SELECT DATA FROM TRANSAKSJON WHERE REQ_ID='" + recId + "'?outputType=StreamList", ResultSetIterator.class);
    ...
    while (result.hasNext()) {
        Map<String, String> map = (Map<String, String>) result.next();
        String data = map.get("DATA");

    }
}

尝试迭代ResultsetIterator 时出现以下错误:

org.h2.jdbc.JdbcSQLException:对象已关闭 [90007-197]

经过检查,我看到连接已关闭。连接 = {HikariProxyConnection@16287} "HikariProxyConnection@1048081993 包装 com.zaxxer.hikari.pool.ProxyConnection.ClosedConnection"

如何使用骆驼 SQL 组件进行流式传输?我必须从不在骆驼路线内的 bean 中使用它。我发现只有在骆驼路线中使用 SQL 组件时,流式传输才有效。 

骆驼版本是:2.24.1

更新1:查看源代码后,它是打算的。onDone 关闭连接。我正在尝试在我的 defaultExchange 上设置 UnitOfWork 以通过将交换标记为未完成来保持连接打开。

Update2:通过设置 UnitOf Work 设法让它工作:

        ProducerTemplate pTmp = camelContext.createProducerTemplate();

        DefaultExchange defaultExchange = new DefaultExchange(camelContext);
        UnitOfWork unitOfWork = new DefaultUnitOfWork(defaultExchange);
        defaultExchange.setUnitOfWork(unitOfWork);
        pTmp.send("direct:DbStream", defaultExchange);

路由 DbStream 执行上述 SQL 选择

标签: javadatabaseapache-camelstreaming

解决方案


不要使用receiveBody,而只是receive 来Exchange取回。然后你可以从它的消息体中获取迭代器,在使用之后,你可以完成交换(参见javadoc)


推荐阅读