spring-boot - 使用 Gradle 和 Kotlin 创建 Spring Cloud 数据流处理器
问题描述
我正在尝试使用 Kotlin Lambda 构建基于 Spring Cloud Stream 3.1 的 Kotlin 处理器。我正在使用 Spring Cloud Data Flow 将此处理器部署到流中,但它被标记为失败。
如果我查看错误日志,则会出现以下错误:IllegalArgumentException: Type must be one of Supplier, Function or Consumer
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:531) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.2.jar!/:5.3.2]
这是我的代码:
@SpringBootApplication
class TransformerUppercaseApplication{
@Bean
fun transform(): (String) -> String {
return { "This is my uppercase".plus(it.toUpperCase()) }
}
}
fun main(args: Array<String>) {
runApplication<TransformerUppercaseApplication>(*args)
}
我尝试在 application.yml 中指定函数名称,但没有成功:
spring:
cloud:
stream:
function:
definition: transform
这是我的 build.gradle.kts :
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
`maven-publish`
id("org.springframework.boot") version "2.4.1"
id("io.spring.dependency-management") version "1.0.10.RELEASE"
kotlin("jvm") version "1.4.21"
kotlin("plugin.spring") version "1.4.21"
}
group = "com.company"
version = "0.0.11"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
maven { url = uri("https://repo.spring.io/milestone") }
}
extra["springCloudVersion"] = "2020.0.0"
dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("org.springframework.cloud:spring-cloud-function-kotlin")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}
dependencyManagement {
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
}
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
publishing {
publications {
create<MavenPublication>("mavenJava") {
from(components["java"])
}
}
}
configurations {
val elements = listOf(apiElements, runtimeElements)
elements.forEach { element ->
element.get().outgoing.artifacts.removeIf { it -> it.buildDependencies.getDependencies(null).contains(tasks.jar.get())}
element.get().outgoing.artifact(tasks.bootJar.get())
}
}
编辑:我可以通过使用 java Function 类而不是 kotlin lambda 来避免错误:
@Bean
fun transform(): Function<String, String> = Function {
it.toUpperCase()
}
但是现在出现了一个新问题,处理器无法绑定到kafka主题,显示为匿名并且处理器kepp状态正在部署。我在处理器日志中得到了这个(我也在 java 示例中尝试过):
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1611044007674
2021-01-19 08:13:27.676 INFO 107 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Subscribed to topic(s): transform-in-0
2021-01-19 08:13:27.677 INFO 107 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2021-01-19 08:13:27.702 INFO 107 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@24fabd0f
2021-01-19 08:13:27.715 INFO 107 --- [ main] c.e.t.TransformerUppercaseApplicationKt : Started TransformerUppercaseApplicationKt in 7.538 seconds (JVM running for 8.374)
2021-01-19 08:13:27.729 INFO 107 --- [container-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Cluster ID: gN63F8OzRwaEeKElBfkmjw
2021-01-19 08:13:28.827 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Discovered group coordinator kafka-broker:9092 (id: 2147482646 rack: null)
2021-01-19 08:13:28.833 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:28.866 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-01-19 08:13:28.866 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:31.890 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Finished assignment for group at generation 1: {consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2-61cd1b1a-6485-4bc8-83e8-003bc22db7e1=Assignment(partitions=[transform-in-0-0])}
2021-01-19 08:13:31.935 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Successfully joined group with generation 1
2021-01-19 08:13:31.936 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Notifying assignor about the new Assignment(partitions=[transform-in-0-0])
2021-01-19 08:13:31.939 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Adding newly assigned partitions: transform-in-0-0
2021-01-19 08:13:31.950 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.963 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.977 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Resetting offset for partition transform-in-0-0 to offset 0.
2021-01-19 08:13:32.007 INFO 107 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2 : anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3: partitions assigned: [transform-in-0-0]
解决方案
请将您的示例与我们在 Microsite 中的Kotlin 示例进行比较。
甚至可以按原样重复该示例并让它在您的环境中工作以适应它。事后您应该能够以相同的方式使用您的应用程序。
推荐阅读
- sfml - SFML Sprite 没有显示在窗口上,即使它没有给我任何错误
- c# - WPF C# Listview - 如何在不触发 SelectionChanged 的情况下刷新列表
- sql - BigQuery中如何判断具有多项数据的列具有一个指定值?
- android-studio - 添加任何图像时出现错误,除了一个
- python - OSError: [WinError 10022] 提供了无效参数。Pygame Windows 错误
- python - Numpy方法没有出现
- amazon-web-services - 使用 $sample 从 Amazon DocumentDB 检索随机文档
- java - 将 spacemacs 配置为 java (spring) ide
- sql - 从具有不同的其他表中获取所有列
- python - 向 celery 添加动态任务