apache-kafka - 我想在 Apache Flink 中做流媒体工作来做 Kafka -> Flink -> HIVE
问题描述
我想在 Apache Flink 中进行流式处理以在 Apache Flink(Scala)中执行 Kafka -> Flink -> HIVE。任何人都可以提供代码示例,因为他们的官方文档不是很清楚理解。
这应该是流式处理。
解决方案
如需帮助开始使用 Table API,您可以参考使用 Table API 进行实时报告的教程。它是用 Java 编写的,但 Scala API 并没有太大的不同。
这是一个使用 SQL 从 Kafka 读取并写入 Hive 的示例。要在 Scala 中执行相同操作,您可以使用 包装 SQL 语句tableEnv.executeSql(...)
,如
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
或者
val tableResult1 = tEnv.executeSql("INSERT INTO ...")
如果您需要进行多次插入,那么您需要使用StatementSet
. 有关详细信息,请参阅下面链接的文档。
请参阅运行 CREATE 语句、运行 INSERT 语句、Apache Kafka SQL 连接器和写入 Hive。
如果您遇到困难,请向我们展示您尝试了什么以及它是如何失败的。
推荐阅读
- c# - 在 Visual Studio 中为 C# 更新 App.Config 文件的最佳方法是什么?
- python-3.x - 如何识别一行中特定序列第一次出现的位置
- php - 数组忽略 post_type
- python - 如何使用python的数据表按日期过滤
- python - Python:使用 \n 作为换行符读取文件。文件还包含 \r\n
- oracle - 将 oracle 表中的前导零提取到文件中
- java - 预览 Google 幻灯片/PowerPoint
- arrays - 屏蔽多维数组 - numpy
- r - 有没有办法将公式应用于两个值向量并获取每个组合的输出数据框?
- ubuntu-18.04 - libVLC 未在 Ubuntu 18.04 (WSL 2.0) 上初始化