首页 > 解决方案 > 如何将结果从一个源流传递到另一个源流

问题描述

我有一个处理 aSource并返回的方法。我正在尝试修改它,但似乎无法返回相同的内容:

原来的

def originalMethod[as: AS, mat: MAT, ec: EC](checkType: String) 
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
      collectStuff
      .map { ts =>
        val errors = MyEngine.checkAll(ts.code)
        (ts, errors)
      }
      .map { x =>
        x._2
          .leftMap(xs => {
            addInformation(x._1, xs.toList)
          })
          .toEither
      }
}

我正在通过使用另一个源进行修改并将结果传递给原始源,但返回相同的内容:

def calculate[T: AS: MAT](source: Source[T, NotUsed]): Future[Seq[T]] = 
{
 source.runWith(Sink.seq)
}


def modifiedMethod[as: AS, mat: MAT, ec: EC](checkType: String, mySource: Source[LoanApplicationRegister, NotUsed]) 
: Flow[ByteString, MyValidation[MyClass], NotUsed]{
  for {
    calc <- calculate(mySource)
    orig <-  collectStuff
        .map { ts =>
          val errors = MyEngine.checkAll(ts.code, calc)
          (ts, errors)
        }
        .map { x =>
          x._2
            .leftMap(xs => {
              addInformation(x._1, xs.toList)
            })
            .toEither
        }
  }
  yield {
    orig
  }
}

但我收到编译错误Expression of type Future[Nothing] doesn't conform to existing type Flow[ByteString, MyValidation[MyClass]

Flow[ByteString, MyValidation[MyClass]我怎样才能modifiedMethodoriginalMethod以前一样返回

标签: scalaakkaakka-stream

解决方案


  for { calc <- calculate(mySource)}
  yield {
    collectStuff
        .map { ts =>
          val errors = MyEngine.checkAll(ts.code, calc)
          (ts, errors)
        }
        .map { x =>
          x._2
            .leftMap(xs => {
              addInformation(x._1, xs.toList)
            })
            .toEither
        }
  }

会给你一个Future[Flow[ByteString, MyValidation[MyClass], NotUsed]]而不是,Future[Nothing] 但如果你想删除Future你需要在Await某个地方为它(当你调用计算时(然后你不需要for)或在它之后。通常,这不是使用期货的方式


推荐阅读