java - 如何将 java.util.stream.Stream 写入 StreamingResponseBody 输出流
问题描述
我正在构建一个 REST API,其中来自 Oracle 数据库的大量数据可以通过流式传输到客户端应用程序(如文件下载或直接流)以块的形式发送。
我正在从 JpaRepository 获取 Stream,如下所示 -
@Query("select u from UsersEntity u")
Stream<UsersEntity> findAllByCustomQueryAndStream();
但是现在挑战来将此流写入 StreamingResponseBody 输出流
我尝试了很多方法但没有成功 -
第一种方法-
Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();
StreamingResponseBody stream = outputStream -> {
Iterator<UsersEntity> iterator = usersResultStream.iterator();
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
while (iterator.hasNext()) {
oos.write(iterator.next().toString().getBytes());
}
}
};
出现错误-
java.sql.SQLException: Closed Resultset: next
at oracle.jdbc.driver.InsensitiveScrollableResultSet.next(InsensitiveScrollableResultSet.java:565) ~[ojdbc7-12.1.0.2.jar:12.1.0.2.0]
第二种方法-
StreamingResponseBody stream = new StreamingResponseBody() {
@Transactional(readOnly = true)
@Override
public void writeTo(OutputStream outputStream) throws IOException {
Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream();
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
usersResultStream.forEach(user->{
try {
oos.write(user.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
};
出现错误-
org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
我已在以下链接上传练习代码 - 示例 POC 链接
我对流媒体相关任务没有任何经验,所以请帮助我。
如果我的方向错误,则建议在Spring Framework中执行此操作的任何其他方法。如果可用,请分享任何参考链接。
解决方案
最后,我通过使用服务层解决了这个问题。最初,我在创建问题的控制器类中编写完整的逻辑。
控制器类 -
@RestController
@RequestMapping("/api")
public class UsersController {
@Autowired
private UserService service;
@GetMapping(value = "/userstream")
public ResponseEntity<StreamingResponseBody> fetchUsersStream() {
StreamingResponseBody stream = this::writeTo;
return new ResponseEntity<>(stream, HttpStatus.OK);
}
private void writeTo(OutputStream outputStream) {
service.writeToOutputStream(outputStream);
}
}
服务等级 -
@Service
public class UserService {
@Autowired
private UsersRepository usersRepository;
@Transactional(readOnly = true)
public void writeToOutputStream(final OutputStream outputStream) {
try (Stream<UsersEntity> usersResultStream = usersRepository.findAllByCustomQueryAndStream()) {
try (ObjectOutputStream oos = new ObjectOutputStream(outputStream)) {
usersResultStream.forEach(emp -> {
try {
oos.write(emp.toString().getBytes());
} catch (IOException e) {
e.printStackTrace();
}
});
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
完整代码可在 github - https://github.com/bagesh2050/HttpResponseStreamingDemo
不过,我愿意提供与 Http Streaming 相关的建议。如果您有更好的想法,请提供。
推荐阅读
- django - drf-yasg 自定义 json 正文
- python - 重复单词随机次数
- amazon-web-services - 如何近乎实时地将 RDS 数据存储在 S3 中?(AWS DMS)
- javascript - 如何在不使用第三方小部件的情况下将我最近的 Instagram 帖子嵌入到 html 中?
- database - 如何创建一个查询来查找 2 个数字之间的值,这些数字是 MongoDB 中的字符串类型
- git - Windows 10 的 Git 客户端会自动安装 SSH 客户端吗?
- oracle - 创建作业以按导出 CSV 的计划运行
- python - 无法在 python 3.8 中导入 pyplot
- spring-boot - 带有自定义 JSON 响应的 Feign 错误解码器
- reactjs - 避免对组件加载多个 useEffect