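spring - How to customize ServerRSocketFactory with a Lease
Problem description
I wrote a simple RSocket server and client using Spring 5.2, Spring Boot 2.2.0.M6 and Spring Cloud Hoxton.M2.
I am trying to customize the RSocket server's ServerRSocketFactory by adding lease handling, following the official RSocket example.
I am using a ServerRSocketFactoryCustomizer to add the lease handling. However, when I declare the customizer bean and start the server, I get an exception from Spring-Cloud-Stream's FunctionConfiguration (version 3.0.0.M3) saying "Found more then one function in BeanFactory".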
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'standAloneSupplierFlow' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'standAloneSupplierFlow' threw exception; nested exception is java.lang.IllegalArgumentException: Found more then one function in BeanFactory
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:645) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:625) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1339) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1178) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:878) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:877) ~[spring-context-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.boot.web.reactive.context.ReactiveWebServerApplicationContext.refresh(ReactiveWebServerApplicationContext.java:66) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.0.M6.jar:2.2.0.M6]
at com.equalities.cloud.rsocket.server.RsocketServerApplication.main(RsocketServerApplication.java:19) ~[classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'standAloneSupplierFlow' threw exception; nested exception is java.lang.IllegalArgumentException: Found more then one function in BeanFactory
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:640) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
... 17 common frames omitted
Caused by: java.lang.IllegalArgumentException: Found more then one function in BeanFactory
at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.discoverDefaultDefinitionIfNecessary(BeanFactoryAwareFunctionRegistry.java:194) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.compose(BeanFactoryAwareFunctionRegistry.java:212) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
at org.springframework.cloud.function.context.catalog.BeanFactoryAwareFunctionRegistry.lookup(BeanFactoryAwareFunctionRegistry.java:104) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
at org.springframework.cloud.function.context.FunctionCatalog.lookup(FunctionCatalog.java:72) ~[spring-cloud-function-context-3.0.0.M2.jar:3.0.0.M2]
at org.springframework.cloud.stream.function.FunctionConfiguration.standAloneSupplierFlow(FunctionConfiguration.java:90) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
at org.springframework.cloud.stream.function.FunctionConfiguration$$EnhancerBySpringCGLIB$$f29b466d.CGLIB$standAloneSupplierFlow$1(<generated>) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
at org.springframework.cloud.stream.function.FunctionConfiguration$$EnhancerBySpringCGLIB$$f29b466d$$FastClassBySpringCGLIB$$d3c910a8.invoke(<generated>) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363) ~[spring-context-5.2.0.RC2.jar:5.2.0.RC2]
at org.springframework.cloud.stream.function.FunctionConfiguration$$EnhancerBySpringCGLIB$$f29b466d.standAloneSupplierFlow(<generated>) ~[spring-cloud-stream-3.0.0.M3.jar:3.0.0.M3]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[na:na]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.2.0.RC2.jar:5.2.0.RC2]
... 18 common frames omitted
I am using the following code to declare the customizer:
@Bean
public ServerRSocketFactoryCustomizer leaseCustomizer() {
    // Here, we return a ServerRSocketFactoryCustomizer bean to influence
    // how the RSocket server is configured.
    //
    // A ServerRSocketFactory is defined by rsocket-java as an API that
    // is used to create a server-side RSocket, using RSocketFactory.receive().
    // Among other things, it is used to configure leases to clients as shown in this sample:
    // https://github.com/rsocket/rsocket-java/blob/master/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/lease/LeaseExample.java
    //
    // On the client side, a similar class, ClientRSocketFactory, exists.
    // This can be customized using the RSocketRequester.Builder's .rsocketFactory() method.
    // See: https://docs.spring.io/spring/docs/5.2.0.RELEASE/spring-framework-reference/web-reactive.html#rsocket-requester-client-advanced
    // See: org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration
    return new LeaseCustomizer();
}
LeaseCustomizer looks like this:
public class LeaseCustomizer implements ServerRSocketFactoryCustomizer {

    @Override
    public ServerRSocketFactory apply(ServerRSocketFactory factory) {
        factory.lease(() -> Leases.<NoopStats>create()
                .sender(new LeaseSender("Server", 7_000, 5))
                .receiver(new LeaseReceiver("Server")));
        return factory;
    }

    private static class NoopStats implements LeaseStats {
        @Override
        public void onEvent(EventType eventType) {}
    }

    @Slf4j
    private static class LeaseSender implements Function<Optional<NoopStats>, Flux<Lease>> {
        private final String tag;
        private final int ttlMillis;
        private final int allowedRequests;

        public LeaseSender(String tag, int ttlMillis, int allowedRequests) {
            this.tag = tag;
            this.ttlMillis = ttlMillis;
            this.allowedRequests = allowedRequests;
        }

        @Override
        public Flux<Lease> apply(Optional<NoopStats> leaseStats) {
            log.info("{} stats are {}", tag, leaseStats.isPresent() ? "present" : "absent");
            return Flux.interval(ofSeconds(1), ofSeconds(10))
                    .onBackpressureLatest()
                    .map(tick -> {
                        log.info("{} responder sends new leases: ttl: {}, requests: {}", tag, ttlMillis, allowedRequests);
                        return Lease.create(ttlMillis, allowedRequests);
                    });
        }
    }

    @Slf4j
    private static class LeaseReceiver implements Consumer<Flux<Lease>> {
        private final String tag;

        public LeaseReceiver(String tag) {
            this.tag = tag;
        }

        @Override
        public void accept(Flux<Lease> receivedLeases) {
            receivedLeases.subscribe(lease -> log.info("{} received leases - ttl: {}, requests: {}", tag, lease.getTimeToLiveMillis(), lease.getAllowedRequests()));
        }
    }
}
My bootstrap.yml looks like this:
debug: true
server:
  port: ${PORT:3333}
spring:
  application:
    name: rsocket-server
  cloud:
    config:
      discovery:
        enabled: true
        service-id: config-server # should come from environment
  rsocket:
    server:
      port: 9999
      transport: tcp
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka
My pom.xml looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.0.M6</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.equalities.cloud</groupId>
<artifactId>rsocket-server</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rsocket-server</name>
<description>An RSocket server application</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>Hoxton.M2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-bus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope> <!-- See: https://projectlombok.org/setup/maven -->
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
</pluginRepository>
</pluginRepositories>
</project>
As you can see, I am using Spring-Cloud-Config (server) in combination with Spring-Boot-Starter-AMQP and Spring-Cloud-Stream-Binder-RabbitMQ.
I have debugged where the problem lies, and it seems to me that this is an issue either with Spring Boot's RSocket support or with Spring-Cloud-Stream.
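The problem is that ServerRSocketFactoryCustomizer is a @FunctionalInterface, i.e. it behaves like a function, and it gets picked up by the class org.springframework.cloud.stream.function.FunctionConfiguration. That class internally calls functionCatalog.lookup(functionProperties.getDefinition()), trying to look up a function from the registry. Because I declared the ServerRSocketFactoryCustomizer, the registry contains two functions, and so the exception is thrown.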
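The failure mode described above can be modeled in plain Java. This is only a simplified sketch based on the message and frames in the stack trace: the `Registry` class and the bean names `leaseCustomizer` and `someOtherFunction` are hypothetical stand-ins, not the real spring-cloud-function registry or the actual beans in the application.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class FunctionLookupSketch {

    // Hypothetical stand-in for a function registry. This is NOT the real
    // spring-cloud-function class, only a model of the behavior in the stack trace.
    static final class Registry {
        private final Map<String, Function<?, ?>> functions = new LinkedHashMap<>();

        void register(String name, Function<?, ?> function) {
            functions.put(name, function);
        }

        Function<?, ?> lookup(String definition) {
            if (definition == null || definition.isEmpty()) {
                // No explicit definition: a default can only be chosen
                // when there is at most one candidate.
                if (functions.size() > 1) {
                    throw new IllegalArgumentException(
                            "Found more then one function in BeanFactory");
                }
                return functions.isEmpty() ? null : functions.values().iterator().next();
            }
            // Explicit definition: resolve by name, null if the name is unknown.
            return functions.get(definition);
        }
    }

    // Builds a registry holding two function-like beans (names are assumptions).
    static Registry registryWithTwoFunctions() {
        Registry registry = new Registry();
        registry.register("leaseCustomizer", Function.identity());
        registry.register("someOtherFunction", Function.identity());
        return registry;
    }

    public static void main(String[] args) {
        Registry registry = registryWithTwoFunctions();
        try {
            registry.lookup(""); // empty definition, two candidates
        } catch (IllegalArgumentException e) {
            System.out.println("Lookup failed: " + e.getMessage());
        }
    }
}
```

In this model the lookup only fails when no definition is given and more than one candidate is registered, which matches the situation where a customizer bean that is also a function is added to the context.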
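In general, I would expect to be able to declare as many ServerRSocketFactoryCustomizer beans as I like, and to use @Order to influence how the RSocket server behaves. Today this does not seem to be possible, and Spring's RSocket support, being based on "annotated responders", pretty much hides the RSocket server socket, which is a bit of a pity.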
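To make that expectation concrete, here is a minimal sketch of ordered customizers being applied one after another. Everything in it is hypothetical: `OrderedCustomizer` stands in for a customizer bean with an @Order value, and a `List<String>` stands in for the server factory. It illustrates the desired behavior and does not claim to show how Spring Boot applies customizers internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.UnaryOperator;

public class OrderedCustomizersSketch {

    // Hypothetical customizer with an explicit order value.
    static final class OrderedCustomizer {
        final int order;
        final UnaryOperator<List<String>> action;

        OrderedCustomizer(int order, UnaryOperator<List<String>> action) {
            this.order = order;
            this.action = action;
        }
    }

    // Creates a customizer that records one setting on the stand-in factory.
    static OrderedCustomizer customizer(int order, String setting) {
        return new OrderedCustomizer(order, factory -> {
            factory.add(setting);
            return factory;
        });
    }

    // Sorts by order (lowest first) and applies each customizer in turn.
    static List<String> applyInOrder(List<OrderedCustomizer> customizers) {
        List<OrderedCustomizer> sorted = new ArrayList<>(customizers);
        sorted.sort(Comparator.comparingInt(c -> c.order));
        List<String> factory = new ArrayList<>(); // stand-in for the server factory
        for (OrderedCustomizer c : sorted) {
            factory = c.action.apply(factory);
        }
        return factory;
    }

    public static void main(String[] args) {
        List<String> result = applyInOrder(Arrays.asList(
                customizer(2, "metrics"),
                customizer(1, "lease")));
        System.out.println(result); // prints [lease, metrics]
    }
}
```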
Is there any way to customize the RSocket server provided by Spring / Spring Boot to add leases and the like, as I am trying to do?
Thanks!
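Solution
So this is an issue, or to some extent a feature, of spring-cloud-function. We are discussing it and will fix it soon, but for now here is a quick workaround. Simply add the property --spring.cloud.function.definition=blah, where blah is a function that does not exist, and you should be fine.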
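One way to apply the workaround without touching the launch command is to append the argument programmatically before the application starts. The sketch below is an assumption about how one might wire it: the class name `WorkaroundArgsSketch` and the helper `withWorkaround` are hypothetical, and the call to `SpringApplication.run` is only indicated in a comment so the example stays runnable on its own.

```java
import java.util.Arrays;

public class WorkaroundArgsSketch {

    static final String PROPERTY = "--spring.cloud.function.definition";

    // Appends the workaround argument unless a definition was already passed.
    static String[] withWorkaround(String[] args) {
        boolean alreadySet = Arrays.stream(args)
                .anyMatch(arg -> arg.startsWith(PROPERTY + "="));
        if (alreadySet) {
            return args;
        }
        String[] result = Arrays.copyOf(args, args.length + 1);
        result[args.length] = PROPERTY + "=blah"; // "blah" names no existing function
        return result;
    }

    public static void main(String[] args) {
        String[] effectiveArgs = withWorkaround(args);
        // In the real application these arguments would be handed to
        // SpringApplication.run(RsocketServerApplication.class, effectiveArgs).
        System.out.println(String.join(" ", effectiveArgs));
    }
}
```

Setting the same property in the application's configuration file should have the same effect, since it is an ordinary Spring property. Either way this is a stopgap until the fix mentioned in the answer is available.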