What is the equivalent of RxJava onNext and onError in coroutines?

Problem description

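Hi, I have some use cases written in Java with RxJava. I have converted them to Kotlin files and, instead of RxJava, turned them into coroutine suspend functions.

In my RxJava code I make an API call from the use case and return the result, but at the same time I do something in onNext and something else in onError.

How can I do the same thing with coroutines?

Here is my RxJava code: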

@PerApp
public class StartFuellingUseCase {

    @Inject
    App app;
    @Inject
    CurrentOrderStorage orderStorage;
    @Inject
    FuelOrderRepository repository;

    @Inject
    StartFuellingUseCase() {
        // empty constructor for injection usage
    }

    public Observable<GenericResponse> execute(Equipment equipment) {
        if (orderStorage.getFuelOrder() == null) return null;

        DateTime startTime = new DateTime();
        TimestampedAction action = new TimestampedAction(
                app.getSession().getUser().getId(), null, startTime
        );

        return repository.startFuelling(orderStorage.getFuelOrder().getId(), action)
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(response -> onSuccess(startTime, equipment))
                .doOnError(this::onError);
    }

    private void onSuccess(DateTime startTime, Equipment equipment) {
        if (orderStorage.getFuelOrder() == null) return;

        orderStorage.getFuelOrder().setStatus(FuelOrderData.STATUS_FUELLING);
        equipment.getTimes().setStart(startTime);

        app.saveState();
    }

    private void onError(Throwable e) {
        Timber.e(e, "Error calling started fuelling! %s", e.getMessage());
    }
}

I have rewritten the code in Kotlin as a coroutine use case:

@PerApp
class StartFuellingUseCaseCoroutine @Inject constructor(
    private val currentOrderStorage: CurrentOrderStorage,
    private val fuelOrderRepository: FuelOrderRepository,
    private val app: App
): UseCaseCoroutine<GenericResponse, StartFuellingUseCaseCoroutine.Params>() {

    override suspend fun run(params: Params): GenericResponse {
        val startTime = DateTime()
        val action = TimestampedAction(
            app.session.user.id, null, startTime
        )
        return fuelOrderRepository.startFuelling(
            currentOrderStorage.fuelOrder!!.id,
            action
        )
        //SHOULD RETURN THE VALUE FROM THE fuelOrderRepository.startFuelling
        //AND ALSO
        //ON NEXT
        //CALL onSuccess PASSING startTime and equipment
        //ON ERROR
        //CALL onError
    }

    private fun onSuccess(startTime: DateTime, equipment: Equipment) {
        if (currentOrderStorage.getFuelOrder() == null) return
        currentOrderStorage.getFuelOrder()!!.setStatus(FuelOrderData.STATUS_FUELLING)
        equipment.times.start = startTime
        app.saveState()
    }

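    // Takes the Throwable, as the Java version does, so Timber logs the stack trace
    // and the message is not mistaken for the format string.
    private fun onError(e: Throwable) {
        Timber.e(e, "Error calling started fuelling! %s", e.message)
    }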

    data class Params(val equipment: Equipment)
}

Could you suggest how I can call onSuccess and onError, similar to the way onNext and onError are used in RxJava?

Could you suggest how to solve this?

Thanks, R

Tags: android, kotlin, rx-java, kotlin-coroutines, use-case

Solution

You can use Kotlin Flow, as in the conversion example below:

RxJava:

private fun observable(
    value: Int = 1
): Observable<Int> {
    return Observable.create { emitter ->
        emitter.onNext(value)
        emitter.onError(RuntimeException())
    }
}

Flow:

private fun myFlow(
    value: Int = 1
): Flow<Int> {
    return flow {
        emit(value)
        throw RuntimeException()
    }
}
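
The question asks specifically about the side-effect hooks, so here is a minimal sketch of how they might look on a Flow, assuming the kotlinx.coroutines library is on the classpath. `onEach` plays the role of `doOnNext`, and `catch` is the closest counterpart to `doOnError`, with one difference: `catch` consumes the exception unless you rethrow it. The names `failingFlow`, `observedFlow` and `events` are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical log of side effects, standing in for app.saveState() and Timber.
val events = mutableListOf<String>()

// Same shape as myFlow above: emits one value, then fails.
fun failingFlow(value: Int = 1): Flow<Int> = flow {
    emit(value)
    throw RuntimeException("boom")
}

// onEach runs for every emitted value (the doOnNext position).
// catch sees exceptions thrown upstream (the doOnError position) and,
// unlike doOnError, swallows them unless the lambda rethrows.
fun observedFlow(): Flow<Int> =
    failingFlow()
        .onEach { events += "onNext: $it" }
        .catch { e -> events += "onError: ${e.message}" }

fun main() = runBlocking {
    val received = observedFlow().toList()
    println(received) // [1]
    println(events)   // [onNext: 1, onError: boom]
}
```

If the collector should still see the failure, as a subscriber does after `doOnError`, add `throw e` at the end of the `catch` lambda.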

More details: https://developer.android.com/kotlin/flow
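
Since the use case in the question returns a single value from a suspend function, a Flow may not be needed at all. As a rough sketch, an ordinary `try`/`catch` gives the same two hooks: the code after the suspending call runs only on success, and the `catch` block runs on failure. `Response`, `startFuelling`, `runUseCase` and `log` below are hypothetical stand-ins for `GenericResponse`, `fuelOrderRepository.startFuelling`, the use case's `run` and the real side effects; kotlinx.coroutines is assumed only for `runBlocking`.

```kotlin
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for GenericResponse.
data class Response(val ok: Boolean)

// Hypothetical stand-in for fuelOrderRepository.startFuelling.
suspend fun startFuelling(fail: Boolean): Response {
    if (fail) throw IllegalStateException("network down")
    return Response(ok = true)
}

// Hypothetical log of side effects, standing in for onSuccess and Timber.
val log = mutableListOf<String>()

suspend fun runUseCase(fail: Boolean): Response =
    try {
        val response = startFuelling(fail)
        // Reached only when the call succeeded: the doOnNext position.
        log += "onSuccess"
        response
    } catch (e: Exception) {
        // The doOnError position: record the failure, then rethrow so the
        // caller still sees it (this also keeps cancellation working).
        log += "onError: ${e.message}"
        throw e
    }

fun main() = runBlocking {
    println(runUseCase(fail = false)) // Response(ok=true)
    val failure = runCatching { runUseCase(fail = true) }.exceptionOrNull()
    println(failure?.message)         // network down
    println(log)                      // [onSuccess, onError: network down]
}
```

In the original class this would mean wrapping the `fuelOrderRepository.startFuelling(...)` call in `run`, calling `onSuccess(startTime, params.equipment)` after it, and calling `onError` from the `catch` block before rethrowing.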

