首页 > 解决方案 > Flink 中 java.util.map 和自定义 pojo 的序列化

问题描述

我正在尝试使用不从 kinesis flink 应用程序中的外部库实现 Serializable 的 pojo。在 flatMap 函数中使用它时序列化失败。

波乔

public class ExecutionRecord {
    private Map<String, VariableGroup> factMap;
    private List<ModelResult> models;
    private List<RulesetResult> rulesets;
    private Outcome outcome;
    private ExecutionMetadata executionMetadata;
}

的输出TypeInformation.of(ExecutionRecord.class).toString()

PojoType<ExecutionRecord, fields = [executionMetadata: PojoType<ExecutionMetadata, fields = [endTime: String, evaluationType: String, executionHost: String, executionId: String, gmraInstanceIDs: GenericType<java.util.List>, startTime: String]>, factMap: GenericType<java.util.Map>, models: GenericType<java.util.List>, outcome: PojoType<Outcome, fields = [actions: GenericType<java.util.List>, failedActions: GenericType<java.util.List>, outcomeName: String]>, rulesets: GenericType<java.util.List>]>

错误- java.io.NotSerializableException: ExecutionRecord

堆栈跟踪也没有显示它无法序列化的特定字段。

我应该如何注册序列化程序java.util.list以及java.util.map哪些被识别为泛型类型以及其余的自定义 pojo

标签: javaserializationapache-flinkflink-streamingamazon-kinesis-analytics

解决方案


你可以做这样的事情

public static final TypeInformation<ExecutionRecord> TYPE_INFORMATION_POJO = Types.POJO(ExecutionRecord.class);

or

public static final TypeInformation<ExecutionRecord> TYPE_INFORMATION = TypeInformation.of(BehProdViewFLDTO.class);

并通过TYPE_INFORMATION_POJO或传递TYPE_INFORMATION到各州或在您可能需要的时候!


推荐阅读