apache-kafka - 复杂对象 KStream GlobalKTable 左连接
问题描述
我是 Kafka Streams 的新手。我想执行以下基于 KStream-GlobalKTable 纯 DSL 的左连接操作,而不是使用映射操作。
我有一个输入流a.topic
,它是 <String, A>,其中 value :
{
"b_obj": {
"b_value": "xyz",
"c_list": [
{
"d_obj": {
"d_id1": "value1",
"d_id2": "value2",
"d_value": "some value"
},
"c_value": "jkl"
},
{
"d_obj": {
"d_id1": "value3",
"d_id2": "value4",
"d_value": "some value 2"
},
"c_value": "pqr"
}
]
},
"a_value": "abcd"
}
另一个输入主题e.topic
是 <String, E>,其中 value :
{
"e_id1": "value1",
"e_id2": "value2",
"e_value": "some value"
}
我想执行左连接操作a.topic
是一个流,主数据e.topic
是一个全局表来实现结果值
{
"b_obj": {
"b_value": "xyz",
"c_list": [
{
"d_obj": {
"d_id1": "value1",
"d_id2": "value2",
"d_value": "some value"
},
"e_obj": {
"e_id1": "value1",
"e_id2": "value2",
"e_value": "some value a"
},
"c_value": "jkl"
},
{
"d_obj": {
"d_id1": "value3",
"d_id2": "value4",
"d_value": "some value 2"
},
"e_obj": {
"e_id1": "value3",
"e_id2": "value4",
"e_value": "some value b"
},
"c_value": "pqr"
}
]
},
"a_value": "abcd"
}
并且加入条件是a.b.c[i].d.d_id1 == e.e_id1 && a.b.c[i].d.d_id2 == e.e_id2
代码:
public class ComplexBeanStream {
public static void main(String[] args) {
Serde<A> aSerde = new JsonSerde<>(A.class);
Serde<E> eSerde = new JsonSerde<>(E.class);
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "complex-bean-app");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "complex-bean-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
StreamsBuilder builder = new StreamsBuilder();
final GlobalKTable<String, E> eGlobalTable =
builder.globalTable(
"e.topic",
Materialized.<String, E, KeyValueStore<Bytes, byte[]>>
as("E-STORE")
.withKeySerde(Serdes.String())
.withValueSerde(eSerde)
);
final KStream<String, A> aStream =
builder.stream(
"a.topic",
Consumed.with(Serdes.String(), aSerde));
// perform left-join here
Topology topology = builder.build();
System.out.println("\n\nComplexBeanStream Topology: \n" + topology.describe());
final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
streams.cleanUp();
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
class A {
private B b_obj;
private String a_value;
}
class B {
private List<C> c_list;
private String b_value;
}
class C {
private D d_obj;
private E e_obj;
private String c_value;
}
class D {
private String d_id1;
private String d_id2;
private String d_value;
}
class E {
private String e_id1;
private String e_id2;
private String e_value;
}
解决方案
只要连接查找基于值而不是键,就不可能在没有映射的情况下加入。
Kafka-Streams 只能基于双方相同的密钥加入。这意味着,您应该为加入的双方映射并选择新键(重新键控)以实现a.b.c[i].d.d_id1 == e.e_id1 && a.b.c[i].d.d_id2 == e.e_id2
.
在这种情况下,一侧可以[a.b.c[i].d.d_id1, a.b.c[i].d.d_id2]
作为其密钥,另一侧作为其密钥[e.e_id1, e.e_id2]
。如果您有匹配项,则可以将这些值连接到一个新对象。可能您应该c_list
在重新键入之前对您的进行平面映射。
阅读有关加入Kafka-Streams 的信息也很有帮助。
推荐阅读
- qt - 了解 QtService 中的后端
- python - 如何在熊猫中使用字典过滤数据框?
- python - 在 NLTK 中创建语法
- python - 循环遍历 pandas 并查找行列对
- typescript - 约束参数函数的返回类型
- quasar-framework - q-card-actions 的使用
- javascript - 单击按钮时滚动到元素
- google-cloud-firestore - 如何使用 Firestore 和 Express 应用部署 Firebase 应用以供全球访问?
- python - 如何在 python 上显示确切的文件链接本身?
- swift - 如何创建可以为 Xcode 添加模板的 Pod Lib?