首页 > 解决方案 > 如何将 java.util.stream.Stream 写入 StreamingResponseBody 输出流

问题描述

我正在构建一个 REST API,其中来自 Oracle 数据库的大量数据可以通过流式传输到客户端应用程序(如文件下载或直接流)以块的形式发送。

我正在从 JpaRepository 获取 Stream,如下所示 -

@Query("select u from UsersEntity u")
Stream<UsersEntity> findAllByCustomQueryAndStream();

但是现在挑战来将此流写入 StreamingResponseBody 输出流

我尝试了很多方法但没有成功 -

第一种方法-

Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();

        StreamingResponseBody stream = outputStream -> {
            Iterator<UsersEntity> iterator = usersResultStream.iterator();

            try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {

                while (iterator.hasNext()) {
                    oos.write(iterator.next().toString().getBytes());
                }
            }
        };

出现错误-

java.sql.SQLException: Closed Resultset: next
    at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:565) ~[ojdbc7-12.1.0.2.jar:12.1.0.2.0]

第二种方法-

StreamingResponseBody stream = new StreamingResponseBody() {

            @Transactional(readOnly = true)
            @Override
            public void writeTo(OutputStream outputStream) throws IOException {

                Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();

                try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {

                    usersResultStream.forEach(user->{
                        try {
                            oos.write(user.toString().getBytes());
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
                }
            }
        }; 

出现错误-

org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.

我已在以下链接上传练习代码 - 示例 POC 链接

我对流媒体相关任务没有任何经验,所以请帮助我。

如果我的方向错误,则建议在Spring Framework中执行此操作的任何其他方法。如果可用,请分享任何参考链接。

标签: javaspring-bootjpa-2.0

解决方案


最后,我通过使用服务层解决了这个问题。最初,我在创建问题的控制器类中编写完整的逻辑。

控制器类 -

@RestController
@RequestMapping("/api")
public class UsersController {
    @Autowired
    private UserService service;

    @GetMapping(value = "/userstream")
    public ResponseEntity<StreamingResponseBody> fetchUsersStream() {

        StreamingResponseBody stream = this::writeTo;

        return new ResponseEntity<>(stream, HttpStatus.OK);
    }

    private void writeTo(OutputStream outputStream) {
        service.writeToOutputStream(outputStream);
    }
}

服务等级 -

@Service
public class UserService {

    @Autowired
    private UsersRepository usersRepository;

    @Transactional(readOnly = true)
    public void writeToOutputStream(final OutputStream outputStream) {
        try (Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream()) {
            try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {

                usersResultStream.forEach(emp -> {
                    try {
                        oos.write(emp.toString().getBytes());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

完整代码可在 github - https://github.com/bagesh2050/HttpResponseStreamingDemo

不过,我愿意提供与 Http Streaming 相关的建议。如果您有更好的想法,请提供。


推荐阅读