首页 > 解决方案 > 在 rxjava 2 OnNext 观察者中保留一些实体时出现异常。我该如何解决?

问题描述

我的应用程序是加密货币证券交易所。我正在另一个线程中与 rxjava 进行交易。成功后,将执行 onNext 消费者。在 onNext(onSuccess) 中,我应该将已完成的交易实体保存在数据库中并更新相应的钱包。但是当它到达 save 时dstTransaction,它​​会在LambdaObserver.class文件中抛出异常:

public void onNext(T t) {
        if (!this.isDisposed()) {
            try {
                this.onNext.accept(t);
            } catch (Throwable var3) {
                Exceptions.throwIfFatal(var3);
                ((Disposable)this.get()).dispose();
                this.onError(var3);
            }
        }

    }

这是我的代码:

Observable.fromCallable(() -> transactionService.sendTransaction(currencyName, srcAddress.getAddress(),
                    destAddress.getAddress(), amountForBlcokCypherRx, srcPrivateKeyRx, cryptoFromUsername, cryptoToUsername,
                    unitPrice, size))
                    .subscribeOn(Schedulers.io())
                    .subscribe(
                            // on success handler - start
                            bcypherTransaction -> {
                                // make transaction for crypto
                                Transaction srcTransaction = new Transaction(currencyName, bcypherTransaction.getFees(), size,
                                        cryptoFromUsername, cryptoToUsername);
                                srcTransaction.setTxHash(bcypherTransaction.getHash());
                                srcTransaction.setTransactionType(TransactionType.TRADE);
                                srcTransaction.setStatus(TransactionStatus.DONE);

                                // make transaction for fiat
                                Transaction dstTransaction = new Transaction(CurrencyName.IRT, BigDecimal.ZERO, size.multiply(unitPrice),
                                        cryptoToUsername, cryptoFromUsername);
                                dstTransaction.setTransactionType(TransactionType.TRADE);
                                dstTransaction.setStatus(TransactionStatus.DONE);

                                Trade trade = new Trade(TradeStatus.COMPLETED, otherOrder.getMarketName(), size,
                                        unitPrice, bcypherTransaction.getFees());
                                if (refOrder.getSide() == OrderSide.SELL) {
                                    trade.setBuyerUsername(otherOrder.getUsername());
                                    trade.setSellerUsername(refOrder.getUsername());
                                } else {
                                    trade.setBuyerUsername(refOrder.getUsername());
                                    trade.setSellerUsername(otherOrder.getUsername());
                                }

                                trade = tradeRepository.save(trade);
                                dstTransaction.setTrade(trade);
                                srcTransaction.setTrade(trade);

                                transactionRepository.save(srcTransaction);

                                transactionRepository.save(dstTransaction);

                                walletOperation.finalizeBlock(cryptoFromUsername, currencyName, unitPrice, size, bcypherTransaction.getFees());
                                walletOperation.finalizeBlock(cryptoToUsername, CurrencyName.IRT, unitPrice, size, BigDecimal.ZERO); // zero fee for IRT

                                walletOperation.transferToDestWallet(cryptoToUsername, currencyName, size);
                                BigDecimal IRTToDeposit = (size.subtract(bcypherTransaction.getFees())).multiply(unitPrice);
                                walletOperation.transferToDestWallet(cryptoFromUsername, CurrencyName.IRT, IRTToDeposit);
                                // on success handler - end

                            }, // on Error handler
                            e -> {
                                logger.debug("Create transaction failed : " + e.getMessage());
                                // make transaction for crypto
                                Transaction srcTransaction = new Transaction(refOrder.getCurrencyName(), BigDecimal.ZERO, size,
                                        otherOrder.getUsername(), refOrder.getUsername());
                                srcTransaction.setTransactionType(TransactionType.TRADE);
                                srcTransaction.setStatus(TransactionStatus.SCHEDULED);
                                transactionRepository.save(srcTransaction);
                            }
                    );

我的代码有什么问题?

先感谢您。

标签: javaspring-bootexceptionspring-datarx-java2

解决方案


推荐阅读