scala - 序列化 Guava 的 MinMaxPriorityQueue
问题描述
经过几天的研究,为什么我的Flink 应用程序无法正常工作,我得出的结论是问题出在MinMaxPriorityQueue
我正在使用的应用程序上。
看来这个结构是不可序列化的。我尝试了几种方法来序列化它:
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[JavaSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], classOf[ProtobufSerializer]);
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
他们都没有运气。
但是我发现了这个:Serializing Guava's ImmutableTable
是否有等效于 MinMaxPriorityQueue 或序列化它的方法?
更新
我已经将 Tomasz 翻译成 scala:
class MinMaxPriorityQueueSerializer extends Serializer[MinMaxPriorityQueue[Object]] {
private[this] val log = LoggerFactory.getLogger(this.getClass)
setImmutable(false)
setAcceptsNull(false)
val OPTIMIZE_POSITIVE = true
override def read(kryo: Kryo, input: Input, aClass: Class[MinMaxPriorityQueue[Object]]): MinMaxPriorityQueue[Object] = {
log.error("Kryo READ")
val comparator: Ordering[Object] = kryo.readClassAndObject(input).asInstanceOf[Ordering[Object]]
val size = input.readInt(OPTIMIZE_POSITIVE)
val queue: MinMaxPriorityQueue[Object] = MinMaxPriorityQueue.orderedBy(comparator)
.expectedSize(size)
.create()
(0 to size).foreach(_ => queue.offer(kryo.readClassAndObject(input)))
queue
}
override def write(kryo: Kryo, output: Output, queue: MinMaxPriorityQueue[Object]): Unit = {
log.error("Kryo WRITE")
kryo.writeClassAndObject(output, queue.comparator)
val declaredSize = queue.size
output.writeInt(declaredSize, OPTIMIZE_POSITIVE)
val actualSize = queue.toArray.foldLeft(0) {
case (z, q) =>
kryo.writeClassAndObject(output, q)
z + 1
}
Preconditions.checkState(
declaredSize == actualSize,
"Declared size (%s) different than actual size (%s)", declaredSize, actualSize)
}
}
并在 flink 中设置 kryo 以使用该序列化器:
env.getConfig.addDefaultKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
env.getConfig.registerTypeWithKryoSerializer(classOf[MinMaxPriorityQueue[Double]], classOf[MinMaxPriorityQueueSerializer])
然而,它似乎永远不会被调用,因为我在日志中的任何地方都没有看到log.error("Kryo READ")
和log.error("Kryo WRITE")
并且转换仍然返回一个空的 MinMaxPriorityQueue,即使我正在更新它。
更新 2
我已经实现了 SerializerTester,但我得到了一个 bufferUnderflow:
object Main {
def main(args: Array[String]) {
val tester = new MinMaxPriorityQueueSerializerTester()
val inQueue: MinMaxPriorityQueue[java.lang.Double] = MinMaxPriorityQueue.create()
inQueue.add(1.0)
val outputStream = new ByteArrayOutputStream()
tester.serialize(outputStream, inQueue)
val inputStream = new ByteArrayInputStream(outputStream.toByteArray())
val outQueue: MinMaxPriorityQueue[java.lang.Double] = tester.deserialize(inputStream);
System.out.println(inQueue);
System.out.println(outQueue);
}
class MinMaxPriorityQueueSerializerTester {
val kryo = new Kryo
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy)
registerMinMaxSerializer();
// allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
def registerMinMaxSerializer() {
kryo.addDefaultSerializer(classOf[MinMaxPriorityQueue[java.lang.Double]], new MinMaxPriorityQueueSerializer());
}
def serialize(out: OutputStream, queue: MinMaxPriorityQueue[java.lang.Double]) {
// try (Output output = new Output(out)) {
val output = new Output(out)
kryo.writeClassAndObject(output, queue)
// kryo.writeObject(output, queue)
//}
output.flush
}
def deserialize(in: InputStream): MinMaxPriorityQueue[java.lang.Double] = {
//try (Input input = new Input(in)) {
val input = new Input(in)
//kryo.readObject(input, classOf[MinMaxPriorityQueue[java.lang.Double]])
kryo.readClassAndObject(input).asInstanceOf[MinMaxPriorityQueue[java.lang.Double]]
//p}
}
}
解决方案
您可以使用自定义 Kryo Serializer
。
这是一个示例(在 Java 中):
class MinMaxPriorityQueueSerializer extends Serializer<MinMaxPriorityQueue<Object>> {
private static final boolean OPTIMIZE_POSITIVE = true;
protected MinMaxPriorityQueueSerializer() {
setAcceptsNull(false);
setImmutable(false);
}
@Override
public void write(Kryo kryo, Output output, MinMaxPriorityQueue<Object> queue) {
kryo.writeClassAndObject(output, queue.comparator());
int declaredSize = queue.size();
output.writeInt(declaredSize, OPTIMIZE_POSITIVE);
int actualSize = 0;
for (Object element : queue) {
kryo.writeClassAndObject(output, element);
actualSize++;
}
Preconditions.checkState(
declaredSize == actualSize,
"Declared size (%s) different than actual size (%s)", declaredSize, actualSize
);
}
@Override
public MinMaxPriorityQueue<Object> read(Kryo kryo, Input input, Class<MinMaxPriorityQueue<Object>> type) {
@SuppressWarnings("unchecked")
Comparator<Object> comparator = (Comparator<Object>) kryo.readClassAndObject(input);
int size = input.readInt(OPTIMIZE_POSITIVE);
MinMaxPriorityQueue<Object> queue = MinMaxPriorityQueue.orderedBy(comparator)
.expectedSize(size)
.create();
for (int i = 0; i < size; ++i) {
queue.offer(kryo.readClassAndObject(input));
}
return queue;
}
}
以下是您可以如何使用它:
class MinMaxPriorityQueueSerializerTester {
public static void main(String[] args) {
MinMaxPriorityQueueSerializerTester tester = new MinMaxPriorityQueueSerializerTester();
MinMaxPriorityQueue<Integer> inQueue = MinMaxPriorityQueue.<Integer>orderedBy(Comparator.reverseOrder())
.create(Arrays.asList(5, 2, 7, 2, 4));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
tester.serialize(outputStream, inQueue);
ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
@SuppressWarnings("unchecked")
MinMaxPriorityQueue<Integer> outQueue = (MinMaxPriorityQueue<Integer>) tester.deserialize(inputStream);
System.out.println(inQueue);
System.out.println(outQueue);
}
private final Kryo kryo;
public MinMaxPriorityQueueSerializerTester() {
this.kryo = new Kryo();
registerMinMaxSerializer();
allowForClassesWithoutNoArgConstructor(); // needed to serialize Ordering
}
private void registerMinMaxSerializer() {
kryo.addDefaultSerializer(MinMaxPriorityQueue.class, new MinMaxPriorityQueueSerializer());
}
private void allowForClassesWithoutNoArgConstructor() {
((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
}
public void serialize(OutputStream out, MinMaxPriorityQueue<?> queue) {
try (Output output = new Output(out)) {
kryo.writeObject(output, queue);
}
}
public MinMaxPriorityQueue<?> deserialize(InputStream in) {
try (Input input = new Input(in)) {
return kryo.readObject(input, MinMaxPriorityQueue.class);
}
}
}
推荐阅读
- javascript - 如何使用 create-react-app 设置 Jest Watch 插件
- sorting - 如何使用柏树找到哪个变量更大
- azure - 无法在 Swagger 中进行身份验证
- html - 用于在大量链接列表中填充标签的“标题”属性的复杂(?)正则表达式
- angular - 我们可以将两个布尔变量中的任何一个传递给子组件像这样
- jquery - Jquery - 查找/每个触发两次
- python - 如何使用python每5分钟下载和保存交换数据
- python - Mechanize 在所需网站中未找到任何表格
- sabre - Sabre CreatePassengerNameRecord - 使用 3 条目输入事实,TTY 请求挂起
- azure - 如何修改 Azure 部署服务的输入/输出架构?