PyFlink: A group window expects a time attribute for grouping in a stream environment

Problem Description

I have the following code using PyFlink 1.11:

import os
from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment
from pyflink.datastream.time_characteristic import TimeCharacteristic
from pyflink.table import (
    StreamTableEnvironment,
    DataTypes,
    EnvironmentSettings,
    CsvTableSink,
)
from pyflink.table.descriptors import Schema, Rowtime, Json, Kafka
from pyflink.table.window import Tumble

env_settings = (
    EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
)

execution_environment = StreamExecutionEnvironment.get_execution_environment()
execution_environment.set_parallelism(1)
execution_environment.set_stream_time_characteristic(TimeCharacteristic.EventTime)

table_env = StreamTableEnvironment.create(
    execution_environment, environment_settings=env_settings
)
statement_set = table_env.create_statement_set()

KAFKA_BROKER_URL = "<URL>"
KAFKA_TOPIC = "user_events"


def get_data_from_kafka_source():
    table_env.connect(
        Kafka()
        .version("universal")
        .topic(KAFKA_TOPIC)
        .start_from_earliest()
        .property("bootstrap.servers", KAFKA_BROKER_URL)
    ).with_format(
        Json()
        .fail_on_missing_field(False)
        .schema(
            DataTypes.ROW(
                [
                    DataTypes.FIELD("event_timestamp", DataTypes.TIMESTAMP(3)),
                    DataTypes.FIELD("event_uid", DataTypes.STRING()),
                    DataTypes.FIELD("user_id", DataTypes.STRING()),
                    DataTypes.FIELD("country", DataTypes.STRING()),
                ]
            )
        )
    ).with_schema(
        Schema()
        .field("event_timestamp", DataTypes.TIMESTAMP(3))
        .field("event_uid", DataTypes.STRING()) 
        .field("country", DataTypes.STRING())
        .field("user_id", DataTypes.STRING())
        .field("rowtime", DataTypes.TIMESTAMP(3))
        .rowtime(
            Rowtime()
            .timestamps_from_field("event_timestamp")
            .watermarks_periodic_bounded(60000)
        )
    ).in_append_mode().create_temporary_table(
        "user_events"
    )


def sink_into_csv():
    result_file = "/opt/examples/data/output/output_file.csv"
    if os.path.exists(result_file):
        os.remove(result_file)

    table_env.register_table_sink(
        "sink_into_csv",
        CsvTableSink(
            ["country", "count_sessions", "last_timestamp"],
            [
                DataTypes.STRING(),
                DataTypes.DOUBLE(),
                DataTypes.TIMESTAMP(3),
            ],
            result_file,
        ),
    )


def run_job():
    get_data_from_kafka_source()
    sink_into_csv()

    table_env.scan("user_events").window(
        Tumble.over("5.minutes").on("rowtime").alias("w")
    ).group_by("country, w").select(
        "country AS country, COUNT(1) AS count_sessions, w.end AS last_timestamp"
    ).insert_into(
        "sink_into_csv"
    )

    table_env.execute("kafka to csv")


if __name__ == "__main__":
    run_job()

But I keep getting this error:

Traceback (most recent call last):
  File "code/kafka_to_csv.py", line 227, in <module>
    run_job()
  File "code/kafka_to_csv.py", line 210, in run_job
    "country AS country, COUNT(1) AS count_sessions, w.end AS last_timestamp"
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table.py", line 907, in select
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
  File "/opt/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o210.select.
: org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
        at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateStreamTimeAttribute(AggregateOperationFactory.java:293)
        at org.apache.flink.table.operations.utils.AggregateOperationFactory.validateTimeAttributeType(AggregateOperationFactory.java:278)
        at org.apache.flink.table.operations.utils.AggregateOperationFactory.getValidatedTimeAttribute(AggregateOperationFactory.java:271)
        at org.apache.flink.table.operations.utils.AggregateOperationFactory.createResolvedWindow(AggregateOperationFactory.java:233)
        at org.apache.flink.table.operations.utils.OperationTreeBuilder.windowAggregate(OperationTreeBuilder.java:250)
        at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:794)
        at org.apache.flink.table.api.internal.TableImpl$WindowGroupedTableImpl.select(TableImpl.java:781)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

org.apache.flink.client.program.ProgramAbortException
        at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)

Can someone help me understand what I am doing wrong? Not sure if it matters, but the event_timestamp values come in the format "2021-11-03 20:24:46.095000".

Thanks in advance!

Tags: python, apache-flink, pyflink

Solution

The event-time attribute is always the timestamp field on which the watermark is defined. In your case that is event_timestamp, so that is the field you need to use in the tumbling window (not rowtime).
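
For illustration, the windowed query from your run_job would then look like this (a sketch only; apart from the on(...) argument, nothing in the query changes):

    # Tumble over the watermarked event_timestamp field instead of "rowtime".
    table_env.scan("user_events").window(
        Tumble.over("5.minutes").on("event_timestamp").alias("w")
    ).group_by("country, w").select(
        "country AS country, COUNT(1) AS count_sessions, w.end AS last_timestamp"
    ).insert_into(
        "sink_into_csv"
    )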

Also, your table does not actually seem to have a column named rowtime, so the schema can leave that field out entirely.
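
Putting both changes together, here is a minimal, untested sketch of the corrected source definition. It assumes the same imports, table_env, KAFKA_TOPIC, and KAFKA_BROKER_URL as in your code; the connector and format parts are unchanged, and only the with_schema part differs:

def get_data_from_kafka_source():
    table_env.connect(
        Kafka()
        .version("universal")
        .topic(KAFKA_TOPIC)
        .start_from_earliest()
        .property("bootstrap.servers", KAFKA_BROKER_URL)
    ).with_format(
        Json()
        .fail_on_missing_field(False)
        .schema(
            DataTypes.ROW(
                [
                    DataTypes.FIELD("event_timestamp", DataTypes.TIMESTAMP(3)),
                    DataTypes.FIELD("event_uid", DataTypes.STRING()),
                    DataTypes.FIELD("user_id", DataTypes.STRING()),
                    DataTypes.FIELD("country", DataTypes.STRING()),
                ]
            )
        )
    ).with_schema(
        # The rowtime declaration is attached directly to event_timestamp
        # (rowtime() applies to the most recently declared field), and the
        # separate "rowtime" field is dropped.
        Schema()
        .field("event_timestamp", DataTypes.TIMESTAMP(3))
        .rowtime(
            Rowtime()
            .timestamps_from_field("event_timestamp")
            .watermarks_periodic_bounded(60000)
        )
        .field("event_uid", DataTypes.STRING())
        .field("country", DataTypes.STRING())
        .field("user_id", DataTypes.STRING())
    ).in_append_mode().create_temporary_table(
        "user_events"
    )

With this schema, event_timestamp is the table's time attribute, and the group window in run_job can use it directly.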

