java - Redis在运行时获取命令不可预测的结果
问题描述
我有一个小程序,它通过加密交换建立 Web 套接字连接,接收数据并使用set
Redis 命令保存。
代码
//get Redis connection
RedisAsyncCommands<String, String> redis = TRedis.getRedis();
String symbol = "AGIETH";
Session session = null;
try {
//Open websocket connection.
session = (new
BinanceApi()).websocketTrades(BinanceSymbol.valueOf(symbol), new BinanceWebSocketAdapterAggTrades() {
@Override
public void onMessage(BinanceEventAggTrade message) {
double closeOrderBuy = 0;
double closeOrderSell = 0;
//check if we saved order information before and if yes get data from Redis
try {
if(redis.get(symbol+"Buy").get()!=null )
{
closeOrderBuy = Double.valueOf(redis.get(symbol+"Buy").get());
}
if( redis.get(symbol+"Sell").get()!=null)
{
closeOrderSell = Double.valueOf(redis.get(symbol+"Sell").get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// get current value from exchange
double currentCloseOrder = message.getPrice().multiply(message.getQuantity()).doubleValue();
// rewrite data in Redis
if(message.isMaker()) {
closeOrderBuy = currentCloseOrder + closeOrderBuy;
redis.set(symbol + "Buy",String.valueOf(closeOrderBuy));
}
else {
closeOrderSell = currentCloseOrder + closeOrderSell;
redis.set(symbol + "Sell",String.valueOf(closeOrderSell));
}
}
});
} catch (BinanceApiException e) {
e.printStackTrace();
}
try {
Thread.sleep(10000);
session.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
//check what we write
try {
System.out.println(symbol + redis.get(symbol + "Buy").get() + " " + redis.get(symbol + "Sell").get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
这是 AGIETH 对的控制台输出的一部分:
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandEncoder - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] Received: 5 bytes, 1 commands in the stack
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0x6893917d, /127.0.0.1:64152 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decode AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
18:01:00.197 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decoded AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true
***AGIETH 0.045027 null***
AGIBTC 对的输出
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandEncoder - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] Received: 25 bytes, 1 commands in the stack
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.CommandHandler - [channel=0xc565134b, /127.0.0.1:54361 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decode AsyncCommand [type=GET, output=ValueOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
13:51:02.646 [lettuce-nioEventLoop-4-1] DEBUG io.lettuce.core.protocol.RedisStateMachine - Decoded AsyncCommand [type=GET, output=ValueOutput [output=0.5475444299999999, error='null'], commandType=io.lettuce.core.protocol.Command], empty stack: true
**AGIBTC 0.20769342999999998 0.5475444299999999**
我收到一些对的 Null,但交换提供了此信息。不明白是Redis的问题,还是我的程序逻辑不对?
解决方案
我发现您的代码有两个问题:
- 您使用 Redis 异步 API,但未正确等待 set() 操作完成。RedisAsyncCommands.set 返回 RedisFuture。在调用 get() 之前,您必须确保完成未来。
- 您等待 10 秒等待来自 BinanceApi 的交易事件。但有些货币对可能很少有交易,所以 10 秒是不够的。
我通过添加适当的等待条件稍微修改了您的代码,一切似乎都工作正常:
//get Redis connection
RedisClient client = RedisClient.create("redis://localhost");
StatefulRedisConnection<String, String> connection = client.connect();
RedisAsyncCommands<String, String> redis = connection.async();
String symbol = "AGIETH";
Session session = null;
CompletableFuture<String> cfBuy = new CompletableFuture<>();
CompletableFuture<String> cfSell = new CompletableFuture<>();
try {
//Open websocket connection.
session = (new
BinanceApi()).websocketTrades(BinanceSymbol.valueOf(symbol), new BinanceWebSocketAdapterAggTrades() {
@Override
public void onMessage(BinanceEventAggTrade message) {
double closeOrderBuy = 0;
double closeOrderSell = 0;
//check if we saved order information before and if yes get data from Redis
try {
if(redis.get(symbol+"Buy").get()!=null )
{
closeOrderBuy = Double.valueOf(redis.get(symbol+"Buy").get());
}
if( redis.get(symbol+"Sell").get()!=null)
{
closeOrderSell = Double.valueOf(redis.get(symbol+"Sell").get());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// get current value from exchange
double currentCloseOrder = message.getPrice().multiply(message.getQuantity()).doubleValue();
// rewrite data in Redis
if(message.isMaker()) {
closeOrderBuy = currentCloseOrder + closeOrderBuy;
redis.set(symbol + "Buy",String.valueOf(closeOrderBuy)).thenAccept(cfBuy::complete);
}
else {
closeOrderSell = currentCloseOrder + closeOrderSell;
redis.set(symbol + "Sell",String.valueOf(closeOrderSell)).thenAccept(cfSell::complete);
}
}
});
} catch (BinanceApiException e) {
e.printStackTrace();
}
try {
System.out.println("Futures completed: " + cfBuy.get() + " " + cfSell.get());
session.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
//check what we write
try {
System.out.println(symbol + redis.get(symbol + "Buy").get() + " " + redis.get(symbol + "Sell").get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
输出:
Futures completed: OK OK
AGIETH0.06922798 0.08610368
推荐阅读
- dynamics-crm - 从 Dynamics 365 获取审核记录详细信息到 Power BI
- python - 了解嵌套字典的 .get() 方法
- javascript - 我可以将 Vue 输出保存为 Javascript 数组吗?
- spring - Spring boot 不断收到“需要一个 bean,但找到了 10 个”,尽管我只声明了一个服务
- azcopy - azcopy v10 - 仅当目标文件不存在时才复制到目标
- python - 如何使用唯一代码从 CSV 文件中读取一行?
- docker - 为什么我应该使用 docker 镜像存储库而不是从 Dockerfile 构建?
- c++ - C++ 难以在单例类中创建类的实例
- javascript - 如何使用 mssql 在 SQL Server 中创建可重用的预处理语句?
- java - MVP Clean Architecture - 计算和非数据相关任务属于哪里?