首页 > 解决方案 > Flink 获取 KeyedState 状态值并在另一个 Stream 中使用

问题描述

我知道键控状态属于它的键,只有当前键访问它的状态值,其他键不能访问不同键的状态值。

我尝试使用相同的密钥但在不同的流中访问状态。可能吗?

如果不可能,那么我将有 2 个重复数据?

不:我需要两个流,因为它们每个都有不同的时间窗口和不同的实现。

这是示例(我知道 keyBy(somthing) 对于两个流操作都是相同的):

public class Sample{
       streamA
                .keyBy(something)
                .timeWindow(Time.seconds(4))
                .process(new CustomMyProcessFunction())
                .name("CustomMyProcessFunction")
                .print();

       streamA
                .keyBy(something)
                .timeWindow(Time.seconds(1))
                .process(new CustomMyAnotherProcessFunction())
                .name("CustomMyProcessFunction")
                .print();
}

public class CustomMyProcessFunction extends ProcessWindowFunction<..>
{
    private Logger logger = LoggerFactory.getLogger(CustomMyProcessFunction.class);
    private transient ValueState<SimpleEntity> simpleEntityValueState;
    private SimpleEntity simpleEntity;

    @Override
    public void open(Configuration parameters) throws Exception
    {
        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value == null)
        {
            SimpleEntity newVal = new SimpleEntity("sample");
            logger.info("New Value put");
            simpleEntityValueState.update(newVal);
        }
        ...
    }
...
}

public class CustomMyAnotherProcessFunction extends ProcessWindowFunction<..>
{


    private transient ValueState<SimpleEntity> simpleEntityValueState;

    @Override
    public void open(Configuration parameters) throws Exception
    {

        ValueStateDescriptor<SimpleEntity> simpleEntityValueStateDescriptor = new ValueStateDescriptor<SimpleEntity>(
                "sample",
                TypeInformation.of(SimpleEntity.class)
        );
        simpleEntityValueState = getRuntimeContext().getState(simpleEntityValueStateDescriptor);
    }

    @Override
    public void process(...) throws Exception
    {
        SimpleEntity value = simpleEntityValueState.value();
        if (value != null)
            logger.info(value.toString()); // I expect that SimpleEntity("sample")
        out.collect(...);
    }
...
}

标签: apache-flinkflink-streaming

解决方案


正如已经指出的那样,状态对于单个操作员实例来说总是本地的。它不能被共享。

但是,您可以做的是将状态更新从持有状态的操作员流式传输到需要它的其他操作员。使用侧面输出,您可以创建复杂的数据流,而无需共享状态。


推荐阅读