apache-spark - oaspark.sql.sources.v2.writer.DataWriter.writeRecord 实现中出现重复记录?
问题描述
我们目前正在探索 Apache Spark(使用 Hadoop)来执行大规模数据转换(在 Java 中)。
我们正在使用新的(和实验性的)DataSourceV2 接口来构建我们的自定义输出数据文件。它的一个组件是org.apache.spark.sql.sources.v2.writer.DataWriter
接口的实现。这一切都很好,除了一个问题:
该org.apache.spark.sql.sources.v2.writer.DataWriter.write(record)
方法经常(但不总是)为相同的输入记录调用两次。
我希望这里有足够的代码让您了解我们正在做的事情的要点:
基本上,我们有许多大型输入数据集,我们通过 Spark 应用程序将这些输入数据放入 Hadoop 表中,使用如下代码:
final Dataset<Row> jdbcTableDataset = sparkSession.read()
.format("jdbc")
.option("url", sqlServerUrl)
.option("dbtable", tableName)
.option("user", jdbcUser)
.option("password", jdbcPassword)
.load();
final DataFrameWriter<Row> dataFrameWriter = jdbcTableDataset.write();
dataFrameWriter.save(hdfsDestination + "/" + tableName);
大约有五十张这样的桌子,物有所值。我知道数据中没有重复项,因为dataFrameWriter.count()
并dataFrameWriter.distinct().count()
返回相同的值。
转换过程涉及对这些表执行连接操作,并将结果以自定义格式写入(共享)文件系统中的文件。结果行包含一个唯一键、一dataGroup
列、一dataSubGroup
列和大约 40 个其他列。所选记录按dataGroup
,dataSubGroup
和 键排序。
每个输出文件由dataGroup
列区分,用于对write
操作进行分区:
final Dataset<Row> selectedData = dataSelector.selectData();
selectedData
.write()
.partitionBy("dataGroup")
.format("au.com.mycompany.myformat.DefaultSource")
.save("/path/to/shared/directory/");
为了让您了解规模,生成的选定数据由 56000 万条记录组成,在大约 3000 个dataGroup
文件之间不均匀分布。很大,但不是很大。
巧妙地partitionBy("dataGroup")
确保每个dataGroup
文件都由单个执行程序处理。到目前为止,一切都很好。
我的数据源实现了新的(和实验性的)DataSourceV2 接口:
package au.com.mycompany.myformat;
import java.io.Serializable;
import java.util.Optional;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultSource implements DataSourceRegister, WriteSupport , Serializable {
private static final Logger logger = LoggerFactory.getLogger(DefaultSource.class);
public DefaultSource() {
logger.info("created");
}
@Override
public String shortName() {
logger.info("shortName");
return "myformat";
}
@Override
public Optional<DataSourceWriter> createWriter(String writeUUID, StructType schema, SaveMode mode, DataSourceOptions options) {
return Optional.of(new MyFormatSourceWriter(writeUUID, schema, mode, options));
}
}
有一个DataSourceWriter
实现:
public class MyFormatSourceWriter implements DataSourceWriter, Serializable {
...
}
和一个DataSourceWriterFactory
实现:
public class MyDataWriterFactory implements DataWriterFactory<InternalRow> {
...
}
最后是一个DataWriter
实现。似乎 aDataWriter
已创建并发送给每个执行者。因此,每个都DataWriter
将处理许多数据组。
每条记录都有一个唯一的键列。
public class MyDataWriter implements DataWriter<InternalRow>, Serializable {
private static final Logger logger = LoggerFactory.getLogger(XcdDataWriter.class);
...
MyDataWriter(File buildDirectory, StructType schema, int partitionId) {
this.buildDirectory = buildDirectory;
this.schema = schema;
this.partitionId = partitionId;
logger.debug("Created MyDataWriter for partition {}", partitionId);
}
private String getFieldByName(InternalRow row, String fieldName) {
return Optional.ofNullable(row.getUTF8String(schema.fieldIndex(fieldName)))
.orElse(UTF8String.EMPTY_UTF8)
.toString();
}
/**
* Rows are written here. Each row has a unique key column as well as a dataGroup
* column. Right now we are frequently getting called with the same record twice.
*/
@Override
public void write(InternalRow record) throws IOException {
String nextDataFileName = getFieldByName(record, "dataGroup") + ".myExt";
// some non-trivial logic for determining the right output file
...
// write the output record
outputWriter.append(getFieldByName(row, "key")).append(',')
.append(getFieldByName(row, "prodDate")).append(',')
.append(getFieldByName(row, "nation")).append(',')
.append(getFieldByName(row, "plant")).append(',')
...
}
@Override
public WriterCommitMessage commit() throws IOException {
...
outputWriter.close();
...
logger.debug("Committed partition {} with {} data files for zip file {} for a total of {} zip files",
partitionId, dataFileCount, dataFileName, dataFileCount);
return new MyWriterCommitMessage(partitionId, dataFileCount);
}
@Override
public void abort() throws IOException {
logger.error("Failed to collect data for schema: {}", schema);
...
}
}
现在我正在通过跟踪处理的最后一个键并忽略重复项来解决这个问题。
解决方案
推荐阅读
- linux - 使用 SHIFT+ARROW(左或右)的 Ubuntu 终端上的文本选择不起作用
- sql - 递归获取 Postgres 表中树的根 ID
- javascript - 如何判断是否有 href 属性?
- javascript - 内容脚本无法在所有网页上正常运行
- python - Python:statsmodels - .predict(X) 实际预测什么?
- javascript - Django AJAX 调用上的空 request.POST
- c# - 是否有等效于 IServiceCollection 的“TryAdd”方法与 IOptions 一起使用
图案? - javascript - 为什么 Node.js(文件名)会导致 Windows 上的 Node.JS 出现问题?
- iptables - 如何使用 BPF 定位 TCP 数据包中的 MSS 值
- odbc - Sqlcmd:错误:Microsoft ODBC Driver 17 for SQL Server:未找到数据源名称且未指定默认驱动程序