java - 使用 Object 类的 Map 收集对象时的 Flink 收集器问题
问题描述
我面临一个问题,当我从 flink flatmap 收集器收集对象时,我没有正确收集到值。我得到了对象引用,它没有给我实际的价值。
dataStream.filter(new FilterFunction<GenericRecord>() {
@Override
public boolean filter(GenericRecord record) throws Exception {
if (record.get("user_id") != null) {
return true;
}
return false;
}
}).flatMap(new ProfileEventAggregateFlatMapFunction(aggConfig))
.map(new MapFunction<ProfileEventAggregateEmittedTuple, String>() {
@Override
public String map(
ProfileEventAggregateEmittedTuple profileEventAggregateEmittedTupleNew)
throws Exception {
String res=null;
try {
ObjectMapper mapper = new ObjectMapper();
mapper.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
res= mapper.writeValueAsString(profileEventAggregateEmittedTupleNew);
} catch (Exception e) {
e.printStackTrace();
}
return res;
}
}).print();
public class ProfileEventAggregateFlatMapFunction extends
RichFlatMapFunction<GenericRecord, ProfileEventAggregateEmittedTuple> {
private final ProfileEventAggregateTupleEmitter aggregator;
ObjectMapper mapper = ObjectMapperPool.getInstance().get();
public ProfileEventAggregateFlatMapFunction(String config) throws IOException {
this.aggregator = new ProfileEventAggregateTupleEmitter(config);
}
@Override
public void flatMap(GenericRecord event,
Collector<ProfileEventAggregateEmittedTuple> collector) throws Exception {
try {
List<ProfileEventAggregateEmittedTuple> aggregateTuples = aggregator.runAggregates(event);
for (ProfileEventAggregateEmittedTuple tuple : aggregateTuples) {
collector.collect(tuple);
}
}}
调试结果:我在收集器中收集的元组
tuple = {ProfileEventAggregateEmittedTuple@7880}
profileType = "userprofile"
key = "1152473"
businessType = "keyless"
name = "consumer"
aggregates = {ArrayList@7886} size = 1
0 = {ProfileEventAggregate@7888} "geo_id {geo_id=1} {keyless_select_destination_cnt=1, total_estimated_distance=12.5}"
entityType = "geo_id"
dimension = {LinkedHashMap@7891} size = 1
"geo_id" -> {Integer@7897} 1
key = "geo_id"
value = {Integer@7897} 1
metrics = {LinkedHashMap@7892} size = 2
"keyless_select_destination_cnt" -> {Long@7773} 1
key = "keyless_select_destination_cnt"
value = {Long@7773} 1
"total_estimated_distance" -> {Double@7904} 12.5
key = "total_estimated_distance"
value = {Double@7904} 12.5
这是我在我的地图函数中得到的 .map(new MapFunction<ProfileEventAggregateEmittedTuple, String>()
profileEventAggregateEmittedTuple = {ProfileEventAggregateEmittedTuple@7935}
profileType = "userprofile"
key = "1152473"
businessType = "keyless"
name = "consumer"
aggregates = {GenericData$Array@7948} size = 1
0 = {ProfileEventAggregate@7950} "geo_id {geo_id=java.lang.Object@863dce2} {keyless_select_destination_cnt=java.lang.Object@7cdb4bfc, total_estimated_distance=java.lang.Object@52e81f57}"
entityType = "geo_id"
dimension = {HashMap@7952} size = 1
"geo_id" -> {Object@7957}
key = "geo_id"
value = {Object@7957}
Class has no fields
metrics = {HashMap@7953} size = 2
"keyless_select_destination_cnt" -> {Object@7962}
key = "keyless_select_destination_cnt"
value = {Object@7962}
Class has no fields
"total_estimated_distance" -> {Object@7963}
请帮助我了解发生了什么,为什么我没有得到正确的数据。
public class ProfileEventAggregateEmittedTuple implements Cloneable, Serializable {
private String profileType;
private String key;
private String businessType;
private String name;
private List<ProfileEventAggregate> aggregates = new ArrayList<ProfileEventAggregate>();
private long startTime;
private long endTime;
public String getProfileType() {
return profileType;
}
public void setProfileType(String profileType) {
this.profileType = profileType;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getBusinessType() {
return businessType;
}
public void setBusinessType(String businessType) {
this.businessType = businessType;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public List<ProfileEventAggregate> getAggregates() {
return aggregates;
}
public void addAggregate(ProfileEventAggregate aggregate) {
this.aggregates.add(aggregate);
}
public void setAggregates(List<ProfileEventAggregate> aggregates) {
this.aggregates = aggregates;
}
public long getStartTime() {
return startTime;
}
public void setStartTime(long startTime) {
this.startTime = startTime;
}
public long getEndTime() {
return endTime;
}
public void setEndTime(long endTime) {
this.endTime = endTime;
}
@Override
public ProfileEventAggregateEmittedTuple clone() {
ProfileEventAggregateEmittedTuple clone = new ProfileEventAggregateEmittedTuple();
clone.setProfileType(this.profileType);
clone.setKey(this.key);
clone.setBusinessType(this.businessType);
clone.setName(this.name);
for (ProfileEventAggregate aggregate : this.aggregates) {
clone.addAggregate(aggregate.clone());
}
return clone;
}
public class ProfileEventAggregate implements Cloneable, Serializable {
private String entityType;
private Map<String, Object> dimension =new LinkedHashMap<String, Object>();
private Map<String, Object> metrics = new LinkedHashMap<String, Object>();
public Map<String, Object> getDimension() {
return dimension;
}
public void setDimension(Map<String, Object> dimension) {
this.dimension.putAll(dimension);
}
public void addDimension(String dimensionKey, Object dimensionValue) {
this.dimension.put(dimensionKey, dimensionValue);
}
public Map<String, Object> getMetrics() {
return metrics;
}
public void addMetric(String metricKey, Object metricValue) {
this.metrics.put(metricKey, metricValue);
}
public void setMetrics(Map<String, Object> metrics) {
this.metrics.putAll(metrics);
}
public String getEntityType() {
return entityType;
}
public void setEntityType(String entityType) {
this.entityType = entityType;
}
@Override
public ProfileEventAggregate clone() {
ProfileEventAggregate clone = new ProfileEventAggregate();
clone.setEntityType(this.entityType);
clone.getDimension().putAll(this.getDimension());
clone.getMetrics().putAll(this.metrics);
return clone;
}
解决方案
如果不这样做enableObjectReuse
,则使用配置的序列化程序复制对象(似乎是 Avro?)。
在您的情况下,您使用 Map<String, Object> 无法推断出合理的模式。
最简单的解决方法是enableObjectReuse
. 否则,请确保您的序列化程序与您的数据匹配。因此,您可以在您使用的地方添加一个单元测试,AvroSerializer#copy
并确保您的 POJO 得到正确注释,如果您想坚持使用 Avro 反射,甚至更好地使用模式优先方法,您可以使用 Avro 模式生成 Java POJO并使用特定的 Avro .
让我们讨论一些替代方案:
- 使用
GenericRecord
. 与其将其转换为 Java 类型,不如直接访问GenericRecord
. 这通常是完整记录灵活的唯一方法(例如,您的工作接受任何输入并将其写入 S3)。 - 非规范化架构。而不是有一些
class Event { int id; Map<String, Object> data; }
你会使用class EventInformation { int id; String predicate; Object value; }
. 您需要对所有信息进行分组以进行处理。但是,您将在使用 Avro 时遇到相同类型的问题。 - 使用宽模式。查看前面的方法,如果事先知道不同的谓词,那么您可以使用它来制作一个广泛的模式
class Event { int id; Long predicate1; Integer predicate2; ... String predicateN; }
,其中所有条目都是可以为空的,并且其中大多数确实是null
. 编码null
非常便宜。 - 抛弃 Avro。Avro 是全类型的。您可能想要使用更动态的东西。Protobuf 有Any来支持任意子消息。
- 使用 Kryo。Kryo 可以序列化任意对象树,但代价是速度较慢且开销更大。
如果要写入数据,还需要考虑添加类型信息以进行正确反序列化的解决方案。例如,查看这个JSON 问题。但是还有更多的方法来实现它。
推荐阅读
- python - 如何将一个文本文件中的行随机分割成两个不同的文本文件?
- python-3.x - Python - 按文件名复制某些文件的脚本
- azure - 使用代码从 Azure Databricks 笔记本中查找作业列表
- typescript - 为什么需要 await app.listen() for nestjs
- excel - Excel VBA循环遍历文本框多行中的每一行
- performance - 删除 pushgateway 中的指标会删除 prometheus 中的抓取指标
- javascript - React Material UI Toggle Switch in Loop with Database Update
- postgresql - TimescaleDB 中的经度和纬度 LOCF 值
- python-3.x - extras_require 如何覆盖
- java - 如何为 java swagger api 创建一个可重用的字段