apache-kafka - 是否可以从 kafka 消息中获取消息键的最新值
问题描述
假设我对同一个消息键有不同的值。
例如:
{
userid: 1,
email: user123@xyz.com }
{
userid: 1,
email: user456@xyz.com }
{
userid: 1,
email: user789@xyz.com }
在上述情况下,我只想要用户更新的最新值,即“user789@xyz.com”。
我的 kafka 流应该只给我第三个值,而不是前两个值。
解决方案
由于您没有指定特定的客户端,我将向您展示如何使用 ksqlDB 和新添加的函数LATEST_BY_OFFSET
.
首先,我用源数据填充主题:
kafkacat -b broker:29092 -P -t test_topic -K: <<EOF
1:{ "userid": 1, "email": "user123@xyz.com" }
1:{ "userid": 1, "email": "user456@xyz.com" }
1:{ "userid": 1, "email": "user789@xyz.com" }
EOF
然后在 ksqlDB 中首先将其建模为事件流:
ksql> CREATE STREAM USER_UPDATES (USERID INT, EMAIL VARCHAR) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='JSON');
Message
----------------
Stream created
----------------
ksql> SET 'auto.offset.reset' = 'earliest'; [35/60]
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT ROWKEY, USERID, EMAIL FROM USER_UPDATES EMIT CHANGES LIMIT 3;
+---------+---------+-----------------+
|ROWKEY |USERID |EMAIL |
+---------+---------+-----------------+
|1 |1 |user123@xyz.com |
|1 |1 |user456@xyz.com |
|1 |1 |user789@xyz.com |
现在我们可以告诉 ksqlDB 获取这个事件流并只给我们最新的值(基于偏移量),或者直接:
ksql> SELECT USERID, LATEST_BY_OFFSET(EMAIL) FROM USER_UPDATES GROUP BY USERID EMIT CHANGES;
+--------------------+--------------------+
|USERID |KSQL_COL_1 |
+--------------------+--------------------+
|1 |user789@xyz.com |
Press CTRL-C to interrupt
或者更有用的是,作为 ksqlDB 中的物化状态:
CREATE TABLE USER_LATEST_STATE AS
SELECT USERID, LATEST_BY_OFFSET(EMAIL) AS EMAIL
FROM USER_UPDATES
GROUP BY USERID
EMIT CHANGES;
该表仍然由对 Kafka 主题的更改驱动,但可以直接查询当前状态,无论是现在(“拉查询”):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
Query terminated
ksql>
或者作为状态演变的变化流(“推送查询”):
ksql> SELECT EMAIL FROM USER_LATEST_STATE WHERE ROWKEY=1 EMIT CHANGES;
+--------------------+
|EMAIL |
+--------------------+
|user789@xyz.com |
[ query continues indefinitely ]
推荐阅读
- grails - 使用 Grails 4/5 在已部署的应用程序中重新加载 gsp
- proxy - SonarQube Scanner 的 Jenkins 工具的自动安装似乎忽略了 Jenkins 的代理设置
- python - 硒文本选择的异常索引问题
- rp2040 - RP2040 PIO 输出位序
- list - 具有无限列表的 Haskell 惰性求值和引入符号
- unix - 如何使用 sed 对日期/时间进行分组?
- url - 带有 mod_rewrite 的友好 URL - 如何?
- flutter-layout - 如何在颤动中将小部件移动到其他小部件?
- django - 400. 这是一个错误。您的客户发出了格式错误或非法的请求。这就是我们所知道的 - Google App Engine Django App
- c - 什么情况下会执行条件代码?