java - 如何从 SubscriableChannel 构建 KStream
问题描述
我正在使用 Spring 云流,并想稍微摆弄一下 KStreams/KTables。
我正在寻找从标准 Kafka 主题转换为流的方法。
我已经在 KSQL 中完成了这项工作,但我试图弄清楚是否有办法让 SpringBoot 处理这个问题。我能找到的最好的例子是@Input
和@Output
渠道都已经存在的例子,KStreams
但我认为这不是我想要的。
卡夫卡设置
在 SpringBoot 内部,我正在执行以下操作:
- 我的数据来了:
force-entities-topic
- 然后我“清理”
[UTC]
从时间消息中删除标签的数据并重新发布:force-entities-topic-clean
从那里我希望得到它的输出并在现场构建 aKStream
和KTable
keyed platformUID
。
输入数据
所以我正在使用的数据是:
{
"platformUID": "UID",
"type": "TLPS",
"state": "PLATFORM_INITIALIZED",
"fuelremaining": 5.9722E+24,
"latitude": 39,
"longitude": -115,
"altitude": 0,
"time": "2018-07-18T00:00:00Z[UTC]"
}
KSQL
我可以运行这些 KSQL 命令来创建我需要的东西。(这里我以字符串的形式读取时间,而不是我在 java/kotlin 实现中执行的实际时间)
CREATE STREAM force_no_key (
platformUID string,
type string,
state string,
fuelremaining DOUBLE,
latitude DOUBLE,
longitude DOUBLE,
altitude DOUBLE
) with (
kafka_topic='force-entities-topic',
value_format='json');
从那里我制作另一个流(因为我无法让它正确读取密钥)
CREATE STREAM force_with_key
WITH (KAFKA_TOPIC='blue_force_with_key') AS
select PLATFORMUID as UID, LATITUDE as lat, LONGITUDE as LON, ALTITUDE as ALT, state, type
FROM force_no_key
PARTITION BY UID;
而从这一点
CREATE TABLE FORCE_TABLE
( UID VARCHAR,
LAT DOUBLE,
LON DOUBLE,
ALT DOUBLE
) WITH (KAFKA_TOPIC = 'force_with_key',
VALUE_FORMAT='JSON',
KEY = 'UID');
爪哇风格!
我认为我遇到麻烦的地方就在这里。我在这里定义我的绑定接口:
interface ForceStreams {
companion object {
// From the settings file we configure it with the value of-force-in
const val DIRTY_INPUT = "dirty-force-in"
const val CLEANED_OUTPUT = "clean-force-out"
const val CLEANED_INPUT = "clean-force-in"
const val STREAM_OUT = "stream-out"
}
@Input(DIRTY_INPUT)
fun initialInput(): MessageChannel
@Output(CLEANED_OUTPUT)
fun cleanOutput(): SubscribableChannel
@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel
@Output(STREAM_OUT)
fun cleanedBlueForceMessage(): KStream<String, ForceEntity>
@Output(TABLE_OUT)
fun tableOutput(): KTable<String, ForceEntity>
}
然后我用这个块进行清洁:
@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {
var inputMap: Map<String, Any> = objectMapper.readValue(message)
var map = inputMap.toMutableMap()
map["type"] = map["type"].toString().replace("-", "_")
map["time"] = map["time"].toString().replace("[UTC]", "")
val json = objectMapper.writeValueAsString(map)
val fe : ForceEntity = objectMapper.readValue(json, ForceEntity::class.java)
return fe
}
但我要从MessageChannel
到SubscribableChannel
我不确定该怎么做是从SubscribableChannel
到KStream<String,ForceEntity>
或者KTable<String,ForceEntity>
任何帮助将不胜感激 - 谢谢
编辑 - applicaiton.yml
server:
port: 8888
spring:
application:
name: Blue-Force-Table
kafka:
bootstrap-servers: # This seems to be for the KStreams the other config is for normal streams
- localhost:19092
cloud:
stream:
defaultBinder: kafka
kafka:
binder:
brokers: localhost:19092
bindings:
dirty-force-in:
destination: force-entities-topic
contentType: application/json
clean-force-in:
destination: force-entities-topic-clean
contentType: application/json
clean-force-out:
destination: force-entities-topic-clean
contentType: application/json
stream-out:
destination: force_stream
contentType: application/json
table-out:
destination: force_table
contentType: application/json
我想接下来的问题是 - 这甚至可能吗?您可以在单个函数中混合和匹配活页夹吗?
解决方案
首先StreamListener
,您通过绑定接收数据DIRTY_INPUT
并通过绑定写入数据CLEANED_OUTPUT
。然后你需要有另一个StreamListener
,在那里你接收该数据KStream
并进行处理并写入输出。
第一处理器:
@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {
....
将以下内容更改为KStream
绑定。
@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel
至
@Input(CLEANED_INPUT)
fun cleanInput(): KStream<String, ForceEntity>
第二处理器:
@StreamListener(CLEANED_INPUT)
@SendTo(STREAM_OUT)
public KStream<String, ForceEntity> process(
KStream<String, ForceEntity> forceEntityStream) {
return forceEntityStream
........
.toStream();
}
目前,Spring Cloud Stream 中的 Kafka Streams binder 不支持将数据写为KTable
. 输出上只KStream
允许使用对象(KTable
输入上允许绑定)。如果这是一个硬性要求,您需要研究 Spring Kafka,您可以在其中降低级别并执行此类出站操作。
希望有帮助。
推荐阅读
- ios - 为什么在使用“lazy var”创建实例时出现编译器错误?
- c - 需要 C 语言客户端和服务器编程方面的帮助
- python-3.x - 选择具有类名的特定行
- c# - 在 SelectedValueChange 事件中获取组合框的当前值
- laravel - laravel 中的不同文件类型和一个输入
- javascript - `render` 在下面的单元测试中做了什么?
- eclipse - Eclipse:通过引导仪表板运行 Spring Boot Maven 项目时出现异常(空指针)
- javascript - 基于另一个数组 (JavaScript) 验证和格式化数组输出
- xcode - 命令 CompileAssetCatalog 失败,退出代码为非零
- javascript - 更新数据时不工作 equalHeights