首页 > 解决方案 > Kafka producer using HiveStorageHandler

问题描述

I am relatively new to hive/hadoop

I was reading through this Hive Storage Handlers.

Now I am trying to write a custom implementation of HiveStorageHandler for querying and pushing messages to Kafka using a Hive Table.

I saw that there are other implementations of HiveStorageHandler which lets us query and write on NoSQL databases using hive tables.

I am trying to replicate that for Kafka. I found a project on it

HiveKa - query Kafka using Hive

Here they are trying to read data from Kafka using queries on the hive table. I wish to write on the kafka topic using insert on the table.

Can someone please guide me on this ?

标签: hadoophiveapache-kafkakafka-producer-api

解决方案


我希望在表格上使用 insert 来写关于 kafka 的主题。

这可以使用 Kafka HiveStorageHandler。以下是此功能可能的一般用例

  1. 查询 Kafka 主题
  2. 从 Kafka 主题中查询数据并插入到 hive 托管/外部表中
  3. 从 Kafka 主题中查询数据并推送到其他 Kafka 主题中
  4. 从 hive 外部/托管表中查询数据并推送到 Kafka 主题中

您正在尝试执行第三个用例。

首先为源和目标 Kafka 主题创建两个外部表。

create external table if not exists source_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='source_topic_name',
'kafka.bootstrap.servers'=''
);


create external table if not exists target_topic_table
(
<fields>
)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
'kafka.topic'='target_topic_name',
'kafka.bootstrap.servers'=''
);

然后使用合并查询将数据插入目标 Kafka 主题

merge into target_topic_table
using (
select
<columns>,
cast(null as binary) as `__key`,
cast(null as int) as `__partition`,
cast(-1 as bigint) as `__offset`,
cast(to_epoch_milli(current_timestamp) as bigint) as `__timestamp`
from source_topic_table
) sub
on
sub.column_name = target_topic_table.coulmn_name <Some condition>
when not matched then insert values
(
<sub.columns>,
sub.`__key`,sub.`__partition`,sub.`__offset`,sub.`__timestamp`
);

笔记:

  1. 使用 Hive 外部非原生表

  2. 除了用户定义的有效负载模式之外,Kafka 存储处理程序还附加了 4 个附加列(__key、__partition、__offset、__timestmap),用户可以使用这些列来查询 Kafka 元数据字段

  3. 如果数据不是 csv 格式,用户必须设置“kafka.serde.class”表属性

  4. 用户还可以设置“kafka.write.semantic”表属性,该属性允许 NONE、AT_LEAST_ONCE 或 EXACTLY_ONCE 值。


推荐阅读