hadoop - Kafka producer using HiveStorageHandler
问题描述
I am relatively new to hive/hadoop
I was reading through this Hive Storage Handlers.
Now I am trying to write a custom implementation of HiveStorageHandler for querying and pushing messages to Kafka using a Hive Table.
I saw that there are other implementations of HiveStorageHandler which lets us query and write on NoSQL databases using hive tables.
I am trying to replicate that for Kafka. I found a project on it
HiveKa - query Kafka using Hive
Here they are trying to read data from Kafka using queries on the hive table. I wish to write on the kafka topic using insert on the table.
Can someone please guide me on this ?
解决方案
我希望在表格上使用 insert 来写关于 kafka 的主题。
这可以使用 Kafka HiveStorageHandler。以下是此功能可能的一般用例
- 查询 Kafka 主题
- 从 Kafka 主题中查询数据并插入到 hive 托管/外部表中
- 从 Kafka 主题中查询数据并推送到其他 Kafka 主题中
- 从 hive 外部/托管表中查询数据并推送到 Kafka 主题中
您正在尝试执行第三个用例。
首先为源和目标 Kafka 主题创建两个外部表。
create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);
create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);
然后使用合并查询将数据插入目标 Kafka 主题
merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);
笔记:
使用 Hive 外部非原生表
除了用户定义的有效负载模式之外,Kafka 存储处理程序还附加了 4 个附加列(__key、__partition、__offset、__timestmap),用户可以使用这些列来查询 Kafka 元数据字段
如果数据不是 csv 格式,用户必须设置“kafka.serde.class”表属性
用户还可以设置“kafka.write.semantic”表属性,该属性允许 NONE、AT_LEAST_ONCE 或 EXACTLY_ONCE 值。
推荐阅读
- apache-spark - 从 S3 读取数据时,Spark 是否保证一致性?
- r - 根据R中数据框中的其他变量替换变量上的值
- domain-driven-design - 如何在事件源系统中建模聊天消息?
- azure-ad-b2c - Azure B2C 邀请消费者用户并检索用户权限
- python - 从另一个容器内的 python 客户端调用容器内的服务时出现连接错误
- python - Visual Studio 2019 中的 Ironpython
- assembly - arm64汇编中adr指令的第一个参数之后的句点有什么作用?
- python - 使用海龟使用坐标点绘制三角形的周长
- servlets - HttpSession 无效正在重定向到登录页面
- android - 向下滚动 RecyclerView 会在 LiveData 更新其内容时跳回顶部,它应该保持其当前的 Y 偏移量