apache-flink - Pyflink 1.14 数据流源 -> 熊猫处理 -> 数据流接收器
问题描述
我最近在 Pyflink 中看到了可以通过 Table API 在 flink 中使用 pandas 数据帧的文档。因此,我的目标是:
- 从 Kafka 源接收数据流
- 转换为表 API 实例 -> 然后可以转换为 Pandas
- --- Pandas 处理逻辑
- 将 pandas 数据帧转换回 Table 实例
- 然后将其转换回数据流并沉入kafka
根据 flink 文档,我引用了转换Datastream <-> Table Instance和Table <-> pandas之间的代码。
import os
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.serialization import Encoder
from pyflink.common.serialization import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, StreamingFileSink
from pyflink.datastream.connectors import FlinkKafkaConsumer, FlinkKafkaProducer
from pyflink.datastream.data_stream import DataStream
from pyflink.datastream.functions import MapFunction
from pyflink.table.table import Table
from pyflink.table.table_environment import StreamTableEnvironment
KAFKA_SERVERS = os.getenv('KAFKA_BS_SERVERS',"kafka:9094").split(',')
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "123"
KAFKA_SOURCE_TOPIC = 'topic_one'
KAFKA_SINK_TOPIC = 'topic_two'
# creating a kafka source for the pipeline
def create_kafka_source(usern: str, password: str, topic: str):
kafka_props = {
'bootstrap.servers': ','.join(KAFKA_SERVERS),
'group.id': 'testgroup12',
'auto.offset.reset': 'earliest',
'sasl.mechanism': 'PLAIN',
'sasl.jaas.config' : f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{usern}\" password=\"{password}\";", # correct one
'security.protocol': 'SASL_PLAINTEXT',
"enable.auto.commit": "true",
"auto.commit.enable": "true",
"auto.commit.interval.ms": "1000",
"session.timeout.ms": "30000",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_source = FlinkKafkaConsumer(
topics=[topic],
deserialization_schema=SimpleStringSchema(),
properties= kafka_props
)
kafka_source.set_commit_offsets_on_checkpoints(True)
return kafka_source
# creating a kafka sink for the pipeline
def create_kafka_sink(usern: str, password: str, topic: str):
kafka_producer = FlinkKafkaProducer(
topic= topic,
serialization_schema=SimpleStringSchema(),
producer_config= {
'bootstrap.servers': ','.join(KAFKA_SERVERS),
'group.id': 'testgroup12',
'sasl.mechanism': 'PLAIN',
'sasl.jaas.config' : f"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{usern}\" password=\"{password}\";", # correct one
'security.protocol': 'SASL_PLAINTEXT'
}
)
return kafka_producer
# the pipeline which will run
def pipeline():
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")
env.set_parallelism(1)
env.enable_checkpointing(5000)
t_env = StreamTableEnvironment.create(stream_execution_environment= env)
kafka_source = create_kafka_source(KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SOURCE_TOPIC)
ds = env.add_source(kafka_source)
# Stream to Table
table : Table = t_env.from_data_stream(ds)
pd = table.to_pandas()
# custom pandas logic
pd["Testing"] = 'new Vals'
# Table to stream
table = t_env.from_pandas(pd,pd.columns.tolist())
t_env.create_temporary_view("InputTable", table)
res_table = t_env.sql_query("SELECT * FROM InputTable")
res_ds = t_env.to_data_stream(res_table)
# Sink to file and Kafka
res_ds.add_sink(StreamingFileSink
.for_row_format('/opt/flink/outputs_dumps', Encoder.simple_string_encoder())
.build())
kafka_sink = create_kafka_sink(KAFKA_USERNAME, KAFKA_PASSWORD, KAFKA_SINK_TOPIC)
res_ds.add_sink(kafka_sink)
env.execute("proto_1")
if __name__ == '__main__':
pipeline()
将其提交给 flink 时,将创建作业而不会出现任何错误或异常:
$ /opt/flink/bin/flink 运行 --python script.py
但是在flink UI上可以看到job name没有分别注册
以及未反映在输出主题上的熊猫逻辑。(1)作为源数据包接收的 json 数据包,(2) pandas 本质上为数据包添加了一个新值,(3)然后应该将此数据包接收回输出主题
收到的源主题:
{“气缸”:8.0,“排量”:360.0,“马力”:215.0,“重量”:4615.0,“加速度”:14.0,“车型年”:70.0,“美国”:1.0,“欧洲”:0.0, “日本”:0.0}
目标主题的输出('testing': 'New vals' not added):
{“气缸”:8.0,“排量”:360.0,“马力”:215.0,“重量”:4615.0,“加速度”:14.0,“车型年”:70.0,“美国”:1.0,“欧洲”:0.0, “日本”:0.0}
如果我的方法不正确,有人可以告诉我正确的实施方式吗?这应该作为无界流操作工作(不是作为批处理操作,如果我的术语在这里正确......)
解决方案
推荐阅读
- mysql - 如何在 .sql 脚本中导出 Laravel 数据库
- java - 为什么 JButton 不显示?
- python - 如何消除频率之间的“点击”?
- ios - 单例中的@Published 属性在第一个事件之后不发出事件
- asp.net - 如何将 JObject 作为对象添加到实体框架
- mysql - 有没有办法在 GCP 中创建非只读 Cloud SQL MySQL 副本?
- matlab - 如何解决 MATLAB Simulink 的定点工具中的下溢(/上溢)问题?
- r - 在 R 中将数据框项从字符串转换为 int 时出现意外行为
- reactjs - 失败的道具类型:提供给`ForwardRef(Select)`的无效道具`children`,需要一个ReactNode
- html - 需要用参数设置一个div的背景