首页 > 解决方案 > 在 Micronaut 控制器中流式传输大响应而不会耗尽内存

问题描述

我们将 Micronaut 与 Mongo 一起使用,通过一些控制器公开数据。由于响应实体的大小正在增长,我们的应用程序有时会出现内存不足。因此,我们正在研究切换到异步 mongo 驱动程序并使用响应式响应将数据流式传输到客户端。不幸的是,我们无法更改 API 响应结构或内容类型(全部application/json

我们的 API 之一返回的实体结构如下:

[
  { "field": "value" },
  { "field": "value" },
  ...
  { "field": "value" }
]

我们使用这个控制器开始工作,其中dataStore返回 a Publisher<Example>

    @Get("all")
    Flowable<Example> getAllExamples() {
        return Flowable.fromPublisher(dataStore.find()).map(SomeMapper::toPublic);
    }

这很好用,大量示例列表在将其流式传输到客户端之前不必完全加载到内存中。

其他 API 返回(imo 更明智的)结构:

{
  "list": [
    { "field": "value" },
    { "field": "value" },
    ...
    { "field": "value" }
  ],
  "meta": {
    ...
  }
}

我们可以为这样的实体应用类似的发布者/可流动模式,还是在将这些响应发送出去之前将它们加载到内存中?

我们尝试了以下签名:

    @Get("all/dev")
    Single<ExamplesWrapper> getAllDev() {
        Publisher<Example> dev = dataStore.find();
        return Flowable.fromPublisher(dev)
                .map(mapper::map)
                .collect((Callable<ArrayList<Example>>) ArrayList::new, ArrayList::add)
                .map(ExampleWrapper::new);
    }

包装器将添加一些元数据的地方。但这再次将其全部加载到内存中,然后再将其发送出去,从而使应用程序崩溃。

将 Flowable 添加到响应包装器中:


public class ExamplesWrapper {

    private final Flowable<Example> examples;

    @ConstructorProperties({"examples"})
    public ExamplesWrapper(Flowable<Example> examples) {
        this.examples = examples;
    }

    public Flowable<Example> getExamples() {
        return examples;
    }
}

也因一些不错的杰克逊映射异常而失败。

元数据不依赖于实际的示例数据(它添加了一些静态公司信息)。我们能否以某种方式实现这样的端点,而不必将所有数据加载到内存中?

标签: javajsonmongodbreactive-programmingmicronaut

解决方案


文档

6.20 写入响应数据

响应式写入响应数据

Micronaut 的 HTTP 服务器通过返回一个 Publisher 来支持写入响应数据块,该 Publisher 发出可以编码为 HTTP 响应的对象。

下表总结了示例返回类型签名以及服务器为处理它们而表现出的行为: 返回类型描述

  • Flowable<byte[]>:一个Flowable,将每个内容块作为一个字节[]发出而不阻塞
  • Flux<ByteBuf>:将每个块作为 Netty ByteBuf 发出的 Reactor Flux
  • Publisher<String>:将每个内容块作为字符串发出的 Publisher
  • Flowable<Book> 发射 POJO 时,每个发射的对象默认编码为 JSON,不阻塞

返回响应式类型时,服务器使用分块的 Transfer-Encoding 并不断写入数据,直到调用 Publisher onComplete 方法。

我理解这一点,因此如果您希望 Micronaut 机制流式传输您的内容,您需要具有类似Flowable<item>or Flux<item>or的签名Publisher<item>,其中 item 是您响应的一部分,而不是完整的项目。然后,Micronaut 将响应来自 Flowable 或等效的块。

在这种情况下,我想到的一件事是您可以自己拆分成合适的块。这样流大响应而不将它们缓冲到内存中应该可以工作。

所以是这样的:

@Get("all")
public Flowable<String> getAllExamples() {
    ObjectMapper objectMapper = new ObjectMapper();
    Publisher<Example> dev = dataStore.find();
    return Flowable.fromPublisher(dev)
            .map(mapper::map)
            .concatMap(item -> Flowable.just(objectMapper.writeValueAsString(item), ","))
            .startWith("{\"list\": [")
            .concatWith(Flowable.just("],\"meta\":\"whatever\"}"));
}

它很hacky,但似乎适用于这种情况。


一些不起作用的方法:

我确实测试了在自定义杰克逊映射器中直接写入 JsonGenerator ,按照杰克逊流 api中的概述刷新对象,但是 micronaut RoutingInboundHandler似乎没有将响应刷新回最终用户而是缓冲它,导致内存不足。方法适用于 Spring Boot,因此它可能是 Micronaut 中缺少的功能。

当使用Micronaut可写(阻塞)响应并尝试在写入数据时刷新数据时,我也发生了相同的缓冲。我向 micronaut core 提出了一个关于该问题的问题


推荐阅读