apache-kafka - ksql - 接收器主题 XXXX 在元存储中不存在
问题描述
我正在尝试基于 sourceTopic1 和 sourceTopic2 在 targetTopic1 中创建数据。两个源主题都应该具有相同的事件结构。创建第一个目标流,然后尝试将数据从另一个源流插入到当前流。有什么建议吗?
ksql> CREATE STREAM sourceTopic1Stream (category varchar, source varchar, type varchar, id varchar, payload varchar) WITH (KAFKA_TOPIC='sourceTopic1', VALUE_FORMAT='json');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM sourceTopic2Stream (category varchar, source varchar, type varchar, id varchar, payload varchar) WITH (KAFKA_TOPIC='sourceTopic2', VALUE_FORMAT='json');
Message
----------------
Stream created
----------------
ksql> CREATE STREAM targetTopic1Stream WITH (kafka_topic='targetTopic1', partitions=3) AS select 'sourceTopic1' topicname, category, source, type, id, payload from sourceTopic1Stream where id like 'myid%';
Message
----------------------------
Stream created and running
----------------------------
ksql> INSERT INTO targetTopic1Stream SELECT 'sourceTopic2' topicname, category, source, type, id, payload FROM sourceTopic2Stream where id like 'myid%';
io.confluent.ksql.util.KsqlException: Sink topic TARGETTOPIC1STREAM does not exist in th e metastore.
ksql> show topics;
Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
------------------------------------------------------------------------------------------------
_confluent-metrics | false | 12 | 1 | 0 | 0
_schemas | false | 1 | 1 | 0 | 0
sourceTopic1 | true | 3 | 1 | 3 | 1
sourceTopic2 | true | 3 | 1 | 0 | 0
targetTopic1 | true | 3 | 1 | 0 | 0
------------------------------------------------------------------------------------------------
ksql> show streams;
Stream Name | Kafka Topic | Format
--------------------------------------------
SOURCETOPIC2STREAM | sourceTopic2 | JSON
TARGETTOPIC1STREAM | targetTopic1 | JSON
SOURCETOPIC1STREAM | sourceTopic1 | JSON
--------------------------------------------
ksql>
解决方案
这是 KSQL 中的一个错误。我在这里写了:https ://github.com/confluentinc/ksql/issues/2123
解决方法是不要kafka_topic
在您的CREATE STREAM … AS
:
ksql>
ksql> CREATE STREAM TargetStream WITH (partitions=3) AS select 'sourceTopic1' topicname, category, source, type, id, payload from sourceTopic1Stream where id like 'myid%';
Message
----------------------------
Stream created and running
----------------------------
ksql> INSERT INTO TargetStream SELECT 'sourceTopic2' AS topicname, * FROM sourceTopic2Stream where id like 'myid%';
Message
-------------------------------
Insert Into query is running.
-------------------------------
ksql> SELECT * FROM TargetStream;
1541496149897 | null | sourceTopic2 | Foo2 | bar: | x | myid2 | asdf
1541496141671 | null | sourceTopic1 | Foo1 | bar: | x | myid1 | asdf
推荐阅读
- python - 如何使用 webbrowser 从 Python 脚本启动 Microsoft Edge
- angular - 角度材质菜单
- scala - Spark:一台机器上有更多的执行者,每个任务的持续时间更长
- linux - 在 GLX 中使用 X11 转发时遇到问题
- elasticsearch - 将文档放入索引时发生 Elasticsearch 冲突
- python - Kivy - 如何使用自定义元数据/信息标记按钮或事件?
- css - CSS 变换动画在 Chrome 中不起作用
- api - Google 日历 API 免费插槽
- c++ - c++ queue.front(); 为什么不从第一个元素开始?
- c++ - 为什么这个选择排序算法仍然切换一个元素,当它已经是其余元素中的最小元素时?