apache-spark - Spark 作业读取数据框中已排序的 AVRO 文件,但无序写入 kafka
问题描述
我有按 ID 排序的 AVRO 文件,每个 ID 都有一个名为“ID=234”的文件夹,文件夹内的数据是 AVRO 格式并根据日期排序。我正在运行 spark 作业,它采用输入路径并在数据帧中读取 avro。然后,此数据帧以 5 个分区写入 kafka 主题。
val properties: Properties = getProperties(args)
val spark = SparkSession.builder().master(properties.getProperty("master"))
.appName(properties.getProperty("appName")).getOrCreate()
val sqlContext = spark.sqlContext
val sourcePath = properties.getProperty("sourcePath")
val dataDF = sqlContext.read.avro(sourcePath).as("data")
val count = dataDF.count();
val schemaRegAdd = properties.getProperty("schemaRegistry")
val schemaRegistryConfs = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> schemaRegAdd,
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME
)
val start = Instant.now
dataDF.select(functions.struct(properties.getProperty("message.key.name")).alias("key"), functions.struct("*").alias("value"))
.toConfluentAvroWithPlainKey(properties.getProperty("topic"), properties.getProperty("schemaName"),
properties.getProperty("schemaNamespace"))(schemaRegistryConfs)
.write.format("kafka")
.option("kafka.bootstrap.servers",properties.getProperty("kafka.brokers"))
.option("topic",properties.getProperty("topic")).save()
}
我的用例是按顺序写入来自每个 ID(按日期排序)的所有消息,例如应首先添加来自一个 ID 1 的所有排序数据,然后从 ID 2 添加,依此类推。Kafka 消息具有密钥作为 ID。
解决方案
不要忘记,当你进行转换时,RDD/数据集中的数据是随机的,所以你会丢失顺序。
实现这一点的最佳方法是一个一个地读取文件并将其发送到 kafka 而不是读取您的完整目录val sourcePath = properties.getProperty("sourcePath")
推荐阅读
- sql - 查询以获取最大月数的值的总和
- javascript - 我如何独立使用 Babel 将 html 导入转换为变量?
- php - Foreach 无法正常工作 php in_array
- python - 在熊猫数据框中抖动几乎重复的行
- javascript - 创建一个按钮来隐藏/显示 d3.js 中的矩形和文本
- amazon-web-services - Amazon Managed Blockchain 的性能指标
- mysql - #1054 - 从 Mysql 导出到 MariaDB 后 order 子句中的未知列
- java - 在 IBM Domino 中创建重定向的 HTTP 请求的方式
- ruby-on-rails - 有人可以解释这种查找方法的行为吗?
- android - Flutter,TextFormField 的内容填充未按预期工作