StreamingFileSink bulk writer results in some checkpoint error when running in AWS EMR

Problem description

Unable to use StreamingFileSink and store incoming events in compressed fashion.

I am trying to write an unbounded stream of events into S3 using StreamingFileSink. In the process, I would like to compress the data to make better use of the available storage.

I wrote a compressed string writer by borrowing some code from Flink's SequenceFileWriterFactory. It fails with the exception that I describe below.

If I use BucketingSink instead, it works great. With BucketingSink, I approached the compressed string writing as below. Again, I borrowed this code from some other pull request.

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

import java.io.IOException;

public class CompressionStringWriter<T> extends StreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 3231207311080446279L;

    private String codecName;

    private String separator;

    public String getCodecName() {
        return codecName;
    }

    public String getSeparator() {
        return separator;
    }

    private transient CompressionOutputStream compressedOutputStream;

    public CompressionStringWriter(String codecName, String separator) {
        this.codecName = codecName;
        this.separator = separator;
    }

    public CompressionStringWriter(String codecName) {
        this(codecName, System.lineSeparator());
    }

    protected CompressionStringWriter(CompressionStringWriter<T> other) {
        super(other);
        this.codecName = other.codecName;
        this.separator = other.separator;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName(codecName);
        if (codec == null) {
            throw new RuntimeException("Codec " + codecName + " not found");
        }
        Compressor compressor = CodecPool.getCompressor(codec, conf);
        compressedOutputStream = codec.createOutputStream(getStream(), compressor);

    }

    @Override
    public void close() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.close();
            compressedOutputStream = null;
        } else {
            super.close();
        }
    }

    @Override
    public void write(Object element) throws IOException {
        getStream(); // ensures that the underlying stream has been opened
        compressedOutputStream.write(element.toString().getBytes());
        compressedOutputStream.write(this.separator.getBytes());
    }

    @Override
    public CompressionStringWriter<T> duplicate() {
        return new CompressionStringWriter<>(this);
    }
}

BucketingSink<DeviceEvent> bucketingSink =
        new BucketingSink<>("s3://" + this.bucketName + "/" + this.objectPrefix);
bucketingSink
        .setBucketer(new OrgIdBasedBucketAssigner())
        .setWriter(new CompressionStringWriter<DeviceEvent>("Gzip", "\n"))
        .setPartPrefix("file-")
        .setPartSuffix(".gz")
        .setBatchSize(1_500_000);

The one with BucketingSink works.

Now my code snippets using StreamingFileSink involve the below set of code.

import org.apache.flink.api.common.serialization.BulkWriter;

import java.io.IOException;

public class CompressedStringBulkWriter<T> implements BulkWriter<T> {

    private final CompressedStringWriter compressedStringWriter;

    public CompressedStringBulkWriter(final CompressedStringWriter compressedStringWriter) {
        this.compressedStringWriter = compressedStringWriter;
    }

    @Override
    public void addElement(T element) throws IOException {
        this.compressedStringWriter.write(element);
    }

    @Override
    public void flush() throws IOException {
        this.compressedStringWriter.flush();
    }

    @Override
    public void finish() throws IOException {
        this.compressedStringWriter.close();
    }
}

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class CompressedStringBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    private SerializableHadoopConfiguration serializableHadoopConfiguration;

    public CompressedStringBulkWriterFactory(final Configuration hadoopConfiguration) {
        this.serializableHadoopConfiguration = new SerializableHadoopConfiguration(hadoopConfiguration);
    }

    @Override
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
        return new CompressedStringBulkWriter(new CompressedStringWriter(out, serializableHadoopConfiguration.get(), "Gzip", "\n"));
    }
}

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;

public class CompressedStringWriter<T> implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(CompressedStringWriter.class);

    private static final long serialVersionUID = 2115292142239557448L;


    private String separator;

    private transient CompressionOutputStream compressedOutputStream;

    public CompressedStringWriter(FSDataOutputStream out, Configuration hadoopConfiguration, String codecName, String separator) {
        this.separator = separator;
        try {
            Preconditions.checkNotNull(hadoopConfiguration, "Unable to determine hadoop configuration using path");
            CompressionCodecFactory codecFactory = new CompressionCodecFactory(hadoopConfiguration);
            CompressionCodec codec = codecFactory.getCodecByName(codecName);
            Preconditions.checkNotNull(codec, "Codec " + codecName + " not found");
            LOG.info("The codec name that was loaded from hadoop {}", codec);
            Compressor compressor = CodecPool.getCompressor(codec, hadoopConfiguration);
            this.compressedOutputStream = codec.createOutputStream(out, compressor);
            LOG.info("Setup a compressor for codec {} and compressor {}", codec, compressor);
        } catch (IOException ex) {
            throw new RuntimeException("Unable to compose a hadoop compressor for the path", ex);
        }
    }

    public void flush() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.flush();
        }
    }

    public void close() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.close();
            compressedOutputStream = null;
        }
    }

    public void write(T element) throws IOException {
        compressedOutputStream.write(element.toString().getBytes());
        compressedOutputStream.write(this.separator.getBytes());
    }
}

import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableHadoopConfiguration implements Serializable {

    private static final long serialVersionUID = -1960900291123078166L;

    private transient Configuration hadoopConfig;

    SerializableHadoopConfiguration(Configuration hadoopConfig) {
        this.hadoopConfig = hadoopConfig;
    }

    Configuration get() {
        return this.hadoopConfig;
    }

    // --------------------
    private void writeObject(ObjectOutputStream out) throws IOException {
        this.hadoopConfig.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException {
        final Configuration config = new Configuration();
        config.readFields(in);

        if (this.hadoopConfig == null) {
            this.hadoopConfig = config;
        }
    }
}

My actual Flink job

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kinesisConsumerConfig = new Properties();
...
...
DataStream<DeviceEvent> kinesis =
        env.addSource(new FlinkKinesisConsumer<>(this.streamName, new DeviceEventSchema(), kinesisConsumerConfig)).name("source")
        .setParallelism(16)
        .setMaxParallelism(24);

final StreamingFileSink<DeviceEvent> bulkCompressStreamingFileSink = StreamingFileSink.<DeviceEvent>forBulkFormat(
                path,
                new CompressedStringBulkWriterFactory<>(
                        BucketingSink.createHadoopFileSystem(
                                new Path("s3a://"+ this.bucketName + "/" + this.objectPrefix),
                                null).getConf()))
                .withBucketAssigner(new OrgIdBucketAssigner())
                .build();

deviceEventDataStream.addSink(bulkCompressStreamingFileSink).name("bulkCompressStreamingFileSink").setParallelism(16);

env.execute();

I expected the data to be saved in S3 as multiple files. Unfortunately, no files are being created.

In the logs, I see the below exception:

2019-05-15 22:17:20,855 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: bulkCompressStreamingFileSink (11/16) (c73684c10bb799a6e0217b6795571e22) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 1 for operator Sink: bulkCompressStreamingFileSink (11/16).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Sink: bulkCompressStreamingFileSink (11/16).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
    ... 8 more
Caused by: java.io.IOException: Stream closed.
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.requireOpened(RefCountedFile.java:117)
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.write(RefCountedFile.java:74)
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:105)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:199)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:166)
    at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)

So I am wondering what I am missing.

I am using the latest AWS EMR (5.23).

Tags: java, apache-flink, amazon-emr, flink-streaming

Solution


In CompressedStringBulkWriter#finish() you are calling CompressedStringWriter#close(), which closes the CompressionOutputStream and, along with it, the underlying stream, i.e. Flink's FSDataOutputStream. Flink internally needs that stream to stay open so that it can complete the checkpoint properly and guarantee that the stream is recoverable. That is why you are getting

Caused by: java.io.IOException: Stream closed.
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.requireOpened(RefCountedFile.java:117)
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.write(RefCountedFile.java:74)
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:105)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:199)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:166)

So instead of compressedOutputStream.close(), use compressedOutputStream.finish(), which just flushes everything left in the buffer to the output stream without closing it. By the way, recent versions of Flink also ship a built-in HadoopCompressionBulkWriter that you could use instead.
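A minimal sketch of what that change could look like, assuming the rest of the classes stay as posted above (the finish() helper added to CompressedStringWriter is an illustrative name, not part of the original code):

// In CompressedStringWriter: end the compressed stream without closing Flink's FSDataOutputStream.
public void finish() throws IOException {
    if (compressedOutputStream != null) {
        // CompressionOutputStream#finish() writes any remaining compressed data
        // but leaves the underlying (recoverable) stream open.
        compressedOutputStream.finish();
        compressedOutputStream = null;
    }
}

// In CompressedStringBulkWriter: BulkWriter#finish() must likewise not close the stream.
@Override
public void finish() throws IOException {
    this.compressedStringWriter.finish();
}

With this, Flink's S3RecoverableFsDataOutputStream stays open until the sink itself calls closeForCommit() during the checkpoint, which is exactly the call that fails in the stack trace above.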

