scala - 如何将 Protobuf 数据从 Flink 转发到 Kafka 和 stdout?
问题描述
我想在这里添加一些代码,并标准输出来自 Flink 的 protobuf 数据。
我正在使用 Flink 的 Apache Kafka 连接器将 Flink 连接到 Kafka。
这是我的 Flink 的代码。
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("bootstrap.servers", "localhost:9092").
val producer = FlinkKafkaProducer011(topic, new myProtobufSchema, props)
env.addSink(producer)
env.execute("To Kafka")
这是我的卡夫卡的代码。
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "protobuf-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
// TODO: implement here to stdout
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(Duration.ofSeconds(10))
}
解决方案
您需要设置StreamsBuilder
从主题消费
val builder: StreamsBuilder = new StreamsBuilder()
.stream(topic)
.print(Printed.toSysOut());
推荐阅读
- oracle - 如何在 apex 19.2 中使用数据库中的表
- r - 使用多个标准过滤整齐的数据
- python-3.x - 解决两个矩阵的乘法
- typescript - 通过来自另一个组件的事件调用 vue 组件属性设置器
- react-native - 如何使用redux提供者将其与地图状态连接起来
- google-chrome - 在页面加载时,加载 div 内容(html 和 js 和 css)的异步 ajax 调用会导致 chrome 80 中出现不推荐使用的错误
- python - pyenv 使用正确的 python 版本但使用错误的库文件夹
- python - AttributeError:“pyodbc.Cursor”对象没有属性“方言”
- python - 如何从普通函数中向 Python Dask 调度程序提交任务
- virtual-machine - 如何在访客上启用 IOMMU