首页 > 解决方案 > oaspark.sql.sources.v2.writer.DataWriter.writeRecord 实现中出现重复记录?

问题描述

我们目前正在探索 Apache Spark(使用 Hadoop)来执行大规模数据转换(在 Java 中)。

我们正在使用新的(和实验性的)DataSourceV2 接口来构建我们的自定义输出数据文件。它的一个组件是org.apache.spark.sql.sources.v2.writer.DataWriter 接口的实现。这一切都很好,除了一个问题:

org.apache.spark.sql.sources.v2.writer.DataWriter.write(record)方法经常(但不总是)为相同的输入记录调用两次。

我希望这里有足够的代码让您了解我们正在做的事情的要点:

基本上,我们有许多大型输入数据集,我们通过 Spark 应用程序将这些输入数据放入 Hadoop 表中,使用如下代码:

  final Dataset<Row> jdbcTableDataset = sparkSession.read()
    .format("jdbc")
    .option("url", sqlServerUrl)
    .option("dbtable", tableName)
    .option("user", jdbcUser)
    .option("password", jdbcPassword)
    .load();

  final DataFrameWriter<Row> dataFrameWriter = jdbcTableDataset.write();
  dataFrameWriter.save(hdfsDestination + "/" + tableName);

大约有五十张这样的桌子,物有所值。我知道数据中没有重复项,因为dataFrameWriter.count()dataFrameWriter.distinct().count() 返回相同的值。

转换过程涉及对这些表执行连接操作,并将结果以自定义格式写入(共享)文件系统中的文件。结果行包含一个唯一键、一dataGroup列、一dataSubGroup列和大约 40 个其他列。所选记录按dataGroup,dataSubGroup和 键排序。

每个输出文件由dataGroup列区分,用于对write操作进行分区:

  final Dataset<Row> selectedData = dataSelector.selectData();
  selectedData
    .write()
    .partitionBy("dataGroup")
    .format("au.com.mycompany.myformat.DefaultSource")
    .save("/path/to/shared/directory/");

为了让您了解规模,生成的选定数据由 56000 万条记录组成,在大约 3000 个dataGroup文件之间不均匀分布。很大,但不是很大。

巧妙地partitionBy("dataGroup")确保每个dataGroup文件都由单个执行程序处理。到目前为止,一切都很好。

我的数据源实现了新的(和实验性的)DataSourceV2 接口:

package au.com.mycompany.myformat;

import java.io.Serializable;
import java.util.Optional;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSource implements DataSourceRegister, WriteSupport , Serializable {

    private static final Logger logger = LoggerFactory.getLogger(DefaultSource.class);

    public DefaultSource() {
        logger.info("created");
    }

    @Override
    public String shortName() {
        logger.info("shortName");
        return "myformat";
    }

    @Override
    public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
        return Optional.of(new MyFormatSourceWriter(writeUUID, schema, mode, options));
    }

}

有一个DataSourceWriter实现:

public class MyFormatSourceWriter implements DataSourceWriter, Serializable {

  ...

}

和一个DataSourceWriterFactory实现:

public class MyDataWriterFactory implements DataWriterFactory<InternalRow> {

  ...

}

最后是一个DataWriter实现。似乎 aDataWriter已创建并发送给每个执行者。因此,每个都DataWriter将处理许多数据组。

每条记录都有一个唯一的键列。

public class MyDataWriter implements DataWriter<InternalRow>, Serializable {

  private static final Logger logger = LoggerFactory.getLogger(XcdDataWriter.class);

  ...

  MyDataWriter(File buildDirectory, StructType schema, int partitionId) {
      this.buildDirectory = buildDirectory;
      this.schema = schema;
      this.partitionId = partitionId;
      logger.debug("Created MyDataWriter for partition {}", partitionId);
  }

  private String getFieldByName(InternalRow row, String fieldName) {
      return Optional.ofNullable(row.getUTF8String(schema.fieldIndex(fieldName)))
        .orElse(UTF8String.EMPTY_UTF8)
        .toString();
  }

  /**
   * Rows are written here. Each row has a unique key column as well as a dataGroup
   * column. Right now we are frequently getting called with the same record twice.
   */ 
  @Override
  public void write(InternalRow record) throws IOException {
      String nextDataFileName = getFieldByName(record, "dataGroup") + ".myExt";

      // some non-trivial logic for determining the right output file
      ...

      // write the output record
        outputWriter.append(getFieldByName(row, "key")).append(',')
          .append(getFieldByName(row, "prodDate")).append(',')
          .append(getFieldByName(row, "nation")).append(',')
          .append(getFieldByName(row, "plant")).append(',')
        ...              

  }

  @Override
  public WriterCommitMessage commit() throws IOException {
      ...
      outputWriter.close();
      ...
      logger.debug("Committed partition {} with {} data files for zip file {} for a total of {} zip files",
        partitionId, dataFileCount, dataFileName, dataFileCount);
      return new MyWriterCommitMessage(partitionId, dataFileCount);
  }

  @Override
  public void abort() throws IOException {
      logger.error("Failed to collect data for schema: {}", schema);
      ...
  }

}

现在我正在通过跟踪处理的最后一个键并忽略重复项来解决这个问题。

标签: apache-sparkapache-spark-sql

解决方案


推荐阅读