首页 > 解决方案 > 如何创建一个无限项流的可观察对象

问题描述

我有一个发射器,可以无限发射物品。如何将发射器发出的项目流转换为 RxJava 2 中的 Observable(或其中一种)。

标签: rx-java2

解决方案


您是否正在寻找流式传输数据?假设我正在尝试从数据库中流式传输数据。

return Observable.using(
            () -> getQueryConnectionSubscription(sql),

            connectionSubscription -> Observable.create((subscriber) -> {

                ResultSet resultSet = connectionSubscription.getResultSet();
                int rowNumber = 0;
                while (!subscriber.isDisposed() && resultSet.next()) {

                    T row = rowMapper.mapRow(resultSet, rowNumber);
                    subscriber.onNext(row);
                }
                subscriber.onComplete();

            }),

            (queryConnectionSubscription) -> {
                queryConnectionSubscription.close();
            });

我不确定您的数据来源是什么。但是只要你有数据,你就会一直调用subscriber.onNext(data)。如果您想要完整的详细信息,请查看链接 https://www.developerthoughtsonline.com/2019/02/02/streaming-with-reactive-java-and-spring-jdbctemplate/


推荐阅读