首页 > 解决方案 > RxJava2 Flowable:将对象发送到服务器并检测结束

问题描述

我需要将对象列表发送到我的远程服务器。由于它们可能很多而且很大,我使用一个 flowable 使用 request(1) 从一个数组列表中一个一个地发送它们。

对于每个对象,都会对服务器进行改造调用,作为回报,我会获得远程 ID,并使用远程 ID 更新本地对象。

我需要检测此任务的结束:即发送的最后一个对象的最后响应,以防止对同一对象的多个并发调用。

目前一切正常,但我在远程服务器收到答案之前收到“已完成”消息,因此在更新对象之前。

我怎样才能做到这一点 ?

Flowable<Integer> observable = Flowable.range(0, objList.size());

        observable.subscribe(new DefaultSubscriber<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "on start");
                request(1);
            }

            @Override
            public void onNext(Integer t) {
                Log.d(TAG, "on next : " + t);
                MyObj = objList.get(t);

                RetrofitHelper.createService(ObjService.class, true, authType, authToken).createOrUpdateObj(objList.get(t)).flatMap(p -> {

                    Log.d(TAG, "recu p");

                    if (p != null) {
                        try {
                            p.setSyncho(true);
                            // save remote id on obj
                            ObjDB.updateObj(p);
                            request(1);

                            return Observable.empty();
                        } catch (Throwable th) {
                            ExceptionHandler.logException(th);
                            return Observable.error(th);
                        }
                    } else {
                        request(1);
                        return Observable.empty();  // provisoirement si pb on renvoie vide
                    }
                })
                        .onErrorResumeNext(r -> {
                            request(1);
                            Observable.empty();
                        })
                        .onExceptionResumeNext(error -> Observable.empty()) // go to next on error
                        .subscribeOn(Schedulers.io()).onErrorReturn(error -> {
                    Log.d("ERROR", error.getMessage());
                    return 0;
                })

                        .onErrorResumeNext(Observable.empty()).subscribe();
            }


            @Override
            public void onError(Throwable t) {
                Log.e("XXX ERROR ", "" + t);
                request(1);
                patientSynchroInProgress = Boolean.FALSE;
            }

            @Override
            public void onComplete() {
                Log.e("XXX COMPLETE", "complete");
            }
        });

标签: androidrx-java2

解决方案


最后,我能够让它工作

  Flowable.fromIterable(patientList)
                    .concatMap(item -> {

                                item.setSomething();
                                return RetrofitHelper.createService(ObjService.class, true, authType, authToken)
                                        .createOrUpdateObj(item)
                                        .flatMap(p -> {
                                            if (p != null) {
                                                try {
                                                    p.setSyncho(true);
                                                    // save remote id on obj
                                                    ObjDB.updateObj(p);
                                                    return Observable.empty();
                                                } catch (Throwable th) {
                                                    ExceptionHandler.logException(th);
                                                    return Observable.error(th);
                                                }
                                            } else {
                                                return Observable.empty();  // provisoirement si pb on renvoie vide
                                            }
                                        })

                                        .onErrorResumeNext(Observable.empty())
                                        .toFlowable(BackpressureStrategy.BUFFER);
                            }
                    )
                    .doOnNext(s -> {
                        Log.d(TAG, ((Obj) s).toString());

                    })
                    .doOnComplete(() -> {
                        // do something when completed
                        Log.d(TAG, "COMPLETE");
                    })
                    .subscribe();
        }
    }

谢谢您的帮助


推荐阅读