首页 > 解决方案 > 从流中读取和累积记录的反应式(java)方式

问题描述

这里是反应式编程的新手。

Multi转换中,我需要为几次迭代累积字节,检查传入的字节,当我遇到特定的字节模式时,我会发布整个累积的 byteArray 以供进一步使用。

似乎是一个简单的要求,但我被卡住了。

伪 Kotlin 代码:

   val array = listOf<ByteArray>(....)
   Multi.createFrom().items(array)
       .onItem.transform { oneByteArray: ByteArray ->
            if (oneByteArray.indexOf(0x0d) == -1) {
                // somehow accumulate oneByteArray somewhere (1)
                // do not publish anything downstream (2)
            } else {
                // publish accumulated byteArrays as one large byteArray
            }
    }

我坚持(1)和(2)。暂时停止下游发布(当我正在寻找特定字节时)的机制是什么?

标签: javakotlinreactive-programmingsmallrye

解决方案


推荐阅读