Flink collector issue when collecting objects that contain a Map with Object values

Question

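I am facing an issue: when I collect objects from a Flink flatMap collector, the values are not collected correctly. I get bare object references instead of the actual values.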

dataStream.filter(new FilterFunction<GenericRecord>() {
      @Override
      public boolean filter(GenericRecord record) throws Exception {
        if (record.get("user_id") != null) {
          return true;
        }
        return false;
      }
    }).flatMap(new ProfileEventAggregateFlatMapFunction(aggConfig))
        .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>() {
          @Override
          public String map(
              ProfileEventAggregateEmittedTuple profileEventAggregateEmittedTupleNew)
              throws Exception {
            String res=null;
            try {
              ObjectMapper mapper = new ObjectMapper();
              mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
              res= mapper.writeValueAsString(profileEventAggregateEmittedTupleNew);
            } catch (Exception e) {
              e.printStackTrace();
            }
            return res;
          }
        }).print();




public class ProfileEventAggregateFlatMapFunction extends
    RichFlatMapFunction<GenericRecord, ProfileEventAggregateEmittedTuple> {

  private final ProfileEventAggregateTupleEmitter aggregator;
  ObjectMapper mapper = ObjectMapperPool.getInstance().get();

  public ProfileEventAggregateFlatMapFunction(String config) throws IOException {
    this.aggregator = new ProfileEventAggregateTupleEmitter(config);
  }

  @Override
  public void flatMap(GenericRecord event,
      Collector<ProfileEventAggregateEmittedTuple> collector) throws Exception {
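    try {
      List<ProfileEventAggregateEmittedTuple> aggregateTuples = aggregator.runAggregates(event);

      for (ProfileEventAggregateEmittedTuple tuple : aggregateTuples) {
        collector.collect(tuple);
      }
    } catch (Exception e) {
      // The catch clause was cut off in the original post; rethrowing is an assumption.
      throw e;
    }
  }
}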

Debug result: the tuple I pass to the collector

tuple = {ProfileEventAggregateEmittedTuple@7880} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {ArrayList@7886}  size = 1
  0 = {ProfileEventAggregate@7888} "geo_id {geo_id=1} {keyless_select_destination_cnt=1, total_estimated_distance=12.5}"
   entityType = "geo_id"
   dimension = {LinkedHashMap@7891}  size = 1
    "geo_id" -> {Integer@7897} 1
     key = "geo_id"
     value = {Integer@7897} 1
   metrics = {LinkedHashMap@7892}  size = 2
    "keyless_select_destination_cnt" -> {Long@7773} 1
     key = "keyless_select_destination_cnt"
     value = {Long@7773} 1
    "total_estimated_distance" -> {Double@7904} 12.5
     key = "total_estimated_distance"
     value = {Double@7904} 12.5

This is what I receive in my map function, .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>():

 profileEventAggregateEmittedTuple = {ProfileEventAggregateEmittedTuple@7935} 
 profileType = "userprofile"
 key = "1152473"
 businessType = "keyless"
 name = "consumer"
 aggregates = {GenericData$Array@7948}  size = 1
  0 = {ProfileEventAggregate@7950} "geo_id {geo_id=java.lang.Object@863dce2} {keyless_select_destination_cnt=java.lang.Object@7cdb4bfc, total_estimated_distance=java.lang.Object@52e81f57}"
   entityType = "geo_id"
   dimension = {HashMap@7952}  size = 1
    "geo_id" -> {Object@7957} 
     key = "geo_id"
     value = {Object@7957} 
      Class has no fields
   metrics = {HashMap@7953}  size = 2
    "keyless_select_destination_cnt" -> {Object@7962} 
     key = "keyless_select_destination_cnt"
     value = {Object@7962} 
      Class has no fields
    "total_estimated_distance" -> {Object@7963} 

Please help me understand what is happening and why I am not getting the correct data.

public class ProfileEventAggregateEmittedTuple implements Cloneable, Serializable {
  private String profileType;
  private String key;
  private String businessType;
  private String name;
  private List<ProfileEventAggregate> aggregates = new ArrayList<ProfileEventAggregate>();
  private long startTime;
  private long endTime;

  public String getProfileType() {
    return profileType;
  }

  public void setProfileType(String profileType) {
    this.profileType = profileType;
  }

  public String getKey() {
    return key;
  }

  public void setKey(String key) {
    this.key = key;
  }

  public String getBusinessType() {
    return businessType;
  }

  public void setBusinessType(String businessType) {
    this.businessType = businessType;
  }

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public List<ProfileEventAggregate> getAggregates() {
    return aggregates;
  }

  public void addAggregate(ProfileEventAggregate aggregate) {
    this.aggregates.add(aggregate);
  }

  public void setAggregates(List<ProfileEventAggregate> aggregates) {
    this.aggregates = aggregates;
  }

  public long getStartTime() {
    return startTime;
  }

  public void setStartTime(long startTime) {
    this.startTime = startTime;
  }

  public long getEndTime() {
    return endTime;
  }

  public void setEndTime(long endTime) {
    this.endTime = endTime;
  }

  @Override
  public ProfileEventAggregateEmittedTuple clone() {
    ProfileEventAggregateEmittedTuple clone = new ProfileEventAggregateEmittedTuple();

    clone.setProfileType(this.profileType);
    clone.setKey(this.key);
    clone.setBusinessType(this.businessType);
    clone.setName(this.name);

    for (ProfileEventAggregate aggregate : this.aggregates) {
      clone.addAggregate(aggregate.clone());
    }
    return clone;
  }
}

public class ProfileEventAggregate  implements Cloneable, Serializable {

  private String entityType;
  private Map<String, Object> dimension =new LinkedHashMap<String, Object>();
  private Map<String, Object> metrics = new LinkedHashMap<String, Object>();

  public Map<String, Object> getDimension() {
    return dimension;
  }

  public void setDimension(Map<String, Object> dimension) {
    this.dimension.putAll(dimension);
  }

  public void addDimension(String dimensionKey, Object dimensionValue) {
    this.dimension.put(dimensionKey, dimensionValue);
  }

  public Map<String, Object> getMetrics() {
    return metrics;
  }
  public void addMetric(String metricKey, Object metricValue) {
    this.metrics.put(metricKey, metricValue);
  }
  public void setMetrics(Map<String, Object> metrics) {
    this.metrics.putAll(metrics);
  }
  public String getEntityType() {
    return entityType;
  }
  public void setEntityType(String entityType) {
    this.entityType = entityType;
  }

  @Override
  public ProfileEventAggregate clone()  {
    ProfileEventAggregate clone = new ProfileEventAggregate();

    clone.setEntityType(this.entityType);
    clone.getDimension().putAll(this.getDimension());
    clone.getMetrics().putAll(this.metrics);
    return clone;
  }
}

Tags: java, java-8, apache-flink, flink-streaming

Answer


If you don't call enableObjectReuse, objects are copied between chained operators with your configured serializer (which seems to be Avro?).

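In your case, you use Map<String, Object>, for which no plausible schema can be inferred. The declared value type is only Object, which would explain why the copied map holds bare java.lang.Object instances instead of the original Integer, Long and Double values.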
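To make the point about schema inference concrete, here is a minimal sketch using only plain Java reflection. It is not Flink or Avro code; the class and method names (MapValueTypeDemo, Aggregate, declaredValueType) are made up for illustration. It shows that the declared value type of a Map<String, Object> field is just Object, even though the stored values are Double or Long at runtime, so a serializer that derives its schema from declared types has nothing more specific to work with.

```java
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.LinkedHashMap;
import java.util.Map;

final class MapValueTypeDemo {

  // Hypothetical stand-in for ProfileEventAggregate, reduced to the relevant field.
  static final class Aggregate {
    Map<String, Object> metrics = new LinkedHashMap<>();
  }

  /** Returns the declared (compile-time) type of the values of a Map-typed field. */
  static Type declaredValueType(Class<?> owner, String fieldName) {
    try {
      Field field = owner.getDeclaredField(fieldName);
      ParameterizedType mapType = (ParameterizedType) field.getGenericType();
      // Index 0 is the key type, index 1 is the value type.
      return mapType.getActualTypeArguments()[1];
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException("No such field: " + fieldName, e);
    }
  }

  public static void main(String[] args) {
    Aggregate aggregate = new Aggregate();
    aggregate.metrics.put("total_estimated_distance", 12.5);

    // Runtime type of the stored value: prints java.lang.Double
    System.out.println(aggregate.metrics.get("total_estimated_distance").getClass().getName());

    // Declared value type visible through reflection: prints java.lang.Object
    System.out.println(declaredValueType(Aggregate.class, "metrics").getTypeName());
  }
}
```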

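The easiest fix is to call enableObjectReuse. Otherwise, make sure your serializer matches your data. For example, you could add a unit test that uses AvroSerializer#copy, and make sure your POJO is properly annotated if you want to stick with Avro reflect. Even better, go with a schema-first approach, where you generate your Java POJO from an Avro schema and use Avro specific records.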

Let's discuss some alternatives:

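  • Use GenericRecord. Instead of converting it to a Java type, access the GenericRecord directly. This is usually the only way when the full record is flexible (e.g. your job takes any input and writes it out to S3).
  • Denormalize the schema. Instead of having some class Event { int id; Map<String, Object> data; } you would use class EventInformation { int id; String predicate; Object value; }. You would need to group all information for processing. However, you will run into the same type issues with Avro.
  • Use a wide schema. Building on the previous approach, if the different predicates are known beforehand, you can use them to craft a wide schema class Event { int id; Long predicate1; Integer predicate2; ... String predicateN; }, where all entries are nullable and most of them are indeed null. Encoding null is very cheap.
  • Ditch Avro. Avro is fully typed. You may want to use something more dynamic. Protobuf has Any to support arbitrary submessages.
  • Use Kryo. Kryo can serialize arbitrary object trees, at the cost of being slower and having more overhead.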
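As an illustration of the wide-schema alternative, the following sketch replaces the Map<String, Object> metrics with a class that has one nullable, fully typed field per known metric. The metric keys are taken from the debug output in the question; the class name WideSchemaSketch, the Metrics type, its field names and the fromMap helper are assumptions made for this example only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WideSchemaSketch {

  /** Wide, fully typed replacement for the metrics map; every field is nullable. */
  static final class Metrics {
    Long keylessSelectDestinationCnt; // null when the metric is absent
    Double totalEstimatedDistance;    // null when the metric is absent
  }

  /** Copies the known metrics out of the untyped map into typed fields. */
  static Metrics fromMap(Map<String, Object> raw) {
    Metrics metrics = new Metrics();
    Object count = raw.get("keyless_select_destination_cnt");
    if (count instanceof Number) {
      metrics.keylessSelectDestinationCnt = ((Number) count).longValue();
    }
    Object distance = raw.get("total_estimated_distance");
    if (distance instanceof Number) {
      metrics.totalEstimatedDistance = ((Number) distance).doubleValue();
    }
    return metrics;
  }

  public static void main(String[] args) {
    Map<String, Object> raw = new LinkedHashMap<>();
    raw.put("keyless_select_destination_cnt", 1L);
    raw.put("total_estimated_distance", 12.5);

    Metrics metrics = fromMap(raw);
    System.out.println(metrics.keylessSelectDestinationCnt + " " + metrics.totalEstimatedDistance);
  }
}
```

Because every field has a concrete declared type, a reflection-based or schema-first serializer can describe it precisely, which is the property the Map<String, Object> version lacks.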

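If you want to write the data out, you also need to think about a solution where type information is added so that the data can be deserialized properly. For an example, check out this JSON question. But there are more ways to implement it.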
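One possible way to carry type information along with the written data is to tag each value with its type. The sketch below is only one of many options, and the "tag:value" text format, the class name TypeTaggedValues and the set of supported types are all invented for illustration; a real job would more likely rely on the type-handling features of its serialization library.

```java
final class TypeTaggedValues {

  /** Encodes a value as "<type tag>:<text form>" so the type survives the round trip. */
  static String encode(Object value) {
    if (value instanceof Integer) return "int:" + value;
    if (value instanceof Long) return "long:" + value;
    if (value instanceof Double) return "double:" + value;
    if (value instanceof String) return "string:" + value;
    throw new IllegalArgumentException("Unsupported value: " + value);
  }

  /** Decodes a string produced by encode back into an object of the original type. */
  static Object decode(String encoded) {
    int separator = encoded.indexOf(':');
    if (separator < 0) {
      throw new IllegalArgumentException("Missing type tag: " + encoded);
    }
    String tag = encoded.substring(0, separator);
    String body = encoded.substring(separator + 1);
    switch (tag) {
      case "int": return Integer.valueOf(body);
      case "long": return Long.valueOf(body);
      case "double": return Double.valueOf(body);
      case "string": return body;
      default: throw new IllegalArgumentException("Unknown type tag: " + tag);
    }
  }

  public static void main(String[] args) {
    Object restored = decode(encode(12.5));
    // The restored value is a Double again, not a bare Object or a String.
    System.out.println(restored.getClass().getName() + " " + restored);
  }
}
```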

