java - 在 rxjava 2 OnNext 观察者中保留一些实体时出现异常。我该如何解决?
问题描述
我的应用程序是加密货币证券交易所。我正在另一个线程中与 rxjava 进行交易。成功后,将执行 onNext 消费者。在 onNext(onSuccess) 中,我应该将已完成的交易实体保存在数据库中并更新相应的钱包。但是当它到达 save 时dstTransaction
,它会在LambdaObserver.class
文件中抛出异常:
public void onNext(T t) {
if (!this.isDisposed()) {
try {
this.onNext.accept(t);
} catch (Throwable var3) {
Exceptions.throwIfFatal(var3);
((Disposable)this.get()).dispose();
this.onError(var3);
}
}
}
这是我的代码:
Observable.fromCallable(() -> transactionService.sendTransaction(currencyName, srcAddress.getAddress(),
destAddress.getAddress(), amountForBlcokCypherRx, srcPrivateKeyRx, cryptoFromUsername, cryptoToUsername,
unitPrice, size))
.subscribeOn(Schedulers.io())
.subscribe(
// on success handler - start
bcypherTransaction -> {
// make transaction for crypto
Transaction srcTransaction = new Transaction(currencyName, bcypherTransaction.getFees(), size,
cryptoFromUsername, cryptoToUsername);
srcTransaction.setTxHash(bcypherTransaction.getHash());
srcTransaction.setTransactionType(TransactionType.TRADE);
srcTransaction.setStatus(TransactionStatus.DONE);
// make transaction for fiat
Transaction dstTransaction = new Transaction(CurrencyName.IRT, BigDecimal.ZERO, size.multiply(unitPrice),
cryptoToUsername, cryptoFromUsername);
dstTransaction.setTransactionType(TransactionType.TRADE);
dstTransaction.setStatus(TransactionStatus.DONE);
Trade trade = new Trade(TradeStatus.COMPLETED, otherOrder.getMarketName(), size,
unitPrice, bcypherTransaction.getFees());
if (refOrder.getSide() == OrderSide.SELL) {
trade.setBuyerUsername(otherOrder.getUsername());
trade.setSellerUsername(refOrder.getUsername());
} else {
trade.setBuyerUsername(refOrder.getUsername());
trade.setSellerUsername(otherOrder.getUsername());
}
trade = tradeRepository.save(trade);
dstTransaction.setTrade(trade);
srcTransaction.setTrade(trade);
transactionRepository.save(srcTransaction);
transactionRepository.save(dstTransaction);
walletOperation.finalizeBlock(cryptoFromUsername, currencyName, unitPrice, size, bcypherTransaction.getFees());
walletOperation.finalizeBlock(cryptoToUsername, CurrencyName.IRT, unitPrice, size, BigDecimal.ZERO); // zero fee for IRT
walletOperation.transferToDestWallet(cryptoToUsername, currencyName, size);
BigDecimal IRTToDeposit = (size.subtract(bcypherTransaction.getFees())).multiply(unitPrice);
walletOperation.transferToDestWallet(cryptoFromUsername, CurrencyName.IRT, IRTToDeposit);
// on success handler - end
}, // on Error handler
e -> {
logger.debug("Create transaction failed : " + e.getMessage());
// make transaction for crypto
Transaction srcTransaction = new Transaction(refOrder.getCurrencyName(), BigDecimal.ZERO, size,
otherOrder.getUsername(), refOrder.getUsername());
srcTransaction.setTransactionType(TransactionType.TRADE);
srcTransaction.setStatus(TransactionStatus.SCHEDULED);
transactionRepository.save(srcTransaction);
}
);
我的代码有什么问题?
先感谢您。
解决方案
推荐阅读
- python - Pip freeze 无法正确显示 venv 中的 git 依赖项
- javascript - 在 TypeScript 类中使用 RequireJS 模块
- r - 如何解决 NA/NaN/Inf 执行停止错误?
- python - 如何使用 Matplotlib 在 Python 中的同一 Y 轴上有 2 个不同的比例
- javascript - 如何在 Angular 12 中读取本地 JSON 文件?
- date - UDF 函数如何在 pyspark 中以日期为参数工作?
- r - 如何结合 unicode 在 ggplot2 中使用它?
- android - Android IAP 订阅 - 如何在接收实时开发者通知时获取原始 orderID
- javascript - 使用 useState() 方法时出错
- python - 使用python进行时间数据分析